This is an automated email from the ASF dual-hosted git repository.
ferenc-csaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
The following commit(s) were added to refs/heads/main by this push:
new da3b970 [FLINK-39106] Support `ROW` and `ARRAY<ROW>` data types in
DynamoDB API / SQL sink
da3b970 is described below
commit da3b9703e387642cbb8a5860348170cad3fd9b84
Author: ddebowczyk92 <[email protected]>
AuthorDate: Wed May 27 18:51:09 2026 +0200
[FLINK-39106] Support `ROW` and `ARRAY<ROW>` data types in DynamoDB API /
SQL sink
---
.../table/RowDataToAttributeValueConverter.java | 103 +++++++++++++--
.../table/DynamoDbDynamicSinkFactoryTest.java | 7 +-
.../RowDataToAttributeValueConverterTest.java | 141 ++++++++++++++++++++-
.../src/test/resources/create-table.sql | 3 +
4 files changed, 238 insertions(+), 16 deletions(-)
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java
index dbfa66b..08d6234 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java
@@ -19,14 +19,20 @@
package org.apache.flink.connector.dynamodb.table;
import org.apache.flink.annotation.Internal;
+import
org.apache.flink.connector.dynamodb.table.converter.ArrayAttributeConverter;
import
org.apache.flink.connector.dynamodb.table.converter.ArrayAttributeConverterProvider;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.types.CollectionDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.KeyValueDataType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.types.Row;
+import software.amazon.awssdk.enhanced.dynamodb.AttributeConverter;
import software.amazon.awssdk.enhanced.dynamodb.AttributeConverterProvider;
+import software.amazon.awssdk.enhanced.dynamodb.AttributeValueType;
import software.amazon.awssdk.enhanced.dynamodb.EnhancedType;
import software.amazon.awssdk.enhanced.dynamodb.TableSchema;
import software.amazon.awssdk.enhanced.dynamodb.mapper.StaticTableSchema;
@@ -34,6 +40,8 @@ import
software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import java.util.stream.IntStream;
import static org.apache.flink.table.data.RowData.createFieldGetter;
@@ -84,18 +92,63 @@ public class RowDataToAttributeValueConverter {
DataTypes.Field field,
RowData.FieldGetter fieldGetter) {
+ EnhancedType<Object> enhancedType =
getEnhancedType(field.getDataType());
return builder.addAttribute(
- getEnhancedType(field.getDataType()),
- a ->
- a.name(field.getName())
- .getter(
- rowData ->
-
DataStructureConverters.getConverter(
-
field.getDataType())
- .toExternalOrNull(
-
fieldGetter.getFieldOrNull(
-
rowData)))
- .setter(((rowData, t) -> {})));
+ enhancedType,
+ a -> {
+ a.name(field.getName())
+ .getter(
+ rowData ->
+
DataStructureConverters.getConverter(
+
field.getDataType())
+ .toExternalOrNull(
+
fieldGetter.getFieldOrNull(rowData)))
+ .setter(((rowData, t) -> {}));
+ buildRowAttributeConverter(field.getDataType())
+ .ifPresent(a::attributeConverter);
+ });
+ }
+
+ private Optional<AttributeConverter> buildRowAttributeConverter(DataType
dataType) {
+ if (LogicalTypeRoot.ROW == dataType.getLogicalType().getTypeRoot()) {
+ return
Optional.of(createRowDocumentConverter(buildRowTableSchema(dataType)));
+ }
+ if (dataType instanceof CollectionDataType) {
+ DataType elementDataType = ((CollectionDataType)
dataType).getElementDataType();
+ if (LogicalTypeRoot.ROW ==
elementDataType.getLogicalType().getTypeRoot()) {
+ AttributeConverter<Row> elementConverter =
+
createRowDocumentConverter(buildRowTableSchema(elementDataType));
+ return Optional.of(
+ new ArrayAttributeConverter<>(
+ elementConverter,
EnhancedType.of(Row[].class)));
+ }
+ }
+ return Optional.empty();
+ }
+
+    /**
+     * Creates an {@link AttributeConverter} that writes a {@link Row} as a DynamoDB map
+     * ({@code M}) attribute, using the given schema to convert the row's fields.
+     *
+     * <p>Only the {@code Row} to {@link AttributeValue} direction is implemented; {@code
+     * transformTo} always throws.
+     *
+     * @param tableSchema schema describing the fields of the rows to convert
+     * @return a converter producing {@link AttributeValueType#M} values
+     */
+    private static AttributeConverter<Row> createRowDocumentConverter(
+            TableSchema<Row> tableSchema) {
+        return new AttributeConverter<Row>() {
+            @Override
+            public AttributeValue transformFrom(Row input) {
+                // Guard against a null row reaching this converter (presumably possible for
+                // null elements of an ARRAY<ROW>, depending on the wrapping converter): emit an
+                // explicit NULL attribute instead of failing inside the schema getters.
+                if (input == null) {
+                    return AttributeValue.builder().nul(true).build();
+                }
+                // ignoreNulls is false, so null fields of the row are written as NULL attributes.
+                // NOTE(review): this does not follow any null-handling option of the enclosing
+                // converter - confirm that is intended.
+                return AttributeValue.builder().m(tableSchema.itemToMap(input, false)).build();
+            }
+
+            @Override
+            public Row transformTo(AttributeValue input) {
+                throw new UnsupportedOperationException(
+                        "Converting a DynamoDB AttributeValue back to a Row is not supported.");
+            }
+
+            @Override
+            public EnhancedType<Row> type() {
+                return EnhancedType.of(Row.class);
+            }
+
+            @Override
+            public AttributeValueType attributeValueType() {
+                return AttributeValueType.M;
+            }
+        };
     }
