dannycranmer commented on a change in pull request #18518:
URL: https://github.com/apache/flink/pull/18518#discussion_r830970295



##########
File path: flink-connectors/flink-connector-dynamodb/pom.xml
##########
@@ -0,0 +1,211 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-connectors</artifactId>
+               <version>1.15-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <artifactId>flink-connector-dynamodb</artifactId>
+       <name>Flink : Connectors : DynamoDB</name>
+       <properties>
+               <aws.sdk.version>2.17.116</aws.sdk.version>
+               <commons-lang3.version>3.11</commons-lang3.version>

Review comment:
       Can you use the managed version from parent instead? 
https://github.com/apache/flink/blob/master/pom.xml#L478
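
   For illustration, a minimal sketch of what that could look like, assuming the parent's `dependencyManagement` pins `commons-lang3` (per the linked pom.xml line), so the `<version>` tag and the local property can both be dropped:

```xml
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <!-- no <version>: inherited from the parent pom's dependencyManagement -->
    <scope>provided</scope>
</dependency>
```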

##########
File path: flink-connectors/flink-connector-dynamodb/pom.xml
##########
@@ -0,0 +1,211 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-connectors</artifactId>
+               <version>1.15-SNAPSHOT</version>

Review comment:
       Please rebase this PR on master, which is now Flink 1.16

##########
File path: flink-connectors/flink-connector-dynamodb/pom.xml
##########
@@ -0,0 +1,211 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-connectors</artifactId>
+               <version>1.15-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <artifactId>flink-connector-dynamodb</artifactId>

Review comment:
       Please rename in line with the other new `aws` connectors > `flink-connector-aws-dynamodb`

##########
File path: flink-connectors/flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/DynamoDbSinkBuilder.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
+import org.apache.flink.streaming.connectors.dynamodb.config.DynamoDbTablesConfig;
+
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Builder to construct {@link DynamoDbSink}.
+ *
+ * <p>The following example shows the minimum setup to create a {@link DynamoDbSink} that writes
+ * records into DynamoDb
+ *
+ * <pre>{@code
+ * private static class DummyElementConverter
+ *         implements ElementConverter<String, DynamoDbWriteRequest> {

Review comment:
       There was some discussion before about exposing the `ElementConverter` to the user. For KDS/KDF we decided to hide the `ElementConverter`, since we do not need to bubble up AWS SDK classes into user code (and doing so causes other issues). Is there a good reason to bubble these up to the user for DynamoDB? I would prefer we hide it if possible; take a look at the KDS/KDF implementation for inspiration.

##########
File path: flink-connectors/flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/DynamoDbSinkBuilder.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
+import org.apache.flink.streaming.connectors.dynamodb.config.DynamoDbTablesConfig;
+
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Builder to construct {@link DynamoDbSink}.
+ *
+ * <p>The following example shows the minimum setup to create a {@link DynamoDbSink} that writes
+ * records into DynamoDb
+ *
+ * <pre>{@code
+ * private static class DummyElementConverter
+ *         implements ElementConverter<String, DynamoDbWriteRequest> {
+ *
+ *     @Override
+ *     public DynamoDbWriteRequest apply(String element, SinkWriter.Context context) {
+ *         final Map<String, AttributeValue> map = new HashMap<>();
+ *         map.put("your-key", AttributeValue.builder().s(element).build());
+ *         return new DynamoDbWriteRequest(
+ *                 "your-table-name",
+ *                 WriteRequest.builder()
+ *                         .putRequest(PutRequest.builder().item(map).build())
+ *                         .build());
+ *      }
+ * }
+ * DynamoDbSink<String> dynamoDbSink = DynamoDbSink.<String>builder()
+ *                                          .setElementConverter(new DummyElementConverter())
+ *                                          .build();
+ * }</pre>
+ *
+ * <p>If the following parameters are not set in this builder, the following defaults will be used:
+ *
+ * <ul>
+ *   <li>{@code maxBatchSize} will be 25
+ *   <li>{@code maxInFlightRequests} will be 50
+ *   <li>{@code maxBufferedRequests} will be 10000
+ *   <li>{@code maxBatchSizeInBytes} will be 16 MB i.e. {@code 16 * 1000 * 1000}
+ *   <li>{@code maxTimeInBufferMS} will be 5000ms
+ *   <li>{@code maxRecordSizeInBytes} will be 400 KB i.e. {@code 400 * 1000}
+ *   <li>{@code failOnError} will be false
+ *   <li>{@code dynamoDbTablesConfig} will be empty, meaning no record deduplication will be
+ *       performed by the sink

Review comment:
       Have you done any performance testing to back up these values, or are you picking sensible defaults?

##########
File path: flink-connectors/flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/DynamoDbSink.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.dynamodb.config.DynamoDbTablesConfig;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A DynamoDB Sink that performs async requests against a destination stream using the buffering

Review comment:
       > against a destination stream
   
   Copy and paste error?

##########
File path: flink-connectors/flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/DynamoDbSinkWriter.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.streaming.connectors.dynamodb.config.DynamoDbTablesConfig;
+import org.apache.flink.streaming.connectors.dynamodb.util.AWSDynamoDbUtil;
+import org.apache.flink.streaming.connectors.dynamodb.util.DynamoDbExceptionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * Sink writer created by {@link DynamoDbSink} to write to DynamoDB. More details on the operation
+ * of this sink writer may be found in the doc for {@link DynamoDbSink}. More details on the
+ * internals of this sink writer may be found in {@link AsyncSinkWriter}.
+ *
+ * <p>The {@link DynamoDbAsyncClient} used here may be configured in the standard way for the AWS
+ * SDK 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables etc.
+ */
+@Internal
+class DynamoDbSinkWriter<InputT> extends AsyncSinkWriter<InputT, DynamoDbWriteRequest> {
+    private static final Logger LOG = LoggerFactory.getLogger(DynamoDbSinkWriter.class);
+
+    /* A counter for the total number of records that have encountered an error during put */
+    private final Counter numRecordsOutErrorsCounter;
+
+    /* The sink writer metric group */
+    private final SinkWriterMetricGroup metrics;
+
+    private final DynamoDbTablesConfig tablesConfig;
+    private final DynamoDbAsyncClient client;
+    private final boolean failOnError;
+
+    public DynamoDbSinkWriter(
+            ElementConverter<InputT, DynamoDbWriteRequest> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes,
+            boolean failOnError,
+            DynamoDbTablesConfig tablesConfig,
+            Properties dynamoDbClientProperties) {
+        super(
+                elementConverter,
+                context,
+                maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBatchSizeInBytes,
+                maxTimeInBufferMS,
+                maxRecordSizeInBytes);
+        this.failOnError = failOnError;
+        this.tablesConfig = tablesConfig;
+        this.metrics = context.metricGroup();
+        this.numRecordsOutErrorsCounter = metrics.getNumRecordsOutErrorsCounter();
+        this.client = AWSDynamoDbUtil.createClient(dynamoDbClientProperties);
+    }
+
+    @Override
+    protected void submitRequestEntries(
+            List<DynamoDbWriteRequest> requestEntries,
+            Consumer<Collection<DynamoDbWriteRequest>> requestResultConsumer) {
+
+        TableRequestsContainer container = new TableRequestsContainer(tablesConfig);
+        requestEntries.forEach(container::put);
+
+        CompletableFuture<BatchWriteItemResponse> future =
+                client.batchWriteItem(
+                        BatchWriteItemRequest.builder()
+                                .requestItems(container.getRequestItems())
+                                .build());
+
+        future.whenComplete(
+                (response, err) -> {
+                    if (err != null) {
+                        handleFullyFailedRequest(err, requestEntries, requestResultConsumer);
+                    } else if (response.unprocessedItems() != null
+                            && !response.unprocessedItems().isEmpty()) {
+                        handlePartiallyUnprocessedRequest(response, requestResultConsumer);
+                    } else {
+                        requestResultConsumer.accept(Collections.emptyList());
+                    }
+                });
+    }
+
+    private void handlePartiallyUnprocessedRequest(
+            BatchWriteItemResponse response,
+            Consumer<Collection<DynamoDbWriteRequest>> requestResult) {
+        List<DynamoDbWriteRequest> unprocessed = new ArrayList<>();
+
+        for (String tableName : response.unprocessedItems().keySet()) {
+            for (WriteRequest request : response.unprocessedItems().get(tableName)) {
+                unprocessed.add(new DynamoDbWriteRequest(tableName, request));
+            }
+        }
+
+        LOG.warn("DynamoDB Sink failed to persist {} entries", unprocessed.size());

Review comment:
       For KDS/KDF we changed the level to `debug` to prevent log spamming during throttling when the destination is under-provisioned. I think we should do the same here, since the metric (`numRecordsOutErrorsCounter`) provides the same info.
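
   i.e. a sketch of the suggested change (incrementing the error counter alongside is an assumption, mirroring the KDS writer):

```java
LOG.debug("DynamoDB Sink failed to persist {} entries", unprocessed.size());
numRecordsOutErrorsCounter.inc(unprocessed.size());
```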

##########
File path: flink-connectors/flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/DynamoDbWriteRequest.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Represents a single DynamoDb {@link WriteRequest}. Contains the name of the DynamoDb table to
+ * write to as well as the {@link WriteRequest}.
+ */
+@PublicEvolving
+public class DynamoDbWriteRequest implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String tableName;
+    private final WriteRequest writeRequest;
+
+    public DynamoDbWriteRequest(String tableName, WriteRequest writeRequest) {
+        this.tableName = tableName;
+        this.writeRequest = writeRequest;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public WriteRequest getWriteRequest() {
+        return writeRequest;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        DynamoDbWriteRequest that = (DynamoDbWriteRequest) o;
+        return Objects.equals(tableName, that.tableName)
+                && Objects.equals(writeRequest, that.writeRequest);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(tableName, writeRequest);
+    }
+}

Review comment:
       What is the need for this? The Flink coding standards say to only implement these when actually necessary in src code: https://flink.apache.org/contributing/code-style-and-quality-java.html#equals--hashcode

##########
File path: flink-connectors/flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/DynamoDbWriterStateSerializer.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Collection;
+
+/** A serializer used to serialize a collection of {@link DynamoDbWriteRequest}. */
+@Internal
+public class DynamoDbWriterStateSerializer
+        implements SimpleVersionedSerializer<Collection<DynamoDbWriteRequest>> {
+
+    @Override
+    public int getVersion() {
+        return 1;
+    }
+
+    @Override
+    public byte[] serialize(Collection<DynamoDbWriteRequest> obj) throws IOException {
+        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                final ObjectOutputStream out = new ObjectOutputStream(baos)) {

Review comment:
       The Flink coding guidelines quite strongly suggest not using Java serialisation: https://flink.apache.org/contributing/code-style-and-quality-java.html#java-serialization
   
   Other sinks have implemented this mechanism manually; can you please follow suit? https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsStateSerializer.java
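
   A minimal sketch of the manual approach (the field layout is hypothetical; the real `WriteRequest` would need its nested attribute values written out field by field):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collection;

// Writes a count, then each request as (tableName, payload bytes).
// getSerializedPayload() is a hypothetical accessor standing in for the
// explicit field-by-field serialization of the WriteRequest.
static byte[] serializeManually(Collection<DynamoDbWriteRequest> requests) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(baos)) {
        out.writeInt(requests.size());
        for (DynamoDbWriteRequest request : requests) {
            out.writeUTF(request.getTableName());
            byte[] payload = request.getSerializedPayload(); // hypothetical
            out.writeInt(payload.length);
            out.write(payload);
        }
    }
    return baos.toByteArray();
}
```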

##########
File path: flink-connectors/flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/util/AWSDynamoDbUtil.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.streaming.connectors.dynamodb.config.AWSDynamoDbConfigConstants;
+
+import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
+import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
+import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
+import software.amazon.awssdk.core.client.config.SdkClientOption;
+import software.amazon.awssdk.core.retry.RetryPolicy;
+import software.amazon.awssdk.core.retry.backoff.BackoffStrategy;
+import software.amazon.awssdk.core.retry.backoff.EqualJitterBackoffStrategy;
+import software.amazon.awssdk.core.retry.backoff.FixedDelayBackoffStrategy;
+import software.amazon.awssdk.core.retry.backoff.FullJitterBackoffStrategy;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClientBuilder;
+
+import java.net.URI;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.Properties;
+
+/** Some utilities specific to Amazon Web Service. */

Review comment:
       I think you meant DynamoDB utils? 

##########
File path: flink-connectors/flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/util/DynamoDbExceptionUtils.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.util;
+
+import org.apache.flink.annotation.Internal;
+
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
+import software.amazon.awssdk.core.exception.ApiCallAttemptTimeoutException;
+import software.amazon.awssdk.core.exception.SdkClientException;
+import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
+import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
+import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+
+/** A collection of utility functions to simplify work with DynamoDB service exceptions. */
+@Internal
+public class DynamoDbExceptionUtils {

Review comment:
       As discussed previously, we have implemented some error handling classes 
in AsyncSink to help with the common things, for example 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-aws-base/src/main/java/org/apache/flink/connector/aws/util/AWSCredentialFatalExceptionClassifiers.java
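
   A sketch of how that could look here, using the `FatalExceptionClassifier` pattern from the KDS sink writer (the `DynamoDbSinkException(String, Throwable)` constructor is assumed):

```java
import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;

import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;

final class DynamoDbFatalExceptionClassifiers {
    // Treat a missing table as fatal instead of retrying indefinitely.
    static final FatalExceptionClassifier RESOURCE_NOT_FOUND_CLASSIFIER =
            FatalExceptionClassifier.withRootCauseOfType(
                    ResourceNotFoundException.class,
                    err -> new DynamoDbSinkException(
                            "Encountered non-recoverable exception: table not found", err));
}
```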

##########
File path: flink-connectors/flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/TableRequestsContainer.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.sink;
+
+import org.apache.flink.streaming.connectors.dynamodb.config.DynamoDbTablesConfig;
+import org.apache.flink.streaming.connectors.dynamodb.sink.key.PrimaryKey;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+
+import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Container to accumulate requests in batches per table. De-duplicates batch request entities as
+ * per PrimaryKey definition. DynamoDB Batch API rejects the whole batch request if the request
+ * contains at least two items with identical hash and range keys (which essentially is two put
+ * operations).
+ */
+class TableRequestsContainer {
+
+    private final DynamoDbTablesConfig tablesConfig;
+    private final LinkedHashMap<String, Map<PrimaryKey, WriteRequest>> container;

Review comment:
       This does not look fixed in this PR.

##########
File path: flink-connectors/flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/DynamoDbSink.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.connector.sink.SinkWriter;
+import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.connectors.dynamodb.config.DynamoDbTablesConfig;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A DynamoDB Sink that performs async requests against a destination stream using the buffering
+ * protocol specified in {@link AsyncSinkBase}.
+ *
+ * <p>The sink internally uses a {@link
+ * software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient} to communicate with the AWS
+ * endpoint.
+ *
+ * <p>The behaviour of the buffering may be specified by providing configuration during the sink
+ * build time.
+ *
+ * <ul>
+ *   <li>{@code maxBatchSize}: the maximum size of a batch of entries that may be written to
+ *       DynamoDb.
+ *   <li>{@code maxInFlightRequests}: the maximum number of in flight requests that may exist, if
+ *       any more in flight requests need to be initiated once the maximum has been reached, then it
+ *       will be blocked until some have completed
+ *   <li>{@code maxBufferedRequests}: the maximum number of elements held in the buffer, requests to
+ *       add elements will be blocked while the number of elements in the buffer is at the maximum
+ *   <li>{@code maxBatchSizeInBytes}: the maximum size of a batch of entries that may be written to
+ *       DynamoDb measured in bytes
+ *   <li>{@code maxTimeInBufferMS}: the maximum amount of time an entry is allowed to live in the
+ *       buffer, if any element reaches this age, the entire buffer will be flushed immediately
+ *   <li>{@code maxRecordSizeInBytes}: the maximum size of a record the sink will accept into the
+ *       buffer, a record of size larger than this will be rejected when passed to the sink
+ *   <li>{@code failOnError}: when an exception is encountered while persisting to DynamoDb, the job
+ *       will fail immediately if failOnError is set
+ *   <li>{@code dynamoDbTablesConfig}: if provided for the table, the DynamoDb sink will attempt to
+ *       deduplicate records with the same primary and/or secondary keys in the same batch request.
+ *       Only the latest record with the same combination of key attributes is preserved in the
+ *       request.
+ * </ul>
+ *
+ * <p>Please see the writer implementation in {@link DynamoDbSinkWriter}
+ *
+ * @param <InputT> Type of the elements handled by this sink
+ */
+@PublicEvolving
+public class DynamoDbSink<InputT> extends AsyncSinkBase<InputT, DynamoDbWriteRequest> {

Review comment:
       Let's keep it as `DynamoDbSink` for consistency with the code base. I do 
not think the underlying implementation of sync vs async is necessarily a 
concern to the user and therefore adding it to the class name does not add much 
value.

##########
File path: flink-connectors/flink-connector-dynamodb/pom.xml
##########
@@ -0,0 +1,211 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-connectors</artifactId>
+               <version>1.15-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <artifactId>flink-connector-dynamodb</artifactId>
+       <name>Flink : Connectors : DynamoDB</name>
+       <properties>
+               <aws.sdk.version>2.17.116</aws.sdk.version>
+               <commons-lang3.version>3.11</commons-lang3.version>
+               <testcontainers.version>1.16.2</testcontainers.version>

Review comment:
       Can you use the managed version from parent instead?
   https://github.com/apache/flink/blob/master/pom.xml#L139

##########
File path: flink-connectors/flink-connector-dynamodb/pom.xml
##########
@@ -0,0 +1,211 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-connectors</artifactId>
+               <version>1.15-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <artifactId>flink-connector-dynamodb</artifactId>
+       <name>Flink : Connectors : DynamoDB</name>
+       <properties>
+               <aws.sdk.version>2.17.116</aws.sdk.version>
+               <commons-lang3.version>3.11</commons-lang3.version>
+               <testcontainers.version>1.16.2</testcontainers.version>
+       </properties>
+
+       <packaging>jar</packaging>
+
+       <dependencies>
+               <!-- Connectors -->
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-base</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-aws-base</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <!-- Amazon AWS SDK v2.x dependencies -->
+               <dependency>
+                       <groupId>software.amazon.awssdk</groupId>
+                       <artifactId>aws-core</artifactId>
+                       <version>${aws.sdk.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>software.amazon.awssdk</groupId>
+                       <artifactId>dynamodb</artifactId>
+                       <version>${aws.sdk.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>software.amazon.awssdk</groupId>
+                       <artifactId>profiles</artifactId>
+                       <version>${aws.sdk.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>software.amazon.awssdk</groupId>
+                       <artifactId>sdk-core</artifactId>
+                       <version>${aws.sdk.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>software.amazon.awssdk</groupId>
+                       <artifactId>auth</artifactId>
+                       <version>${aws.sdk.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>software.amazon.awssdk</groupId>
+                       <artifactId>regions</artifactId>
+                       <version>${aws.sdk.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>software.amazon.awssdk</groupId>
+                       <artifactId>sts</artifactId>
+                       <version>${aws.sdk.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>software.amazon.awssdk</groupId>
+                       <artifactId>http-client-spi</artifactId>
+                       <version>${aws.sdk.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>software.amazon.awssdk</groupId>
+                       <artifactId>netty-nio-client</artifactId>
+                       <version>${aws.sdk.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.commons</groupId>
+                       <artifactId>commons-lang3</artifactId>
+                       <version>${commons-lang3.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <!-- Flink ecosystem -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <!-- Test dependencies -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-aws-base</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+                       <type>test-jar</type>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-runtime</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.testcontainers</groupId>
+                       <artifactId>testcontainers</artifactId>
+                       <version>${testcontainers.version}</version>
+                       <scope>test</scope>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-dependency-plugin</artifactId>
+                       </plugin>
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-jar-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <goals>
+                                                       <goal>test-jar</goal>

Review comment:
       Why are we building a test jar? I cannot see any consumers of this

##########
File path: flink-connectors/flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/config/AWSDynamoDbConfigConstants.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.connectors.dynamodb.util.AWSDynamoDbUtil;
+
+import software.amazon.awssdk.core.retry.backoff.EqualJitterBackoffStrategy;
+import software.amazon.awssdk.core.retry.backoff.FixedDelayBackoffStrategy;
+import software.amazon.awssdk.core.retry.backoff.FullJitterBackoffStrategy;
+
+/** Defaults for {@link AWSDynamoDbUtil}. */
+@PublicEvolving
+public class AWSDynamoDbConfigConstants {
+
+    public static final String BASE_DYNAMODB_USER_AGENT_PREFIX_FORMAT =
+            "Apache Flink %s (%s) DynamoDB Connector";
+
+    /** Identifier for user agent prefix. */
+    public static final String DYNAMODB_CLIENT_USER_AGENT_PREFIX =
+            "aws.dynamodb.client.user-agent-prefix";
+
+    public static final String AWS_DYNAMODB_CLIENT_RETRY_PREFIX = "aws.dynamodb.client.retry";
+
+    /**
+     * Possible configuration values for backoff strategy on retry when writing to DynamoDB.
+     * Internally, a corresponding implementation of {@link
+     * software.amazon.awssdk.core.retry.backoff.BackoffStrategy} will be used.
+     */
+    @PublicEvolving
+    public enum BackoffStrategy {
+
+        /**
+         * Backoff strategy that uses a full jitter strategy for computing the next backoff delay. A
+         * full jitter strategy will always compute a new random delay between 0 and the computed
+         * exponential backoff for each subsequent request. See example: {@link
+         * FullJitterBackoffStrategy}
+         */
+        FULL_JITTER,
+
+        /**
+         * Backoff strategy that uses equal jitter for computing the delay before the next retry. An
+         * equal jitter backoff strategy will first compute an exponential delay based on the
+         * current number of retries, base delay and max delay. The final computed delay before the
+         * next retry will keep half of this computed delay plus a random delay computed as a random
+         * number between 0 and half of the exponential delay plus one. See example: {@link
+         * EqualJitterBackoffStrategy}
+         */
+        EQUAL_JITTER,
+
+        /**
+         * Simple backoff strategy that always uses a fixed delay for the delay before the next
+         * retry attempt. See example: {@link FixedDelayBackoffStrategy}
+         */
+        FIXED_DELAY
+    }
+
+    /**
+     * The maximum number of times that a single request should be retried by the DynamoDB client,
+     * assuming it fails for a retryable error.
+     */
+    public static final String NUMBER_RETRIES =
+            AWS_DYNAMODB_CLIENT_RETRY_PREFIX + ".number-retries";
+
+    /**
+     * Backoff strategy when writing to DynamoDb on retries. Supported values: {@link
+     * BackoffStrategy}.
+     */
+    public static final String BACKOFF_STRATEGY =
+            AWS_DYNAMODB_CLIENT_RETRY_PREFIX + ".backoff-strategy";
+
+    /**
+     * Configure the backoff strategy that should be used for waiting in between retry attempts
+     * after a throttling error is encountered. If the retry is not because of throttling reasons,
+     * BACKOFF_STRATEGY is used. Supported values: {@link BackoffStrategy}.
+     */
+    public static final String THROTTLING_BACKOFF_STRATEGY =
+            AWS_DYNAMODB_CLIENT_RETRY_PREFIX + ".throttling-backoff-strategy";
+
+    public static final String FULL_JITTER_BASE_DELAY_MS =
+            AWS_DYNAMODB_CLIENT_RETRY_PREFIX + ".full-jitter.base-delay-ms";
+    public static final String FULL_JITTER_MAX_BACKOFF_TIME_MS =
+            AWS_DYNAMODB_CLIENT_RETRY_PREFIX + ".full-jitter.max-backoff-time-ms";
+
+    public static final String EQUAL_JITTER_BASE_DELAY_MS =
+            AWS_DYNAMODB_CLIENT_RETRY_PREFIX + ".equal-jitter.base-delay-ms";
+    public static final String EQUAL_JITTER_MAX_BACKOFF_TIME_MS =
+            AWS_DYNAMODB_CLIENT_RETRY_PREFIX + ".equal-jitter.max-backoff-time-ms";
+
+    public static final String FIXED_DELAY_BACKOFF_MS =
+            AWS_DYNAMODB_CLIENT_RETRY_PREFIX + 
".fixed-delay.fixed-backoff-time-ms";

Review comment:
       This all looks pretty standard. Can we pull this up for reuse across other implementations?

##########
File path: flink-connectors/flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/config/AWSDynamoDbConfigConstants.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.config;

Review comment:
       Please root all your classes in a package consistent with the new 
KDS/KDF connector:
   - `org.apache.flink.connector.dynamodb.sink`

##########
File path: flink-connectors/flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/DynamoDbSinkWriter.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.streaming.connectors.dynamodb.config.DynamoDbTablesConfig;
+import org.apache.flink.streaming.connectors.dynamodb.util.AWSDynamoDbUtil;
+import org.apache.flink.streaming.connectors.dynamodb.util.DynamoDbExceptionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * Sink writer created by {@link DynamoDbSink} to write to DynamoDB. More details on the operation
+ * of this sink writer may be found in the doc for {@link DynamoDbSink}. More details on the
+ * internals of this sink writer may be found in {@link AsyncSinkWriter}.
+ *
+ * <p>The {@link DynamoDbAsyncClient} used here may be configured in the standard way for the AWS
+ * SDK 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables etc.
+ */
+@Internal
+class DynamoDbSinkWriter<InputT> extends AsyncSinkWriter<InputT, DynamoDbWriteRequest> {

Review comment:
       Please see https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java; you are missing `close()` to clean up the client and HTTP client.
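
   i.e. a sketch of the missing cleanup, assuming the writer keeps a handle to the `SdkAsyncHttpClient` it was created with (as the KDS writer does):

```java
@Override
public void close() {
    AWSGeneralUtil.closeResources(httpClient, client);
}
```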

##########
File path: flink-connectors/flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/DynamoDbSinkException.java
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.sink;
+
+/** Exception is thrown when DynamoDb sink failed to write data. */
+public class DynamoDbSinkException extends RuntimeException {

Review comment:
       This might need to be `@PublicEvolving`; at least it is here: https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseException.java#L26
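
   i.e. just adding the annotation (and its import) to the existing class:

```java
import org.apache.flink.annotation.PublicEvolving;

/** Exception is thrown when DynamoDb sink failed to write data. */
@PublicEvolving
public class DynamoDbSinkException extends RuntimeException {
    // ... existing constructors unchanged ...
}
```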

##########
File path: flink-connectors/flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/DynamoDbSinkWriter.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import 
org.apache.flink.streaming.connectors.dynamodb.config.DynamoDbTablesConfig;
+import org.apache.flink.streaming.connectors.dynamodb.util.AWSDynamoDbUtil;
+import 
org.apache.flink.streaming.connectors.dynamodb.util.DynamoDbExceptionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * Sink writer created by {@link DynamoDbSink} to write to DynamoDB. More 
details on the operation
+ * of this sink writer may be found in the doc for {@link DynamoDbSink}. More 
details on the
+ * internals of this sink writer may be found in {@link AsyncSinkWriter}.
+ *
+ * <p>The {@link DynamoDbAsyncClient} used here may be configured in the 
standard way for the AWS
+ * SDK 2.x. e.g. the provision of {@code AWS_REGION}, {@code 
AWS_ACCESS_KEY_ID} and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables etc.
+ */
+@Internal
+class DynamoDbSinkWriter<InputT> extends AsyncSinkWriter<InputT, 
DynamoDbWriteRequest> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(DynamoDbSinkWriter.class);
+
+    /* A counter for the total number of records that have encountered an 
error during put */
+    private final Counter numRecordsOutErrorsCounter;
+
+    /* The sink writer metric group */
+    private final SinkWriterMetricGroup metrics;
+
+    private final DynamoDbTablesConfig tablesConfig;
+    private final DynamoDbAsyncClient client;
+    private final boolean failOnError;
+
+    public DynamoDbSinkWriter(
+            ElementConverter<InputT, DynamoDbWriteRequest> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes,
+            boolean failOnError,
+            DynamoDbTablesConfig tablesConfig,
+            Properties dynamoDbClientProperties) {
+        super(
+                elementConverter,
+                context,
+                maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBatchSizeInBytes,
+                maxTimeInBufferMS,
+                maxRecordSizeInBytes);
+        this.failOnError = failOnError;
+        this.tablesConfig = tablesConfig;
+        this.metrics = context.metricGroup();
+        this.numRecordsOutErrorsCounter = 
metrics.getNumRecordsOutErrorsCounter();
+        this.client = AWSDynamoDbUtil.createClient(dynamoDbClientProperties);

Review comment:
       You will also need a reference to the HTTP client in order to invoke 
`close()`.
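   A sketch of the shape this could take (the two-argument `createClient` 
overload is hypothetical; `AWSGeneralUtil.createAsyncHttpClient` is the 
aws-base helper already called inside `AWSDynamoDbUtil.createClient`):
   
   ```java
   // Build the HTTP client up front so close() can release it later.
   private final SdkAsyncHttpClient httpClient;
   
   // In the constructor:
   //     this.httpClient = AWSGeneralUtil.createAsyncHttpClient(dynamoDbClientProperties);
   //     this.client = AWSDynamoDbUtil.createClient(dynamoDbClientProperties, httpClient);
   ```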

##########
File path: 
flink-connectors/flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/DynamoDbSinkBuilder.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
+import 
org.apache.flink.streaming.connectors.dynamodb.config.DynamoDbTablesConfig;
+
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * Builder to construct {@link DynamoDbSink}.
+ *
+ * <p>The following example shows the minimum setup to create a {@link 
DynamoDbSink} that writes
+ * records into DynamoDb
+ *
+ * <pre>{@code
+ * private static class DummyElementConverter
+ *         implements ElementConverter<String, DynamoDbWriteRequest> {
+ *
+ *     @Override
+ *     public DynamoDbWriteRequest apply(String element, SinkWriter.Context 
context) {
+ *         final Map<String, AttributeValue> map = new HashMap<>();
+ *         map.put("your-key", AttributeValue.builder().s(element).build());
+ *         return new DynamoDbWriteRequest(
+ *                 "your-table-name",
+ *                 WriteRequest.builder()
+ *                         .putRequest(PutRequest.builder().item(map).build())
+ *                         .build());
+ *      }
+ * }
+ * DynamoDbSink<String> dynamoDbSink = DynamoDbSink.<String>builder()
+ *                                          .setElementConverter(new DummyElementConverter())
+ *                                          .build();
+ * }</pre>
+ *
+ * <p>If the following parameters are not set in this builder, the following 
defaults will be used:
+ *
+ * <ul>
+ *   <li>{@code maxBatchSize} will be 25
+ *   <li>{@code maxInFlightRequests} will be 50
+ *   <li>{@code maxBufferedRequests} will be 10000
+ *   <li>{@code maxBatchSizeInBytes} will be 16 MB i.e. {@code 16 * 1000 * 
1000}
+ *   <li>{@code maxTimeInBufferMS} will be 5000ms
+ *   <li>{@code maxRecordSizeInBytes} will be 400 KB i.e. {@code 400 * 1000}
+ *   <li>{@code failOnError} will be false
+ *   <li>{@code dynamoDbTablesConfig} will be empty meaning no records 
deduplication will be
+ *       performed by the sink
+ * </ul>
+ *
+ * @param <InputT> type of elements that should be persisted in the destination
+ */
+@PublicEvolving
+public class DynamoDbSinkBuilder<InputT>
+        extends AsyncSinkBaseBuilder<InputT, DynamoDbWriteRequest, 
DynamoDbSinkBuilder<InputT>> {
+
+    private static final int DEFAULT_MAX_BATCH_SIZE = 25;
+    private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50;
+    private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 10000;
+    private static final long DEFAULT_MAX_BATCH_SIZE_IN_B = 16 * 1000 * 1000;
+    private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000;
+    private static final long DEFAULT_MAX_RECORD_SIZE_IN_B = 400 * 1000;
+    private static final boolean DEFAULT_FAIL_ON_ERROR = false;
+
+    private boolean failOnError;
+    private DynamoDbTablesConfig dynamoDbTablesConfig;
+    private Properties dynamodbClientProperties;
+
+    public DynamoDbSinkBuilder<InputT> setDynamoDbProperties(Properties 
properties) {
+        this.dynamodbClientProperties = properties;
+        return this;
+    }
+
+    /**
+     * @param dynamoDbTablesConfig the {@link DynamoDbTablesConfig} if 
provided for the table, the
+     *     DynamoDb sink will attempt to deduplicate records with the same 
primary and/or secondary
+     *     keys in the same batch request. Only the latest record with the 
same combination of key
+     *     attributes is preserved in the request.
+     */

Review comment:
       > the DynamoDb sink will attempt to deduplicate records with the same 
primary and/or secondary keys in the same batch request. Only the latest record 
with the same combination of key attributes is preserved in the request.
   
   This comment feels out of place; how is it related to 
`setDynamoDbTablesConfig`? Is there a config within `DynamoDbTablesConfig` to 
control this behaviour?

##########
File path: 
flink-connectors/flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/DynamoDbSinkWriter.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import 
org.apache.flink.streaming.connectors.dynamodb.config.DynamoDbTablesConfig;
+import org.apache.flink.streaming.connectors.dynamodb.util.AWSDynamoDbUtil;
+import 
org.apache.flink.streaming.connectors.dynamodb.util.DynamoDbExceptionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * Sink writer created by {@link DynamoDbSink} to write to DynamoDB. More 
details on the operation
+ * of this sink writer may be found in the doc for {@link DynamoDbSink}. More 
details on the
+ * internals of this sink writer may be found in {@link AsyncSinkWriter}.
+ *
+ * <p>The {@link DynamoDbAsyncClient} used here may be configured in the 
standard way for the AWS
+ * SDK 2.x. e.g. the provision of {@code AWS_REGION}, {@code 
AWS_ACCESS_KEY_ID} and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables etc.
+ */
+@Internal
+class DynamoDbSinkWriter<InputT> extends AsyncSinkWriter<InputT, 
DynamoDbWriteRequest> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(DynamoDbSinkWriter.class);
+
+    /* A counter for the total number of records that have encountered an 
error during put */
+    private final Counter numRecordsOutErrorsCounter;
+
+    /* The sink writer metric group */
+    private final SinkWriterMetricGroup metrics;
+
+    private final DynamoDbTablesConfig tablesConfig;
+    private final DynamoDbAsyncClient client;
+    private final boolean failOnError;
+
+    public DynamoDbSinkWriter(
+            ElementConverter<InputT, DynamoDbWriteRequest> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes,
+            boolean failOnError,
+            DynamoDbTablesConfig tablesConfig,
+            Properties dynamoDbClientProperties) {
+        super(
+                elementConverter,
+                context,
+                maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBatchSizeInBytes,
+                maxTimeInBufferMS,
+                maxRecordSizeInBytes);
+        this.failOnError = failOnError;
+        this.tablesConfig = tablesConfig;
+        this.metrics = context.metricGroup();
+        this.numRecordsOutErrorsCounter = 
metrics.getNumRecordsOutErrorsCounter();
+        this.client = AWSDynamoDbUtil.createClient(dynamoDbClientProperties);
+    }
+
+    @Override
+    protected void submitRequestEntries(
+            List<DynamoDbWriteRequest> requestEntries,
+            Consumer<Collection<DynamoDbWriteRequest>> requestResultConsumer) {
+
+        TableRequestsContainer container = new 
TableRequestsContainer(tablesConfig);
+        requestEntries.forEach(container::put);
+
+        CompletableFuture<BatchWriteItemResponse> future =
+                client.batchWriteItem(
+                        BatchWriteItemRequest.builder()
+                                .requestItems(container.getRequestItems())
+                                .build());
+
+        future.whenComplete(
+                (response, err) -> {
+                    if (err != null) {
+                        handleFullyFailedRequest(err, requestEntries, 
requestResultConsumer);
+                    } else if (response.unprocessedItems() != null
+                            && !response.unprocessedItems().isEmpty()) {
+                        handlePartiallyUnprocessedRequest(response, 
requestResultConsumer);
+                    } else {
+                        requestResultConsumer.accept(Collections.emptyList());
+                    }
+                });
+    }
+
+    private void handlePartiallyUnprocessedRequest(
+            BatchWriteItemResponse response,
+            Consumer<Collection<DynamoDbWriteRequest>> requestResult) {
+        List<DynamoDbWriteRequest> unprocessed = new ArrayList<>();
+
+        for (String tableName : response.unprocessedItems().keySet()) {
+            for (WriteRequest request : 
response.unprocessedItems().get(tableName)) {
+                unprocessed.add(new DynamoDbWriteRequest(tableName, request));
+            }
+        }
+
+        LOG.warn("DynamoDB Sink failed to persist {} entries", 
unprocessed.size());
+        numRecordsOutErrorsCounter.inc(unprocessed.size());
+
+        requestResult.accept(unprocessed);
+    }
+
+    private void handleFullyFailedRequest(
+            Throwable err,
+            List<DynamoDbWriteRequest> requestEntries,
+            Consumer<Collection<DynamoDbWriteRequest>> requestResult) {
+        LOG.warn("DynamoDB Sink failed to persist {} entries", 
requestEntries.size(), err);
+        numRecordsOutErrorsCounter.inc(requestEntries.size());
+
+        if (DynamoDbExceptionUtils.isNotRetryableException(err.getCause())) {

Review comment:
       We added an exception classifier chaining mechanism for KDS/KDF that you 
can reuse here to appropriately handle things like `InterruptedException`, etc.: 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java#L246
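   As an illustration only, a DynamoDB-specific chain might look roughly like 
the Kinesis one (the helper classifiers and the `ResourceNotFoundException` 
mapping below are assumptions, not existing DynamoDB connector code):
   
   ```java
   // FatalExceptionClassifier lives in org.apache.flink.connector.base.sink.throwable;
   // the chain stops at the first classifier that recognizes the error as fatal.
   private static final FatalExceptionClassifier DYNAMODB_FATAL_EXCEPTION_CLASSIFIER =
           FatalExceptionClassifier.createChain(
                   getInterruptedExceptionClassifier(),
                   getInvalidCredentialsExceptionClassifier(),
                   FatalExceptionClassifier.withRootCauseOfType(
                           ResourceNotFoundException.class,
                           err -> new DynamoDbSinkException("DynamoDB table not found.", err)));
   
   // handleFullyFailedRequest would then consult the chain before retrying,
   // forwarding fatal errors to getFatalExceptionCons() from AsyncSinkWriter
   // (see the Kinesis writer's isRetryable for the exact contract of isFatal()).
   ```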

##########
File path: 
flink-connectors/flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/DynamoDbSinkWriter.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink.Sink;

Review comment:
       This needs to be migrated to Sink V2: 
`org.apache.flink.api.connector.sink2.Sink`
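   The change is essentially an import swap (a sketch, assuming 
`Sink.InitContext` keeps the same shape in the V2 API):
   
   ```java
   // Before (V1, deprecated):
   // import org.apache.flink.api.connector.sink.Sink;
   
   // After (Sink V2):
   import org.apache.flink.api.connector.sink2.Sink;
   
   // The constructor parameter stays as: Sink.InitContext context
   ```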

##########
File path: 
flink-connectors/flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/DynamoDbSinkWriter.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import 
org.apache.flink.streaming.connectors.dynamodb.config.DynamoDbTablesConfig;
+import org.apache.flink.streaming.connectors.dynamodb.util.AWSDynamoDbUtil;
+import 
org.apache.flink.streaming.connectors.dynamodb.util.DynamoDbExceptionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * Sink writer created by {@link DynamoDbSink} to write to DynamoDB. More 
details on the operation
+ * of this sink writer may be found in the doc for {@link DynamoDbSink}. More 
details on the
+ * internals of this sink writer may be found in {@link AsyncSinkWriter}.
+ *
+ * <p>The {@link DynamoDbAsyncClient} used here may be configured in the 
standard way for the AWS
+ * SDK 2.x. e.g. the provision of {@code AWS_REGION}, {@code 
AWS_ACCESS_KEY_ID} and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables etc.
+ */
+@Internal
+class DynamoDbSinkWriter<InputT> extends AsyncSinkWriter<InputT, 
DynamoDbWriteRequest> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(DynamoDbSinkWriter.class);
+
+    /* A counter for the total number of records that have encountered an 
error during put */
+    private final Counter numRecordsOutErrorsCounter;
+
+    /* The sink writer metric group */
+    private final SinkWriterMetricGroup metrics;
+
+    private final DynamoDbTablesConfig tablesConfig;
+    private final DynamoDbAsyncClient client;
+    private final boolean failOnError;
+
+    public DynamoDbSinkWriter(
+            ElementConverter<InputT, DynamoDbWriteRequest> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes,
+            boolean failOnError,
+            DynamoDbTablesConfig tablesConfig,
+            Properties dynamoDbClientProperties) {
+        super(
+                elementConverter,
+                context,
+                maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBatchSizeInBytes,
+                maxTimeInBufferMS,
+                maxRecordSizeInBytes);
+        this.failOnError = failOnError;
+        this.tablesConfig = tablesConfig;
+        this.metrics = context.metricGroup();
+        this.numRecordsOutErrorsCounter = 
metrics.getNumRecordsOutErrorsCounter();
+        this.client = AWSDynamoDbUtil.createClient(dynamoDbClientProperties);
+    }
+
+    @Override
+    protected void submitRequestEntries(
+            List<DynamoDbWriteRequest> requestEntries,
+            Consumer<Collection<DynamoDbWriteRequest>> requestResultConsumer) {
+
+        TableRequestsContainer container = new 
TableRequestsContainer(tablesConfig);
+        requestEntries.forEach(container::put);
+
+        CompletableFuture<BatchWriteItemResponse> future =
+                client.batchWriteItem(
+                        BatchWriteItemRequest.builder()
+                                .requestItems(container.getRequestItems())
+                                .build());
+
+        future.whenComplete(
+                (response, err) -> {
+                    if (err != null) {
+                        handleFullyFailedRequest(err, requestEntries, 
requestResultConsumer);
+                    } else if (response.unprocessedItems() != null
+                            && !response.unprocessedItems().isEmpty()) {
+                        handlePartiallyUnprocessedRequest(response, 
requestResultConsumer);
+                    } else {
+                        requestResultConsumer.accept(Collections.emptyList());
+                    }
+                });
+    }
+
+    private void handlePartiallyUnprocessedRequest(
+            BatchWriteItemResponse response,
+            Consumer<Collection<DynamoDbWriteRequest>> requestResult) {
+        List<DynamoDbWriteRequest> unprocessed = new ArrayList<>();
+
+        for (String tableName : response.unprocessedItems().keySet()) {
+            for (WriteRequest request : 
response.unprocessedItems().get(tableName)) {
+                unprocessed.add(new DynamoDbWriteRequest(tableName, request));
+            }
+        }
+
+        LOG.warn("DynamoDB Sink failed to persist {} entries", 
unprocessed.size());
+        numRecordsOutErrorsCounter.inc(unprocessed.size());
+
+        requestResult.accept(unprocessed);
+    }
+
+    private void handleFullyFailedRequest(
+            Throwable err,
+            List<DynamoDbWriteRequest> requestEntries,
+            Consumer<Collection<DynamoDbWriteRequest>> requestResult) {
+        LOG.warn("DynamoDB Sink failed to persist {} entries", 
requestEntries.size(), err);

Review comment:
       Same as above

##########
File path: 
flink-connectors/flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/DynamoDbSinkException.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.sink;
+
+import org.apache.flink.annotation.Internal;
+
+/** Exception thrown when the DynamoDb sink fails to write data. */
+@Internal
+public class DynamoDbSinkException extends RuntimeException {
+    public DynamoDbSinkException(String message) {
+        super(message);
+    }
+
+    public DynamoDbSinkException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * When the flag {@code failOnError} is set in {@link DynamoDbSinkWriter}, 
this exception is
+     * raised as soon as any exception occurs or the sink returns unprocessed items.
+     */
+    @Internal
+    static class DynamoDbSinkFailFastException extends DynamoDbSinkException {
+
+        public DynamoDbSinkFailFastException() {
+            super(
+                    "Encountered an exception while persisting records, not 
retrying due to {failOnError} being set.");

Review comment:
       nit: "Encountered an exception while persisting records, not retrying 
due to {failOnError} being set." is duplicated; can you dedupe?
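   One way to dedupe, sketched (the constant name is illustrative):
   
   ```java
   // Hoist the duplicated message into a single constant and reuse it from
   // every constructor of DynamoDbSinkFailFastException.
   private static final String FAIL_ON_ERROR_MESSAGE =
           "Encountered an exception while persisting records, not retrying due to {failOnError} being set.";
   
   public DynamoDbSinkFailFastException() {
       super(FAIL_ON_ERROR_MESSAGE);
   }
   
   public DynamoDbSinkFailFastException(final Throwable cause) {
       super(FAIL_ON_ERROR_MESSAGE, cause);
   }
   ```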

##########
File path: 
flink-connectors/flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/InvalidRequestException.java
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.sink;
+
+import org.apache.flink.annotation.Internal;
+
+/** Exception thrown if a DynamoDB request is invalid. */
+@Internal
+public class InvalidRequestException extends RuntimeException {

Review comment:
       Why is this not a subclass of `DynamoDbSinkException`?
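   For illustration, the suggested hierarchy would be a small change:
   
   ```java
   /** Exception thrown if a DynamoDB request is invalid. */
   @Internal
   public class InvalidRequestException extends DynamoDbSinkException {
       public InvalidRequestException(String message) {
           super(message);
       }
   }
   ```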

##########
File path: 
flink-connectors/flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/TableRequestsContainer.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.sink;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.streaming.connectors.dynamodb.config.DynamoDbTablesConfig;
+import org.apache.flink.streaming.connectors.dynamodb.sink.key.PrimaryKey;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+
+import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Container to accumulate requests in batches per table. De-duplicates batch request
+ * entries as per the PrimaryKey definition. The DynamoDB batch API rejects the whole
+ * batch request if it contains at least two items with identical hash and range keys
+ * (which would essentially be two put operations on the same item).
+ */
+@Internal
+class TableRequestsContainer {

Review comment:
       nit: This is really a dedupe class, right? The name confused me until I 
looked up how it is used.

##########
File path: 
flink-connectors/flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/DynamoDbWriterStateSerializer.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Collection;
+
+/** A serializer used to serialize a collection of {@link 
DynamoDbWriteRequest}. */
+@Internal
+public class DynamoDbWriterStateSerializer

Review comment:
       There is an `AsyncSink` base class that can help here: 
`AsyncSinkWriterStateSerializer`
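   A sketch of what extending the base class might look like (the stream-based 
hooks follow the Kinesis implementation; `writeTo`/`readFrom` on 
`DynamoDbWriteRequest` are hypothetical helpers):
   
   ```java
   @Internal
   public class DynamoDbWriterStateSerializer
           extends AsyncSinkWriterStateSerializer<DynamoDbWriteRequest> {
   
       @Override
       protected void serializeRequestToStream(DynamoDbWriteRequest request, DataOutputStream out)
               throws IOException {
           request.writeTo(out); // hypothetical helper on the request
       }
   
       @Override
       protected DynamoDbWriteRequest deserializeRequestFromStream(long requestSize, DataInputStream in)
               throws IOException {
           return DynamoDbWriteRequest.readFrom(in); // hypothetical helper
       }
   
       @Override
       public int getVersion() {
           return 1;
       }
   }
   ```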

##########
File path: 
flink-connectors/flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/sink/DynamoDbSinkWriter.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import 
org.apache.flink.streaming.connectors.dynamodb.config.DynamoDbTablesConfig;
+import org.apache.flink.streaming.connectors.dynamodb.util.AWSDynamoDbUtil;
+import 
org.apache.flink.streaming.connectors.dynamodb.util.DynamoDbExceptionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
+import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * Sink writer created by {@link DynamoDbSink} to write to DynamoDB. More 
details on the operation
+ * of this sink writer may be found in the doc for {@link DynamoDbSink}. More 
details on the
+ * internals of this sink writer may be found in {@link AsyncSinkWriter}.
+ *
+ * <p>The {@link DynamoDbAsyncClient} used here may be configured in the 
standard way for the AWS
+ * SDK 2.x. e.g. the provision of {@code AWS_REGION}, {@code 
AWS_ACCESS_KEY_ID} and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables etc.
+ */
+@Internal
+class DynamoDbSinkWriter<InputT> extends AsyncSinkWriter<InputT, 
DynamoDbWriteRequest> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(DynamoDbSinkWriter.class);
+
+    /* A counter for the total number of records that have encountered an 
error during put */
+    private final Counter numRecordsOutErrorsCounter;
+
+    /* The sink writer metric group */
+    private final SinkWriterMetricGroup metrics;
+
+    private final DynamoDbTablesConfig tablesConfig;
+    private final DynamoDbAsyncClient client;
+    private final boolean failOnError;
+
+    public DynamoDbSinkWriter(
+            ElementConverter<InputT, DynamoDbWriteRequest> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long maxBatchSizeInBytes,
+            long maxTimeInBufferMS,
+            long maxRecordSizeInBytes,
+            boolean failOnError,
+            DynamoDbTablesConfig tablesConfig,
+            Properties dynamoDbClientProperties) {
+        super(
+                elementConverter,
+                context,
+                maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                maxBatchSizeInBytes,
+                maxTimeInBufferMS,
+                maxRecordSizeInBytes);
+        this.failOnError = failOnError;
+        this.tablesConfig = tablesConfig;
+        this.metrics = context.metricGroup();
+        this.numRecordsOutErrorsCounter = 
metrics.getNumRecordsOutErrorsCounter();
+        this.client = AWSDynamoDbUtil.createClient(dynamoDbClientProperties);
+    }
+
+    @Override
+    protected void submitRequestEntries(
+            List<DynamoDbWriteRequest> requestEntries,
+            Consumer<Collection<DynamoDbWriteRequest>> requestResultConsumer) {
+
+        TableRequestsContainer container = new 
TableRequestsContainer(tablesConfig);
+        requestEntries.forEach(container::put);

Review comment:
       nit: Since you are deduplicating here, the actual batch size may be 
less than the configured batch size. I cannot see a better way to do it, though.

##########
File path: 
flink-connectors/flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/util/AWSDynamoDbUtil.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import 
org.apache.flink.streaming.connectors.dynamodb.config.AWSDynamoDbConfigConstants;
+
+import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
+import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
+import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
+import software.amazon.awssdk.core.client.config.SdkClientOption;
+import software.amazon.awssdk.core.retry.RetryPolicy;
+import software.amazon.awssdk.core.retry.backoff.BackoffStrategy;
+import software.amazon.awssdk.core.retry.backoff.EqualJitterBackoffStrategy;
+import software.amazon.awssdk.core.retry.backoff.FixedDelayBackoffStrategy;
+import software.amazon.awssdk.core.retry.backoff.FullJitterBackoffStrategy;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClientBuilder;
+
+import java.net.URI;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.Properties;
+
+/** Some utilities specific to Amazon Web Services. */
+@Internal
+public class AWSDynamoDbUtil extends AWSGeneralUtil {
+
+    public static DynamoDbAsyncClient createClient(final Properties 
properties) {
+        DynamoDbAsyncClientBuilder clientBuilder = 
DynamoDbAsyncClient.builder();
+
+        if (properties.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
+            final URI endpointOverride =
+                    
URI.create(properties.getProperty(AWSConfigConstants.AWS_ENDPOINT));
+            clientBuilder.endpointOverride(endpointOverride);
+        }
+
+        return clientBuilder
+                .httpClient(createAsyncHttpClient(properties))
+                .region(getRegion(properties))
+                .credentialsProvider(getCredentialsProvider(properties))
+                .overrideConfiguration(getOverrideConfiguration(properties))
+                .build();
+    }
+
+    @VisibleForTesting
+    static ClientOverrideConfiguration getOverrideConfiguration(Properties 
properties) {
+        ClientOverrideConfiguration.Builder builder = 
ClientOverrideConfiguration.builder();
+        SdkClientConfiguration config = 
SdkClientConfiguration.builder().build();
+
+        builder.putAdvancedOption(
+                        SdkAdvancedClientOption.USER_AGENT_PREFIX,
+                        getFlinkUserAgentPrefix(properties))
+                .putAdvancedOption(
+                        SdkAdvancedClientOption.USER_AGENT_SUFFIX,
+                        
config.option(SdkAdvancedClientOption.USER_AGENT_SUFFIX));
+
+        
Optional.ofNullable(config.option(SdkClientOption.API_CALL_ATTEMPT_TIMEOUT))
+                .ifPresent(builder::apiCallAttemptTimeout);
+
+        Optional.ofNullable(config.option(SdkClientOption.API_CALL_TIMEOUT))
+                .ifPresent(builder::apiCallTimeout);
+
+        builder.retryPolicy(getRetryPolicy(properties));
+        return builder.build();
+    }
+
+    @VisibleForTesting
+    static String getFlinkUserAgentPrefix(Properties properties) {
+        if 
(properties.containsKey(AWSDynamoDbConfigConstants.DYNAMODB_CLIENT_USER_AGENT_PREFIX))
 {
+            return properties.getProperty(
+                    
AWSDynamoDbConfigConstants.DYNAMODB_CLIENT_USER_AGENT_PREFIX);
+        }
+        return String.format(
+                
AWSDynamoDbConfigConstants.BASE_DYNAMODB_USER_AGENT_PREFIX_FORMAT,
+                EnvironmentInformation.getVersion(),
+                EnvironmentInformation.getRevisionInformation().commitId);
+    }
+
+    @VisibleForTesting
+    static RetryPolicy getRetryPolicy(Properties properties) {
+        if (hasRetryConfiguration(properties)) {
+            RetryPolicy.Builder builder = RetryPolicy.builder();
+
+            if 
(properties.containsKey(AWSDynamoDbConfigConstants.NUMBER_RETRIES)) {
+                builder.numRetries(
+                        Integer.parseInt(
+                                
properties.getProperty(AWSDynamoDbConfigConstants.NUMBER_RETRIES)));
+            }
+
+            if 
(properties.containsKey(AWSDynamoDbConfigConstants.BACKOFF_STRATEGY)) {
+                builder.backoffStrategy(
+                        getBackoffStrategy(
+                                properties, 
AWSDynamoDbConfigConstants.BACKOFF_STRATEGY));
+            }
+
+            if 
(properties.containsKey(AWSDynamoDbConfigConstants.THROTTLING_BACKOFF_STRATEGY))
 {
+                builder.throttlingBackoffStrategy(
+                        getBackoffStrategy(
+                                properties,
+                                
AWSDynamoDbConfigConstants.THROTTLING_BACKOFF_STRATEGY));
+            }
+            return builder.build();
+        }
+        return RetryPolicy.defaultRetryPolicy();
+    }
+
+    @VisibleForTesting
+    static boolean hasRetryConfiguration(Properties properties) {
+        return properties.stringPropertyNames().stream()
+                .anyMatch(
+                        name ->
+                                name.startsWith(
+                                        AWSDynamoDbConfigConstants
+                                                
.AWS_DYNAMODB_CLIENT_RETRY_PREFIX));
+    }
+
+    @VisibleForTesting
+    static BackoffStrategy getBackoffStrategy(Properties properties, String 
strategy) {
+        AWSDynamoDbConfigConstants.BackoffStrategy backoffStrategy =
+                AWSDynamoDbConfigConstants.BackoffStrategy.valueOf(
+                        properties.getProperty(strategy));
+
+        switch (backoffStrategy) {
+            case FULL_JITTER:
+                return FullJitterBackoffStrategy.builder()
+                        .baseDelay(
+                                getDuration(
+                                        properties.getProperty(
+                                                AWSDynamoDbConfigConstants
+                                                        
.FULL_JITTER_BASE_DELAY_MS)))
+                        .maxBackoffTime(
+                                getDuration(
+                                        properties.getProperty(
+                                                AWSDynamoDbConfigConstants
+                                                        
.FULL_JITTER_MAX_BACKOFF_TIME_MS)))
+                        .build();
+            case EQUAL_JITTER:
+                return EqualJitterBackoffStrategy.builder()
+                        .baseDelay(
+                                getDuration(
+                                        properties.getProperty(
+                                                AWSDynamoDbConfigConstants
+                                                        
.EQUAL_JITTER_BASE_DELAY_MS)))
+                        .maxBackoffTime(
+                                getDuration(
+                                        properties.getProperty(
+                                                AWSDynamoDbConfigConstants
+                                                        
.EQUAL_JITTER_MAX_BACKOFF_TIME_MS)))
+                        .build();
+            case FIXED_DELAY:
+                return FixedDelayBackoffStrategy.create(
+                        getDuration(
+                                properties.getProperty(
+                                        
AWSDynamoDbConfigConstants.FIXED_DELAY_BACKOFF_MS)));
+            default:
+                return BackoffStrategy.defaultStrategy();
+        }
+    }

Review comment:
        I believe these configs can be made generic and pulled into 
`AWSGeneralUtil`?
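   A sketch of the shape a generic helper could take (the key-parameterized 
signature is an assumption; `getBackoffStrategy` is the method from this class):
   
   ```java
   // Illustrative: a service-agnostic retry-policy helper for AWSGeneralUtil,
   // parameterized on property keys instead of DynamoDB-specific constants.
   public static RetryPolicy getRetryPolicy(
           final Properties properties,
           final String numRetriesKey,
           final String backoffStrategyKey) {
       RetryPolicy.Builder builder = RetryPolicy.builder();
       if (properties.containsKey(numRetriesKey)) {
           builder.numRetries(Integer.parseInt(properties.getProperty(numRetriesKey)));
       }
       if (properties.containsKey(backoffStrategyKey)) {
           builder.backoffStrategy(getBackoffStrategy(properties, backoffStrategyKey));
       }
       return builder.build();
   }
   ```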

##########
File path: 
flink-connectors/flink-connector-dynamodb/src/main/java/org/apache/flink/streaming/connectors/dynamodb/util/AWSDynamoDbUtil.java
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.dynamodb.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import 
org.apache.flink.streaming.connectors.dynamodb.config.AWSDynamoDbConfigConstants;
+
+import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
+import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
+import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
+import software.amazon.awssdk.core.client.config.SdkClientOption;
+import software.amazon.awssdk.core.retry.RetryPolicy;
+import software.amazon.awssdk.core.retry.backoff.BackoffStrategy;
+import software.amazon.awssdk.core.retry.backoff.EqualJitterBackoffStrategy;
+import software.amazon.awssdk.core.retry.backoff.FixedDelayBackoffStrategy;
+import software.amazon.awssdk.core.retry.backoff.FullJitterBackoffStrategy;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
+import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClientBuilder;
+
+import java.net.URI;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.Properties;
+
+/** Some utilities specific to Amazon Web Services. */
+@Internal
+public class AWSDynamoDbUtil extends AWSGeneralUtil {
+
+    public static DynamoDbAsyncClient createClient(final Properties 
properties) {
+        DynamoDbAsyncClientBuilder clientBuilder = 
DynamoDbAsyncClient.builder();
+
+        if (properties.containsKey(AWSConfigConstants.AWS_ENDPOINT)) {
+            final URI endpointOverride =
+                    
URI.create(properties.getProperty(AWSConfigConstants.AWS_ENDPOINT));
+            clientBuilder.endpointOverride(endpointOverride);
+        }
+
+        return clientBuilder
+                .httpClient(createAsyncHttpClient(properties))
+                .region(getRegion(properties))
+                .credentialsProvider(getCredentialsProvider(properties))
+                .overrideConfiguration(getOverrideConfiguration(properties))
+                .build();
+    }
+
+    @VisibleForTesting
+    static ClientOverrideConfiguration getOverrideConfiguration(Properties 
properties) {
+        ClientOverrideConfiguration.Builder builder = 
ClientOverrideConfiguration.builder();
+        SdkClientConfiguration config = 
SdkClientConfiguration.builder().build();
+
+        builder.putAdvancedOption(
+                        SdkAdvancedClientOption.USER_AGENT_PREFIX,
+                        getFlinkUserAgentPrefix(properties))
+                .putAdvancedOption(
+                        SdkAdvancedClientOption.USER_AGENT_SUFFIX,
+                        
config.option(SdkAdvancedClientOption.USER_AGENT_SUFFIX));
+
+        
Optional.ofNullable(config.option(SdkClientOption.API_CALL_ATTEMPT_TIMEOUT))
+                .ifPresent(builder::apiCallAttemptTimeout);
+
+        Optional.ofNullable(config.option(SdkClientOption.API_CALL_TIMEOUT))
+                .ifPresent(builder::apiCallTimeout);
+
+        builder.retryPolicy(getRetryPolicy(properties));
+        return builder.build();
+    }
+
+    @VisibleForTesting
+    static String getFlinkUserAgentPrefix(Properties properties) {
+        if 
(properties.containsKey(AWSDynamoDbConfigConstants.DYNAMODB_CLIENT_USER_AGENT_PREFIX))
 {
+            return properties.getProperty(
+                    
AWSDynamoDbConfigConstants.DYNAMODB_CLIENT_USER_AGENT_PREFIX);
+        }
+        return String.format(
+                
AWSDynamoDbConfigConstants.BASE_DYNAMODB_USER_AGENT_PREFIX_FORMAT,
+                EnvironmentInformation.getVersion(),
+                EnvironmentInformation.getRevisionInformation().commitId);
+    }
+
+    @VisibleForTesting
+    static RetryPolicy getRetryPolicy(Properties properties) {
+        if (hasRetryConfiguration(properties)) {
+            RetryPolicy.Builder builder = RetryPolicy.builder();
+
+            if 
(properties.containsKey(AWSDynamoDbConfigConstants.NUMBER_RETRIES)) {
+                builder.numRetries(
+                        Integer.parseInt(
+                                
properties.getProperty(AWSDynamoDbConfigConstants.NUMBER_RETRIES)));
+            }
+
+            if 
(properties.containsKey(AWSDynamoDbConfigConstants.BACKOFF_STRATEGY)) {
+                builder.backoffStrategy(
+                        getBackoffStrategy(
+                                properties, 
AWSDynamoDbConfigConstants.BACKOFF_STRATEGY));
+            }
+
+            if 
(properties.containsKey(AWSDynamoDbConfigConstants.THROTTLING_BACKOFF_STRATEGY))
 {
+                builder.throttlingBackoffStrategy(
+                        getBackoffStrategy(
+                                properties,
+                                
AWSDynamoDbConfigConstants.THROTTLING_BACKOFF_STRATEGY));
+            }
+            return builder.build();
+        }
+        return RetryPolicy.defaultRetryPolicy();
+    }

Review comment:
       I believe these configs can be made generic and pulled into 
`AWSGeneralUtil`?



