hlteoh37 commented on code in PR #15:
URL: https://github.com/apache/flink-connector-aws/pull/15#discussion_r1024629827


##########
flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverter.java:
##########
@@ -0,0 +1,50 @@
+package org.apache.flink.connector.dynamodb.table;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequest;
+import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+/** Implementation of an {@link ElementConverter} specific for DynamoDb sink. */

Review Comment:
   done



##########
flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSink.java:
##########
@@ -0,0 +1,198 @@
+package org.apache.flink.connector.dynamodb.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSink;
+import org.apache.flink.connector.base.table.sink.AsyncDynamicTableSinkBuilder;
+import org.apache.flink.connector.dynamodb.sink.DynamoDbSink;
+import org.apache.flink.connector.dynamodb.sink.DynamoDbSinkBuilder;
+import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequest;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * A {@link DynamicTableSink} that describes how to create a {@link DynamoDbSink} from a logical
+ * description.
+ */
+@Internal
+public class DynamoDbDynamicSink extends AsyncDynamicTableSink<DynamoDbWriteRequest>
+        implements SupportsPartitioning {
+
+    private final String destinationTableName;
+    private final boolean failOnError;
+    private final Properties dynamoDbClientProperties;
+    private final DataType physicalDataType;
+    private final Set<String> overwriteByPartitionKeys;
+
+    protected DynamoDbDynamicSink(
+            @Nullable Integer maxBatchSize,
+            @Nullable Integer maxInFlightRequests,
+            @Nullable Integer maxBufferedRequests,
+            @Nullable Long maxBufferSizeInBytes,
+            @Nullable Long maxTimeInBufferMS,
+            String destinationTableName,
+            boolean failOnError,
+            Properties dynamoDbClientProperties,
+            DataType physicalDataType,
+            Set<String> overwriteByPartitionKeys) {
+        super(
+                maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBufferSizeInBytes,
+                maxTimeInBufferMS);
+        this.destinationTableName = destinationTableName;
+        this.failOnError = failOnError;
+        this.dynamoDbClientProperties = dynamoDbClientProperties;
+        this.physicalDataType = physicalDataType;
+        this.overwriteByPartitionKeys = overwriteByPartitionKeys;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
+        // TODO: We can support consuming from CDC streams here
+        return ChangelogMode.insertOnly();
+    }
+
+    @Override
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        DynamoDbSinkBuilder<RowData> builder =
+                DynamoDbSink.<RowData>builder()
+                        .setDestinationTableName(destinationTableName)
+                        .setFailOnError(failOnError)
+                        .setOverwriteByPartitionKeys(new ArrayList<>(overwriteByPartitionKeys))
+                        .setDynamoDbProperties(dynamoDbClientProperties)
+                        .setElementConverter(new RowDataElementConverter(physicalDataType));
+
+        addAsyncOptionsToSinkBuilder(builder);
+
+        // TODO: check if parallelism needed here
+        return SinkV2Provider.of(builder.build());
+    }
+
+    @Override
+    public DynamicTableSink copy() {
+        return new DynamoDbDynamicSink(
+                maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBufferSizeInBytes,
+                maxTimeInBufferMS,
+                destinationTableName,
+                failOnError,
+                dynamoDbClientProperties,
+                physicalDataType,
+                overwriteByPartitionKeys);
+    }
+
+    @Override
+    public String asSummaryString() {
+        // TODO: check when this is called/returned
+        return "DynamoDB";
+    }
+
+    @Override
+    public void applyStaticPartition(Map<String, String> partitions) {
+        this.overwriteByPartitionKeys.addAll(partitions.keySet());
+    }
+
+    public static DynamoDbDynamicTableSinkBuilder builder() {
+        return new DynamoDbDynamicTableSinkBuilder();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        DynamoDbDynamicSink that = (DynamoDbDynamicSink) o;
+        return failOnError == that.failOnError
+                && Objects.equals(destinationTableName, that.destinationTableName)
+                && Objects.equals(dynamoDbClientProperties, that.dynamoDbClientProperties)
+                && Objects.equals(physicalDataType, that.physicalDataType)
+                && Objects.equals(overwriteByPartitionKeys, that.overwriteByPartitionKeys);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                super.hashCode(),
+                destinationTableName,
+                failOnError,
+                dynamoDbClientProperties,
+                physicalDataType,
+                overwriteByPartitionKeys);
+    }

Review Comment:
   Yep, these were only used for tests. Removed and improved tests.
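   If a test still needs field-by-field equality without `equals()`/`hashCode()` on the sink, AssertJ's recursive comparison can stand in. A hypothetical snippet (assuming AssertJ is on the test classpath; `createConfiguredSink()` is an assumed fixture, not in the PR):

   ```java
   import static org.assertj.core.api.Assertions.assertThat;

   // Hypothetical test: compares all fields reflectively, no equals() needed.
   DynamicTableSink sink = createConfiguredSink(); // assumed test fixture
   assertThat(sink.copy())
           .usingRecursiveComparison()
           .isEqualTo(sink);
   ```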



##########
flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverter.java:
##########
@@ -0,0 +1,50 @@
+package org.apache.flink.connector.dynamodb.table;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequest;
+import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+/** Implementation of an {@link ElementConverter} specific for DynamoDb sink. */
+public class RowDataElementConverter implements ElementConverter<RowData, DynamoDbWriteRequest> {
+
+    private final DataType physicalDataType;
+    private final RowDataToAttributeValueConverter rowDataToAttributeValueConverter;
+
+    public RowDataElementConverter(DataType physicalDataType) {
+        this.physicalDataType = physicalDataType;
+        this.rowDataToAttributeValueConverter =
+                new RowDataToAttributeValueConverter(physicalDataType);
+    }
+
+    @Override
+    public DynamoDbWriteRequest apply(RowData element, SinkWriter.Context context) {
+        switch (element.getRowKind()) {
+            case INSERT:
+            case UPDATE_AFTER:
+                return getPutRequest(element);
+            case UPDATE_BEFORE:
+            case DELETE:
+                return getDeleteRequest(element);
+            default:
+                throw new TableException("Unsupported message kind: " + element.getRowKind());
+        }
+    }
+
+    private DynamoDbWriteRequest getPutRequest(RowData row) {
+        return DynamoDbWriteRequest.builder()
+                .setType(DynamoDbWriteRequestType.PUT)
+                .setItem(rowDataToAttributeValueConverter.convertRowData(row))
+                .build();
+    }
+
+    private DynamoDbWriteRequest getDeleteRequest(RowData row) {
+        return DynamoDbWriteRequest.builder()
+                .setType(DynamoDbWriteRequestType.DELETE)
+                .setItem(rowDataToAttributeValueConverter.convertRowData(row))
+                .build();
+    }

Review Comment:
   Ok, done!



##########
flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java:
##########
@@ -0,0 +1,72 @@
+package org.apache.flink.connector.dynamodb.table;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.types.DataType;
+
+import software.amazon.awssdk.enhanced.dynamodb.TableSchema;
+import software.amazon.awssdk.enhanced.dynamodb.mapper.StaticTableSchema;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.data.RowData.createFieldGetter;
+
+/** Converts from {@link RowData} to {@link AttributeValue}. */
+public class RowDataToAttributeValueConverter implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final DataType physicalDataType;
+    private transient TableSchema<RowData> tableSchema;
+
+    public RowDataToAttributeValueConverter(DataType physicalDataType) {
+        this.physicalDataType = physicalDataType;
+        this.tableSchema = createTableSchema();
+    }
+
+    private StaticTableSchema<RowData> createTableSchema() {
+        List<DataTypes.Field> fields = DataType.getFields(physicalDataType);
+        StaticTableSchema.Builder<RowData> builder = TableSchema.builder(RowData.class);
+        for (int i = 0; i < fields.size(); i++) {
+            DataTypes.Field field = fields.get(i);
+            RowData.FieldGetter fieldGetter =
+                    createFieldGetter(field.getDataType().getLogicalType(), i);
+
+            builder =
+                    addAttribute(
+                            builder, field.getDataType().getConversionClass(), field, fieldGetter);
+        }
+        return builder.build();
+    }
+
+    private <T> StaticTableSchema.Builder<RowData> addAttribute(
+            StaticTableSchema.Builder<RowData> builder,
+            Class<T> convertedClass,
+            DataTypes.Field field,
+            RowData.FieldGetter fieldGetter) {
+
+        return builder.addAttribute(
+                convertedClass,
+                a ->
+                        a.name(field.getName())
+                                .getter(
+                                        rowData ->
+                                                (T)
+                                                        DataStructureConverters.getConverter(
+                                                                        field.getDataType())
+                                                                .toExternal(
+                                                                        fieldGetter.getFieldOrNull(
+                                                                                rowData)))
+                                .setter(((rowData, t) -> {})));
+    }

Review Comment:
   :O Can't think of a nice way to simplify this...! It has 2 nested lambdas, probably why spotless went mad...
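   One possible flattening, as a rough sketch only (`attributeGetter` is a hypothetical helper, not in the PR): pull the getter lambda out into a named method so `addAttribute` nests a single level:

   ```java
   import java.util.function.Function;

   // Sketch: extract the getter lambda into a helper so addAttribute stays flat.
   private <T> StaticTableSchema.Builder<RowData> addAttribute(
           StaticTableSchema.Builder<RowData> builder,
           Class<T> convertedClass,
           DataTypes.Field field,
           RowData.FieldGetter fieldGetter) {
       return builder.addAttribute(
               convertedClass,
               a -> a.name(field.getName())
                       .getter(attributeGetter(field, fieldGetter))
                       .setter((rowData, t) -> {}));
   }

   // Hypothetical helper: looks up the converter for the field's data type and
   // applies it to the value extracted from the row.
   @SuppressWarnings("unchecked")
   private <T> Function<RowData, T> attributeGetter(
           DataTypes.Field field, RowData.FieldGetter fieldGetter) {
       return rowData ->
               (T) DataStructureConverters.getConverter(field.getDataType())
                       .toExternal(fieldGetter.getFieldOrNull(rowData));
   }
   ```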



##########
flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConfiguration.java:
##########
@@ -0,0 +1,24 @@
+package org.apache.flink.connector.dynamodb.table;
+
+import org.apache.flink.configuration.ReadableConfig;
+
+import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.FAIL_ON_ERROR;
+import static org.apache.flink.connector.dynamodb.table.DynamoDbConnectorOptions.TABLE_NAME;
+
+/** DynamoDb specific configuration. */
+public class DynamoDbConfiguration {

Review Comment:
   Done



##########
flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataToAttributeValueConverter.java:
##########
@@ -0,0 +1,72 @@
+package org.apache.flink.connector.dynamodb.table;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.types.DataType;
+
+import software.amazon.awssdk.enhanced.dynamodb.TableSchema;
+import software.amazon.awssdk.enhanced.dynamodb.mapper.StaticTableSchema;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.data.RowData.createFieldGetter;
+
+/** Converts from {@link RowData} to {@link AttributeValue}. */
+public class RowDataToAttributeValueConverter implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final DataType physicalDataType;
+    private transient TableSchema<RowData> tableSchema;
+
+    public RowDataToAttributeValueConverter(DataType physicalDataType) {
+        this.physicalDataType = physicalDataType;
+        this.tableSchema = createTableSchema();
+    }
+
+    private StaticTableSchema<RowData> createTableSchema() {
+        List<DataTypes.Field> fields = DataType.getFields(physicalDataType);
+        StaticTableSchema.Builder<RowData> builder = TableSchema.builder(RowData.class);
+        for (int i = 0; i < fields.size(); i++) {
+            DataTypes.Field field = fields.get(i);
+            RowData.FieldGetter fieldGetter =
+                    createFieldGetter(field.getDataType().getLogicalType(), i);
+
+            builder =
+                    addAttribute(
+                            builder, field.getDataType().getConversionClass(), field, fieldGetter);
+        }
+        return builder.build();
+    }
+
+    private <T> StaticTableSchema.Builder<RowData> addAttribute(
+            StaticTableSchema.Builder<RowData> builder,
+            Class<T> convertedClass,
+            DataTypes.Field field,
+            RowData.FieldGetter fieldGetter) {
+
+        return builder.addAttribute(
+                convertedClass,
+                a ->
+                        a.name(field.getName())
+                                .getter(
+                                        rowData ->
+                                                (T)
+                                                        DataStructureConverters.getConverter(
+                                                                        field.getDataType())
+                                                                .toExternal(
+                                                                        fieldGetter.getFieldOrNull(
+                                                                                rowData)))
+                                .setter(((rowData, t) -> {})));
+    }
+
+    public Map<String, AttributeValue> convertRowData(RowData row) {

Review Comment:
   done



##########
flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbConfiguration.java:
##########
@@ -0,0 +1,24 @@
+package org.apache.flink.connector.dynamodb.table;

Review Comment:
   Done. Thanks for flagging



##########
flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverter.java:
##########
@@ -0,0 +1,50 @@
+package org.apache.flink.connector.dynamodb.table;
+
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequest;
+import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+/** Implementation of an {@link ElementConverter} specific for DynamoDb sink. */
+public class RowDataElementConverter implements ElementConverter<RowData, DynamoDbWriteRequest> {
+
+    private final DataType physicalDataType;
+    private final RowDataToAttributeValueConverter rowDataToAttributeValueConverter;
+
+    public RowDataElementConverter(DataType physicalDataType) {
+        this.physicalDataType = physicalDataType;
+        this.rowDataToAttributeValueConverter =
+                new RowDataToAttributeValueConverter(physicalDataType);
+    }
+
+    @Override
+    public DynamoDbWriteRequest apply(RowData element, SinkWriter.Context context) {
+        switch (element.getRowKind()) {
+            case INSERT:
+            case UPDATE_AFTER:
+                return getPutRequest(element);
+            case UPDATE_BEFORE:

Review Comment:
   Good spot. This was leftover from the initial rough-cut implementation. `UPDATE_BEFORE` doesn't really make sense in the context of the DDB sink, since we have partition keys. Changed this to unsupported. Also specified that we only support `ChangelogMode.upsert()`.
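   A rough sketch of the described change (combining the two touched methods, one on the sink and one on the converter; not the exact committed code):

   ```java
   // On the DynamicTableSink: advertise upsert semantics so the planner never
   // sends UPDATE_BEFORE rows (upsert = INSERT, UPDATE_AFTER, DELETE).
   @Override
   public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
       return ChangelogMode.upsert();
   }

   // On the ElementConverter: reject UPDATE_BEFORE explicitly.
   @Override
   public DynamoDbWriteRequest apply(RowData element, SinkWriter.Context context) {
       switch (element.getRowKind()) {
           case INSERT:
           case UPDATE_AFTER:
               return getPutRequest(element);
           case DELETE:
               return getDeleteRequest(element);
           case UPDATE_BEFORE: // DynamoDB upserts by partition key; nothing to retract
           default:
               throw new TableException(
                       "Unsupported message kind: " + element.getRowKind());
       }
   }
   ```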
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