private <T> EnhancedType<T> getEnhancedType(DataType dataType) {
@@ -104,8 +157,36 @@ public class RowDataToAttributeValueConverter {
EnhancedType.mapOf(
getEnhancedType(((KeyValueDataType)
dataType).getKeyDataType()),
getEnhancedType(((KeyValueDataType)
dataType).getValueDataType()));
+ } else if (LogicalTypeRoot.ROW ==
dataType.getLogicalType().getTypeRoot()) {
+ return (EnhancedType<T>) EnhancedType.of(Row.class);
} else {
return (EnhancedType<T>)
EnhancedType.of(dataType.getConversionClass());
}
}
+
+    /**
+     * Builds the {@link TableSchema} used to serialize a nested {@link Row} of the given type.
+     *
+     * <p>One attribute is registered per field of {@code dataType}; each attribute reads its
+     * value from the row by field position. Fields that are themselves {@code ROW}-based get the
+     * converter returned by {@code buildRowAttributeConverter}, which makes the construction
+     * recursive for nested rows.
+     *
+     * @param dataType the {@code ROW} data type whose fields define the schema
+     * @return a schema with one attribute per field of the row
+     */
+    private TableSchema<Row> buildRowTableSchema(DataType dataType) {
+        final StaticTableSchema.Builder<Row> schemaBuilder = TableSchema.builder(Row.class);
+        // The array-aware provider is listed first, ahead of the SDK default provider.
+        schemaBuilder.attributeConverterProviders(
+                new ArrayAttributeConverterProvider(),
+                AttributeConverterProvider.defaultProvider());
+
+        final List<DataTypes.Field> rowFields = DataType.getFields(dataType);
+        IntStream.range(0, rowFields.size())
+                .forEach(
+                        position -> {
+                            final DataTypes.Field rowField = rowFields.get(position);
+                            final DataType rowFieldType = rowField.getDataType();
+                            schemaBuilder.addAttribute(
+                                    getEnhancedType(rowFieldType),
+                                    attribute -> {
+                                        // The setter is a no-op: this schema is only read from.
+                                        attribute
+                                                .name(rowField.getName())
+                                                .getter(row -> row.getField(position))
+                                                .setter((row, ignored) -> {});
+                                        buildRowAttributeConverter(rowFieldType)
+                                                .ifPresent(attribute::attributeConverter);
+                                    });
+                        });
+        return schemaBuilder.build();
+    }
}
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactoryTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactoryTest.java
index 862ab42..f76205e 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactoryTest.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactoryTest.java
@@ -336,7 +336,12 @@ public class DynamoDbDynamicSinkFactoryTest {
Column.physical("some_timestamp_array",
DataTypes.ARRAY(DataTypes.TIMESTAMP())),
Column.physical(
"some_timestamp_ltz_array",
DataTypes.ARRAY(DataTypes.TIMESTAMP_LTZ())),
- Column.physical("some_map", DataTypes.MAP(DataTypes.STRING(),
DataTypes.BIGINT())));
+ Column.physical("some_map", DataTypes.MAP(DataTypes.STRING(),
DataTypes.BIGINT())),
+ Column.physical(
+ "some_row",
+ DataTypes.ROW(
+ DataTypes.FIELD("inner_string",
DataTypes.STRING()),
+ DataTypes.FIELD("inner_int",
DataTypes.INT()))));
}
private ResolvedSchema defaultSinkSchema() {
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java
index daba6aa..974a19f 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverterTest.java
@@ -37,6 +37,7 @@ import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import static java.util.Collections.singletonMap;
import static org.assertj.core.api.Assertions.assertThat;
@@ -69,7 +70,7 @@ public class RowDataToAttributeValueConverterTest {
RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
new RowDataToAttributeValueConverter(dataType, true);
Map<String, AttributeValue> actualResult =
-
rowDataToAttributeValueConverter.convertRowData(createElement(null));
+
rowDataToAttributeValueConverter.convertRowData(createElement((Object) null));
assertThat(actualResult.isEmpty()).isEqualTo(true);
}
@@ -254,6 +255,93 @@ public class RowDataToAttributeValueConverterTest {
assertThat(actualResult).containsAllEntriesOf(expectedResult);
}
+ @Test
+ void testRowDataType() {
+ DataType dataType =
+ DataTypes.ROW(
+ DataTypes.FIELD(
+ "innerRow",
+ DataTypes.ROW(
+ DataTypes.FIELD("f1",
DataTypes.STRING()),
+ DataTypes.FIELD("f2",
DataTypes.INT()))));
+ RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
+ new RowDataToAttributeValueConverter(dataType);
+ Map<String, AttributeValue> actualResult =
+ rowDataToAttributeValueConverter.convertRowData(
+
createElement(createElement(StringData.fromString("some string"), 123)));
+
+ Map<String, AttributeValue> innerMap =
+ Map.of(
+ "f1", AttributeValue.builder().s("some
string").build(),
+ "f2", AttributeValue.builder().n("123").build());
+ Map<String, AttributeValue> expectedResult =
+ singletonMap("innerRow",
AttributeValue.builder().m(innerMap).build());
+
+ assertThat(actualResult).containsAllEntriesOf(expectedResult);
+ }
+
+ @Test
+ void testRowDataTypeNullInnerField() {
+ DataType dataType =
+ DataTypes.ROW(
+ DataTypes.FIELD(
+ "innerRow",
+ DataTypes.ROW(
+ DataTypes.FIELD("f1",
DataTypes.STRING()),
+ DataTypes.FIELD("f2",
DataTypes.INT()))));
+ RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
+ new RowDataToAttributeValueConverter(dataType);
+ Map<String, AttributeValue> actualResult =
+ rowDataToAttributeValueConverter.convertRowData(
+
createElement(createElement(StringData.fromString("value"), null)));
+
+ Map<String, AttributeValue> innerMap =
+ Map.of(
+ "f1", AttributeValue.builder().s("value").build(),
+ "f2", AttributeValue.builder().nul(true).build());
+ Map<String, AttributeValue> expectedResult =
+ singletonMap("innerRow",
AttributeValue.builder().m(innerMap).build());
+
+ assertThat(actualResult).containsAllEntriesOf(expectedResult);
+ }
+
+ @Test
+ void testNestedRowDataType() {
+ DataType dataType =
+ DataTypes.ROW(
+ DataTypes.FIELD(
+ "outer",
+ DataTypes.ROW(
+ DataTypes.FIELD(
+ "middle",
+ DataTypes.ROW(
+ DataTypes.FIELD(
+ "leaf",
DataTypes.STRING()))),
+ DataTypes.FIELD("sibling",
DataTypes.INT()))));
+ RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
+ new RowDataToAttributeValueConverter(dataType);
+
+ GenericRowData leafRow = new GenericRowData(1);
+ leafRow.setField(0, StringData.fromString("deep"));
+ GenericRowData outerRow = new GenericRowData(2);
+ outerRow.setField(0, leafRow);
+ outerRow.setField(1, 42);
+
+ Map<String, AttributeValue> actualResult =
+
rowDataToAttributeValueConverter.convertRowData(createElement(outerRow));
+
+ Map<String, AttributeValue> leafMap =
+ Map.of("leaf", AttributeValue.builder().s("deep").build());
+ Map<String, AttributeValue> outerMap =
+ Map.of(
+ "middle", AttributeValue.builder().m(leafMap).build(),
+ "sibling", AttributeValue.builder().n("42").build());
+ Map<String, AttributeValue> expectedResult =
+ singletonMap("outer",
AttributeValue.builder().m(outerMap).build());
+
+ assertThat(actualResult).containsAllEntriesOf(expectedResult);
+ }
+
@Test
void testStringArray() {
String key = "key";
@@ -560,9 +648,54 @@ public class RowDataToAttributeValueConverterTest {
assertThat(actualResult).containsAllEntriesOf(expectedResult);
}
- private RowData createElement(Object value) {
- GenericRowData element = new GenericRowData(1);
- element.setField(0, value);
+ @Test
+ void testRowDataTypeArray() {
+ DataType dataType =
+ DataTypes.ROW(
+ DataTypes.FIELD(
+ "rows",
+ DataTypes.ARRAY(
+ DataTypes.ROW(
+ DataTypes.FIELD("name",
DataTypes.STRING()),
+ DataTypes.FIELD("value",
DataTypes.INT())))));
+ RowDataToAttributeValueConverter rowDataToAttributeValueConverter =
+ new RowDataToAttributeValueConverter(dataType);
+
+ GenericRowData row1 = new GenericRowData(2);
+ row1.setField(0, StringData.fromString("first"));
+ row1.setField(1, 1);
+ GenericRowData row2 = new GenericRowData(2);
+ row2.setField(0, StringData.fromString("second"));
+ row2.setField(1, 2);
+
+ Map<String, AttributeValue> actualResult =
+ rowDataToAttributeValueConverter.convertRowData(
+ createElement(new GenericArrayData(new RowData[]
{row1, row2})));
+
+ Map<String, AttributeValue> row1Map =
+ Map.of(
+ "name", AttributeValue.builder().s("first").build(),
+ "value", AttributeValue.builder().n("1").build());
+ Map<String, AttributeValue> row2Map =
+ Map.of(
+ "name", AttributeValue.builder().s("second").build(),
+ "value", AttributeValue.builder().n("2").build());
+ Map<String, AttributeValue> expectedResult =
+ singletonMap(
+ "rows",
+ AttributeValue.builder()
+ .l(
+
AttributeValue.builder().m(row1Map).build(),
+
AttributeValue.builder().m(row2Map).build())
+ .build());
+
+ assertThat(actualResult).containsAllEntriesOf(expectedResult);
+ }
+
+    /**
+     * Builds a {@link GenericRowData} with one field per given value, in argument order.
+     *
+     * @param values field values of the row; a single {@code null} must be passed as {@code
+     *     (Object) null} so that it is treated as one null field rather than a null array
+     * @return a row whose arity equals the number of values
+     */
+    private RowData createElement(Object... values) {
+        GenericRowData element = new GenericRowData(values.length);
+        IntStream.range(0, values.length)
+                .forEach(position -> element.setField(position, values[position]));
         return element;
     }
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/resources/create-table.sql b/flink-connector-aws/flink-connector-dynamodb/src/test/resources/create-table.sql
index 3c47cab..9655b24 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/test/resources/create-table.sql
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/resources/create-table.sql
@@ -15,6 +15,8 @@ CREATE TABLE dynamo_db_table (
`some_time` TIME,
`some_timestamp` TIMESTAMP(3),
`some_timestamp_ltz` TIMESTAMP_LTZ(5),
+ `some_row` ROW<myField INT, myOtherField String>,
+ `some_row_in_row` ROW<myField INT, myInnerRow ROW<innerRowField INT,
innerRowOtherField String>>,
`some_char_array` ARRAY<CHAR>,
`some_varchar_array` ARRAY<VARCHAR>,
`some_string_array` ARRAY<STRING>,
@@ -29,6 +31,7 @@ CREATE TABLE dynamo_db_table (
`some_time_array` ARRAY<TIME>,
`some_timestamp_array` ARRAY<TIMESTAMP(3)>,
`some_timestamp_ltz_array` ARRAY<TIMESTAMP_LTZ(5)>,
+ `some_rows_array` ARRAY<ROW<myField INT, myOtherField STRING>>,
`some_string_map` MAP<STRING,STRING>,
`some_boolean_map` MAP<STRING,BOOLEAN>
) PARTITIONED BY ( partition_key )