This is an automated email from the ASF dual-hosted git repository.

hong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git


The following commit(s) were added to refs/heads/main by this push:
     new d22078f  [FLINK-36770]  make AWS sink writers use ResultHandler, move flink version to 1.20
d22078f is described below

commit d22078fc784ef3a13c2cf0ca1cf7777f7fa6a5ac
Author: Ahmed Hamdy <[email protected]>
AuthorDate: Fri Dec 20 12:22:47 2024 +0000

    [FLINK-36770]  make AWS sink writers use ResultHandler, move flink version to 1.20
---
 .github/workflows/nightly.yml                      |   4 +-
 .github/workflows/push_pr.yml                      |   4 +-
 .../firehose/sink/KinesisFirehoseSinkWriter.java   |  30 +++---
 .../kinesis/sink/KinesisStreamsSinkWriter.java     |  34 +++----
 .../dynamodb/sink/DynamoDbSinkWriter.java          |  30 +++---
 .../dynamodb/sink/DynamoDbSinkWriterTest.java      |  99 +++++++++++++-------
 .../flink/connector/sqs/sink/SqsSinkWriter.java    |  42 ++++-----
 .../connector/sqs/sink/SqsSinkWriterTest.java      | 101 +++++++++++++--------
 pom.xml                                            |   2 +-
 9 files changed, 204 insertions(+), 142 deletions(-)

diff --git a/.github/workflows/nightly.yml b/.github/workflows/nightly.yml
index 1eb0f2e..e167e2e 100644
--- a/.github/workflows/nightly.yml
+++ b/.github/workflows/nightly.yml
@@ -25,7 +25,7 @@ jobs:
     if: github.repository_owner == 'apache'
     strategy:
       matrix:
-        flink: [1.19-SNAPSHOT, 1.20-SNAPSHOT]
+        flink: [1.20-SNAPSHOT]
         java: [ '8, 11, 17']
     uses: ./.github/workflows/common.yml
     with:
@@ -38,7 +38,7 @@ jobs:
   python_test:
     strategy:
       matrix:
-        flink: [1.19-SNAPSHOT, 1.20-SNAPSHOT]
+        flink: [1.20-SNAPSHOT]
     uses: 
apache/flink-connector-shared-utils/.github/workflows/python_ci.yml@ci_utils
     with:
       flink_version: ${{ matrix.flink }}
diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml
index b47e29f..759d3ae 100644
--- a/.github/workflows/push_pr.yml
+++ b/.github/workflows/push_pr.yml
@@ -26,7 +26,7 @@ jobs:
     uses: ./.github/workflows/common.yml
     strategy:
       matrix:
-        flink: [1.19.1, 1.20.0]
+        flink: [1.20.0]
         java: [ '8, 11, 17']
     with:
       flink_version: ${{ matrix.flink }}
@@ -38,7 +38,7 @@ jobs:
   python_test:
     strategy:
       matrix:
-        flink: [1.19.0, 1.20.0]
+        flink: [1.20.0]
     uses: 
apache/flink-connector-shared-utils/.github/workflows/python_ci.yml@ci_utils
     with:
       flink_version: ${{ matrix.flink }}
diff --git 
a/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
 
b/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
index 6cfa607..7660a4d 100644
--- 
a/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
+++ 
b/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
 import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
 import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.ResultHandler;
 import 
org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
@@ -45,7 +46,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
 
 import static 
org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier;
 import static 
org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier;
@@ -171,7 +171,7 @@ class KinesisFirehoseSinkWriter<InputT> extends 
AsyncSinkWriter<InputT, Record>
 
     @Override
     protected void submitRequestEntries(
-            List<Record> requestEntries, Consumer<List<Record>> requestResult) 
{
+            List<Record> requestEntries, ResultHandler<Record> resultHandler) {
 
         PutRecordBatchRequest batchRequest =
                 PutRecordBatchRequest.builder()
@@ -185,11 +185,11 @@ class KinesisFirehoseSinkWriter<InputT> extends 
AsyncSinkWriter<InputT, Record>
         future.whenComplete(
                 (response, err) -> {
                     if (err != null) {
-                        handleFullyFailedRequest(err, requestEntries, 
requestResult);
+                        handleFullyFailedRequest(err, requestEntries, 
resultHandler);
                     } else if (response.failedPutCount() > 0) {
-                        handlePartiallyFailedRequest(response, requestEntries, 
requestResult);
+                        handlePartiallyFailedRequest(response, requestEntries, 
resultHandler);
                     } else {
-                        requestResult.accept(Collections.emptyList());
+                        resultHandler.complete();
                     }
                 });
     }
@@ -205,17 +205,19 @@ class KinesisFirehoseSinkWriter<InputT> extends 
AsyncSinkWriter<InputT, Record>
     }
 
     private void handleFullyFailedRequest(
-            Throwable err, List<Record> requestEntries, Consumer<List<Record>> 
requestResult) {
+            Throwable err, List<Record> requestEntries, ResultHandler<Record> 
resultHandler) {
 
         numRecordsOutErrorsCounter.inc(requestEntries.size());
-        boolean isFatal = FIREHOSE_EXCEPTION_HANDLER.consumeIfFatal(err, 
getFatalExceptionCons());
+        boolean isFatal =
+                FIREHOSE_EXCEPTION_HANDLER.consumeIfFatal(
+                        err, resultHandler::completeExceptionally);
         if (isFatal) {
             return;
         }
 
         if (failOnError) {
-            getFatalExceptionCons()
-                    .accept(new 
KinesisFirehoseException.KinesisFirehoseFailFastException(err));
+            resultHandler.completeExceptionally(
+                    new 
KinesisFirehoseException.KinesisFirehoseFailFastException(err));
             return;
         }
 
@@ -223,17 +225,17 @@ class KinesisFirehoseSinkWriter<InputT> extends 
AsyncSinkWriter<InputT, Record>
                 "KDF Sink failed to write and will retry {} entries to KDF",
                 requestEntries.size(),
                 err);
-        requestResult.accept(requestEntries);
+        resultHandler.retryForEntries(requestEntries);
     }
 
     private void handlePartiallyFailedRequest(
             PutRecordBatchResponse response,
             List<Record> requestEntries,
-            Consumer<List<Record>> requestResult) {
+            ResultHandler<Record> resultHandler) {
         numRecordsOutErrorsCounter.inc(response.failedPutCount());
         if (failOnError) {
-            getFatalExceptionCons()
-                    .accept(new 
KinesisFirehoseException.KinesisFirehoseFailFastException());
+            resultHandler.completeExceptionally(
+                    new 
KinesisFirehoseException.KinesisFirehoseFailFastException());
             return;
         }
 
@@ -248,6 +250,6 @@ class KinesisFirehoseSinkWriter<InputT> extends 
AsyncSinkWriter<InputT, Record>
             }
         }
 
-        requestResult.accept(failedRequestEntries);
+        resultHandler.retryForEntries(failedRequestEntries);
     }
 }
diff --git 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java
 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java
index 684d018..aa900d5 100644
--- 
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java
+++ 
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
 import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
 import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.ResultHandler;
 import 
org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
 import 
org.apache.flink.connector.base.sink.writer.strategy.AIMDScalingStrategy;
 import 
org.apache.flink.connector.base.sink.writer.strategy.CongestionControlRateLimitingStrategy;
@@ -48,7 +49,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
 
 import static 
org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier;
 import static 
org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier;
@@ -199,7 +199,7 @@ class KinesisStreamsSinkWriter<InputT> extends 
AsyncSinkWriter<InputT, PutRecord
     @Override
     protected void submitRequestEntries(
             List<PutRecordsRequestEntry> requestEntries,
-            Consumer<List<PutRecordsRequestEntry>> requestResult) {
+            ResultHandler<PutRecordsRequestEntry> resultHandler) {
 
         PutRecordsRequest batchRequest =
                 PutRecordsRequest.builder()
@@ -213,11 +213,11 @@ class KinesisStreamsSinkWriter<InputT> extends 
AsyncSinkWriter<InputT, PutRecord
         future.whenComplete(
                 (response, err) -> {
                     if (err != null) {
-                        handleFullyFailedRequest(err, requestEntries, 
requestResult);
+                        handleFullyFailedRequest(err, requestEntries, 
resultHandler);
                     } else if (response.failedRecordCount() > 0) {
-                        handlePartiallyFailedRequest(response, requestEntries, 
requestResult);
+                        handlePartiallyFailedRequest(response, requestEntries, 
resultHandler);
                     } else {
-                        requestResult.accept(Collections.emptyList());
+                        resultHandler.complete();
                     }
                 });
     }
@@ -230,15 +230,15 @@ class KinesisStreamsSinkWriter<InputT> extends 
AsyncSinkWriter<InputT, PutRecord
     private void handleFullyFailedRequest(
             Throwable err,
             List<PutRecordsRequestEntry> requestEntries,
-            Consumer<List<PutRecordsRequestEntry>> requestResult) {
+            ResultHandler<PutRecordsRequestEntry> resultHandler) {
         LOG.warn(
                 "KDS Sink failed to write and will retry {} entries to KDS",
                 requestEntries.size(),
                 err);
         numRecordsOutErrorsCounter.inc(requestEntries.size());
 
-        if (isRetryable(err)) {
-            requestResult.accept(requestEntries);
+        if (isRetryable(err, resultHandler)) {
+            resultHandler.retryForEntries(requestEntries);
         }
     }
 
@@ -250,15 +250,15 @@ class KinesisStreamsSinkWriter<InputT> extends 
AsyncSinkWriter<InputT, PutRecord
     private void handlePartiallyFailedRequest(
             PutRecordsResponse response,
             List<PutRecordsRequestEntry> requestEntries,
-            Consumer<List<PutRecordsRequestEntry>> requestResult) {
+            ResultHandler<PutRecordsRequestEntry> resultHandler) {
         LOG.warn(
                 "KDS Sink failed to write and will retry {} entries to KDS",
                 response.failedRecordCount());
         numRecordsOutErrorsCounter.inc(response.failedRecordCount());
 
         if (failOnError) {
-            getFatalExceptionCons()
-                    .accept(new 
KinesisStreamsException.KinesisStreamsFailFastException());
+            resultHandler.completeExceptionally(
+                    new 
KinesisStreamsException.KinesisStreamsFailFastException());
             return;
         }
         List<PutRecordsRequestEntry> failedRequestEntries =
@@ -271,17 +271,19 @@ class KinesisStreamsSinkWriter<InputT> extends 
AsyncSinkWriter<InputT, PutRecord
             }
         }
 
-        requestResult.accept(failedRequestEntries);
+        resultHandler.retryForEntries(failedRequestEntries);
     }
 
-    private boolean isRetryable(Throwable err) {
+    private boolean isRetryable(
+            Throwable err, ResultHandler<PutRecordsRequestEntry> 
resultHandler) {
 
-        if (!KINESIS_FATAL_EXCEPTION_CLASSIFIER.isFatal(err, 
getFatalExceptionCons())) {
+        if (!KINESIS_FATAL_EXCEPTION_CLASSIFIER.isFatal(
+                err, resultHandler::completeExceptionally)) {
             return false;
         }
         if (failOnError) {
-            getFatalExceptionCons()
-                    .accept(new 
KinesisStreamsException.KinesisStreamsFailFastException(err));
+            resultHandler.completeExceptionally(
+                    new 
KinesisStreamsException.KinesisStreamsFailFastException(err));
             return false;
         }
 
diff --git 
a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java
 
b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java
index e0ef4c4..7a4e229 100644
--- 
a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java
+++ 
b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
 import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
 import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.ResultHandler;
 import org.apache.flink.connector.dynamodb.sink.client.SdkClientProvider;
 import org.apache.flink.connector.dynamodb.util.PrimaryKeyBuilder;
 import org.apache.flink.metrics.Counter;
@@ -45,12 +46,10 @@ import 
software.amazon.awssdk.services.dynamodb.model.WriteRequest;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
 
 import static java.util.Collections.singletonMap;
 import static 
org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier;
@@ -160,7 +159,7 @@ class DynamoDbSinkWriter<InputT> extends 
AsyncSinkWriter<InputT, DynamoDbWriteRe
     @Override
     protected void submitRequestEntries(
             List<DynamoDbWriteRequest> requestEntries,
-            Consumer<List<DynamoDbWriteRequest>> requestResultConsumer) {
+            ResultHandler<DynamoDbWriteRequest> resultHandler) {
 
         List<WriteRequest> items = new ArrayList<>();
 
@@ -190,17 +189,17 @@ class DynamoDbSinkWriter<InputT> extends 
AsyncSinkWriter<InputT, DynamoDbWriteRe
         future.whenComplete(
                 (response, err) -> {
                     if (err != null) {
-                        handleFullyFailedRequest(err, requestEntries, 
requestResultConsumer);
+                        handleFullyFailedRequest(err, requestEntries, 
resultHandler);
                     } else if 
(!CollectionUtil.isNullOrEmpty(response.unprocessedItems())) {
-                        handlePartiallyUnprocessedRequest(response, 
requestResultConsumer);
+                        handlePartiallyUnprocessedRequest(response, 
resultHandler);
                     } else {
-                        requestResultConsumer.accept(Collections.emptyList());
+                        resultHandler.complete();
                     }
                 });
     }
 
     private void handlePartiallyUnprocessedRequest(
-            BatchWriteItemResponse response, 
Consumer<List<DynamoDbWriteRequest>> requestResult) {
+            BatchWriteItemResponse response, 
ResultHandler<DynamoDbWriteRequest> resultHandler) {
         List<DynamoDbWriteRequest> unprocessed = new ArrayList<>();
 
         for (WriteRequest writeRequest : 
response.unprocessedItems().get(tableName)) {
@@ -211,32 +210,33 @@ class DynamoDbSinkWriter<InputT> extends 
AsyncSinkWriter<InputT, DynamoDbWriteRe
         numRecordsSendErrorsCounter.inc(unprocessed.size());
         numRecordsSendPartialFailure.inc(unprocessed.size());
 
-        requestResult.accept(unprocessed);
+        resultHandler.retryForEntries(unprocessed);
     }
 
     private void handleFullyFailedRequest(
             Throwable err,
             List<DynamoDbWriteRequest> requestEntries,
-            Consumer<List<DynamoDbWriteRequest>> requestResult) {
+            ResultHandler<DynamoDbWriteRequest> resultHandler) {
         LOG.warn(
                 "DynamoDB Sink failed to persist and will retry {} entries.",
                 requestEntries.size(),
                 err);
         numRecordsSendErrorsCounter.inc(requestEntries.size());
 
-        if (isRetryable(err.getCause())) {
-            requestResult.accept(requestEntries);
+        if (isRetryable(err.getCause(), resultHandler)) {
+            resultHandler.retryForEntries(requestEntries);
         }
     }
 
-    private boolean isRetryable(Throwable err) {
+    private boolean isRetryable(Throwable err, 
ResultHandler<DynamoDbWriteRequest> resultHandler) {
         // isFatal() is really isNotFatal()
-        if (!DYNAMODB_FATAL_EXCEPTION_CLASSIFIER.isFatal(err, 
getFatalExceptionCons())) {
+        if (!DYNAMODB_FATAL_EXCEPTION_CLASSIFIER.isFatal(
+                err, resultHandler::completeExceptionally)) {
             return false;
         }
         if (failOnError) {
-            getFatalExceptionCons()
-                    .accept(new 
DynamoDbSinkException.DynamoDbSinkFailFastException(err));
+            resultHandler.completeExceptionally(
+                    new 
DynamoDbSinkException.DynamoDbSinkFailFastException(err));
             return false;
         }
 
diff --git 
a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java
 
b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java
index f0d139f..44adf5c 100644
--- 
a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java
+++ 
b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.dynamodb.sink;
 
+import org.apache.flink.connector.base.sink.writer.ResultHandler;
 import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
 import org.apache.flink.connector.dynamodb.sink.client.SdkClientProvider;
 
@@ -48,8 +49,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 
@@ -64,7 +63,6 @@ public class DynamoDbSinkWriterTest {
     private static final String PARTITION_KEY = "partition_key";
     private static final String SORT_KEY = "sort_key";
     private static final String TABLE_NAME = "table_name";
-    private static final long FUTURE_TIMEOUT_MS = 1000;
 
     @Test
     public void testSuccessfulRequestWithNoDeduplication() throws Exception {
@@ -86,14 +84,13 @@ public class DynamoDbSinkWriterTest {
         DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
                 getDefaultSinkWriter(
                         true, overwriteByPartitionKeys, () -> 
trackingDynamoDbAsyncClient);
-        CompletableFuture<List<DynamoDbWriteRequest>> failedRequests = new 
CompletableFuture<>();
-        Consumer<List<DynamoDbWriteRequest>> failedRequestConsumer = 
failedRequests::complete;
+        TestingResultHandler resultHandler = new TestingResultHandler();
 
-        dynamoDbSinkWriter.submitRequestEntries(inputRequests, 
failedRequestConsumer);
+        dynamoDbSinkWriter.submitRequestEntries(inputRequests, resultHandler);
         assertThat(trackingDynamoDbAsyncClient.getRequestHistory())
                 .isNotEmpty()
                 .containsExactly(expectedClientRequests);
-        assertThat(failedRequests.get(FUTURE_TIMEOUT_MS, 
TimeUnit.MILLISECONDS)).isEmpty();
+        assertThat(resultHandler.isComplete()).isTrue();
     }
 
     @Test
@@ -108,14 +105,13 @@ public class DynamoDbSinkWriterTest {
         DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
                 getDefaultSinkWriter(
                         true, overwriteByPartitionKeys, () -> 
trackingDynamoDbAsyncClient);
-        CompletableFuture<List<DynamoDbWriteRequest>> failedRequests = new 
CompletableFuture<>();
-        Consumer<List<DynamoDbWriteRequest>> failedRequestConsumer = 
failedRequests::complete;
 
-        dynamoDbSinkWriter.submitRequestEntries(inputRequests, 
failedRequestConsumer);
+        TestingResultHandler resultHandler = new TestingResultHandler();
+        dynamoDbSinkWriter.submitRequestEntries(inputRequests, resultHandler);
         assertThat(trackingDynamoDbAsyncClient.getRequestHistory())
                 .isNotEmpty()
                 .containsExactly(expectedClientRequests);
-        assertThat(failedRequests.get(FUTURE_TIMEOUT_MS, 
TimeUnit.MILLISECONDS)).isEmpty();
+        assertThat(resultHandler.isComplete()).isTrue();
     }
 
     @Test
@@ -131,14 +127,13 @@ public class DynamoDbSinkWriterTest {
         DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
                 getDefaultSinkWriter(
                         true, overwriteByPartitionKeys, () -> 
trackingDynamoDbAsyncClient);
-        CompletableFuture<List<DynamoDbWriteRequest>> failedRequests = new 
CompletableFuture<>();
-        Consumer<List<DynamoDbWriteRequest>> failedRequestConsumer = 
failedRequests::complete;
+        TestingResultHandler resultHandler = new TestingResultHandler();
 
-        dynamoDbSinkWriter.submitRequestEntries(inputRequests, 
failedRequestConsumer);
+        dynamoDbSinkWriter.submitRequestEntries(inputRequests, resultHandler);
         assertThat(trackingDynamoDbAsyncClient.getRequestHistory())
                 .isNotEmpty()
                 .containsExactly(expectedClientRequests);
-        assertThat(failedRequests.get(FUTURE_TIMEOUT_MS, 
TimeUnit.MILLISECONDS)).isEmpty();
+        assertThat(resultHandler.isComplete()).isTrue();
     }
 
     @Test
@@ -159,10 +154,9 @@ public class DynamoDbSinkWriterTest {
         DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
                 getDefaultSinkWriter(
                         true, overwriteByPartitionKeys, () -> 
trackingDynamoDbAsyncClient);
-        CompletableFuture<List<DynamoDbWriteRequest>> failedRequests = new 
CompletableFuture<>();
-        Consumer<List<DynamoDbWriteRequest>> failedRequestConsumer = 
failedRequests::complete;
+        TestingResultHandler resultHandler = new TestingResultHandler();
 
-        dynamoDbSinkWriter.submitRequestEntries(inputRequests, 
failedRequestConsumer);
+        dynamoDbSinkWriter.submitRequestEntries(inputRequests, resultHandler);
         // Order does not matter in a batch write request
         assertThat(trackingDynamoDbAsyncClient.getRequestHistory())
                 .isNotEmpty()
@@ -171,7 +165,7 @@ public class DynamoDbSinkWriterTest {
                                 assertThat(clientBatchRequest)
                                         .containsExactlyInAnyOrderElementsOf(
                                                 expectedClientRequests));
-        assertThat(failedRequests.get(FUTURE_TIMEOUT_MS, 
TimeUnit.MILLISECONDS)).isEmpty();
+        assertThat(resultHandler.isComplete()).isTrue();
     }
 
     @Test
@@ -193,12 +187,12 @@ public class DynamoDbSinkWriterTest {
         DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
                 getDefaultSinkWriter(
                         failOnError, Collections.emptyList(), () -> 
throwingDynamoDbAsyncClient);
-        CompletableFuture<List<DynamoDbWriteRequest>> failedRequests = new 
CompletableFuture<>();
-        Consumer<List<DynamoDbWriteRequest>> failedRequestConsumer = 
failedRequests::complete;
+        TestingResultHandler resultHandler = new TestingResultHandler();
 
-        dynamoDbSinkWriter.submitRequestEntries(inputRequests, 
failedRequestConsumer);
+        dynamoDbSinkWriter.submitRequestEntries(inputRequests, resultHandler);
 
-        assertThat(failedRequests.get(FUTURE_TIMEOUT_MS, 
TimeUnit.MILLISECONDS))
+        assertThat(resultHandler.isComplete()).isFalse();
+        assertThat(resultHandler.getRetryEntries())
                 .containsExactlyInAnyOrderElementsOf(inputRequests);
     }
 
@@ -224,12 +218,12 @@ public class DynamoDbSinkWriterTest {
                         failOnError,
                         Collections.emptyList(),
                         () -> failingRecordsDynamoDbAsyncClient);
-        CompletableFuture<List<DynamoDbWriteRequest>> failedRequests = new 
CompletableFuture<>();
-        Consumer<List<DynamoDbWriteRequest>> failedRequestConsumer = 
failedRequests::complete;
+        TestingResultHandler resultHandler = new TestingResultHandler();
 
-        dynamoDbSinkWriter.submitRequestEntries(inputRequests, 
failedRequestConsumer);
+        dynamoDbSinkWriter.submitRequestEntries(inputRequests, resultHandler);
 
-        assertThat(failedRequests.get(FUTURE_TIMEOUT_MS, 
TimeUnit.MILLISECONDS))
+        assertThat(resultHandler.isComplete()).isFalse();
+        assertThat(resultHandler.getRetryEntries())
                 .usingRecursiveComparison()
                 .isEqualTo(expectedRetriedRecords);
     }
@@ -334,11 +328,19 @@ public class DynamoDbSinkWriterTest {
         DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
                 getDefaultSinkWriter(
                         failOnError, Collections.emptyList(), () -> 
throwingDynamoDbAsyncClient);
-        CompletableFuture<List<DynamoDbWriteRequest>> failedRequests = new 
CompletableFuture<>();
-        Consumer<List<DynamoDbWriteRequest>> failedRequestConsumer = 
failedRequests::complete;
+        TestingResultHandler resultHandler = new TestingResultHandler();
 
-        dynamoDbSinkWriter.submitRequestEntries(getDefaultInputRequests(), 
failedRequestConsumer);
-        assertThat(failedRequests).isNotCompleted();
+        dynamoDbSinkWriter.submitRequestEntries(getDefaultInputRequests(), 
resultHandler);
+        assertThat(resultHandler.isComplete()).isFalse();
+        assertThat(resultHandler.getRetryEntries()).isEmpty();
+        // assert exceptionToThrow is thrown or wrapped
+        assertThat(resultHandler.getException())
+                .satisfies(
+                        e ->
+                                assertThat(
+                                                (e == exceptionToThrow.get()
+                                                        || e.getCause() == 
exceptionToThrow.get()))
+                                        .isTrue());
     }
 
     private DynamoDbSinkWriter<Map<String, AttributeValue>> 
getDefaultSinkWriter(
@@ -442,6 +444,41 @@ public class DynamoDbSinkWriterTest {
         return item;
     }
 
+    private static class TestingResultHandler implements 
ResultHandler<DynamoDbWriteRequest> {
+
+        private boolean isComplete = false;
+        private Exception exception;
+
+        private List<DynamoDbWriteRequest> retryEntries = new ArrayList<>();
+
+        @Override
+        public void complete() {
+            isComplete = true;
+        }
+
+        @Override
+        public void completeExceptionally(Exception e) {
+            exception = e;
+        }
+
+        @Override
+        public void retryForEntries(List<DynamoDbWriteRequest> list) {
+            retryEntries.addAll(list);
+        }
+
+        public boolean isComplete() {
+            return isComplete;
+        }
+
+        public Exception getException() {
+            return exception;
+        }
+
+        public List<DynamoDbWriteRequest> getRetryEntries() {
+            return retryEntries;
+        }
+    }
+
     private static class TestAsyncDynamoDbClientProvider
             implements SdkClientProvider<DynamoDbAsyncClient> {
 
diff --git 
a/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkWriter.java
 
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkWriter.java
index 459e068..dfb845e 100644
--- 
a/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkWriter.java
+++ 
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkWriter.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
 import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
 import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.ResultHandler;
 import 
org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
 import org.apache.flink.connector.sqs.sink.client.SdkClientProvider;
 import org.apache.flink.metrics.Counter;
@@ -41,11 +42,9 @@ import 
software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
 
 import static 
org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier;
 import static 
org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier;
@@ -123,7 +122,7 @@ class SqsSinkWriter<InputT> extends AsyncSinkWriter<InputT, 
SendMessageBatchRequ
     @Override
     protected void submitRequestEntries(
             List<SendMessageBatchRequestEntry> requestEntries,
-            Consumer<List<SendMessageBatchRequestEntry>> requestResult) {
+            ResultHandler<SendMessageBatchRequestEntry> resultHandler) {
 
         final SendMessageBatchRequest batchRequest =
                 
SendMessageBatchRequest.builder().entries(requestEntries).queueUrl(sqsUrl).build();
@@ -134,20 +133,19 @@ class SqsSinkWriter<InputT> extends 
AsyncSinkWriter<InputT, SendMessageBatchRequ
         future.whenComplete(
                         (response, err) -> {
                             if (err != null) {
-                                handleFullyFailedRequest(err, requestEntries, 
requestResult);
-                            } else if (response.failed() != null && 
response.failed().size() > 0) {
+                                handleFullyFailedRequest(err, requestEntries, 
resultHandler);
+                            } else if (response.failed() != null && 
!response.failed().isEmpty()) {
                                 handlePartiallyFailedRequest(
-                                        response, requestEntries, 
requestResult);
+                                        response, requestEntries, 
resultHandler);
                             } else {
-                                requestResult.accept(Collections.emptyList());
+                                resultHandler.complete();
                             }
                         })
                 .exceptionally(
                         ex -> {
-                            getFatalExceptionCons()
-                                    .accept(
-                                            new 
SqsSinkException.SqsFailFastSinkException(
-                                                    ex.getMessage(), ex));
+                            resultHandler.completeExceptionally(
+                                    new 
SqsSinkException.SqsFailFastSinkException(
+                                            ex.getMessage(), ex));
                             return null;
                         });
     }
@@ -165,16 +163,17 @@ class SqsSinkWriter<InputT> extends 
AsyncSinkWriter<InputT, SendMessageBatchRequ
     private void handleFullyFailedRequest(
             Throwable err,
             List<SendMessageBatchRequestEntry> requestEntries,
-            Consumer<List<SendMessageBatchRequestEntry>> requestResult) {
+            ResultHandler<SendMessageBatchRequestEntry> resultHandler) {
 
         numRecordsOutErrorsCounter.inc(requestEntries.size());
-        boolean isFatal = SQS_EXCEPTION_HANDLER.consumeIfFatal(err, 
getFatalExceptionCons());
+        boolean isFatal =
+                SQS_EXCEPTION_HANDLER.consumeIfFatal(err, 
resultHandler::completeExceptionally);
         if (isFatal) {
             return;
         }
 
         if (failOnError) {
-            getFatalExceptionCons().accept(new 
SqsSinkException.SqsFailFastSinkException(err));
+            resultHandler.completeExceptionally(new 
SqsSinkException.SqsFailFastSinkException(err));
             return;
         }
 
@@ -183,13 +182,13 @@ class SqsSinkWriter<InputT> extends 
AsyncSinkWriter<InputT, SendMessageBatchRequ
                 requestEntries.size(),
                 requestEntries.get(0).toString(),
                 err);
-        requestResult.accept(requestEntries);
+        resultHandler.retryForEntries(requestEntries);
     }
 
     private void handlePartiallyFailedRequest(
             SendMessageBatchResponse response,
             List<SendMessageBatchRequestEntry> requestEntries,
-            Consumer<List<SendMessageBatchRequestEntry>> requestResult) {
+            ResultHandler<SendMessageBatchRequestEntry> resultHandler) {
 
         LOG.warn(
                 "handlePartiallyFailedRequest: SQS Sink failed to write and will retry {} entries to SQS",
@@ -197,7 +196,7 @@ class SqsSinkWriter<InputT> extends AsyncSinkWriter<InputT, 
SendMessageBatchRequ
         numRecordsOutErrorsCounter.inc(response.failed().size());
 
         if (failOnError) {
-            getFatalExceptionCons().accept(new 
SqsSinkException.SqsFailFastSinkException());
+            resultHandler.completeExceptionally(new 
SqsSinkException.SqsFailFastSinkException());
             return;
         }
 
@@ -212,15 +211,14 @@ class SqsSinkWriter<InputT> extends 
AsyncSinkWriter<InputT, SendMessageBatchRequ
             } else {
                 LOG.error(
                         "handlePartiallyFailedRequest: SQS Sink failed to retry unsuccessful SQS publish request due to invalid failed requestId");
-                getFatalExceptionCons()
-                        .accept(
-                                new SqsSinkException.SqsFailFastSinkException(
-                                        "SQS Sink failed to retry unsuccessful 
SQS publish request due to invalid failed requestId"));
+                resultHandler.completeExceptionally(
+                        new SqsSinkException.SqsFailFastSinkException(
+                                "SQS Sink failed to retry unsuccessful SQS publish request due to invalid failed requestId"));
                 return;
             }
         }
 
-        requestResult.accept(failedRequestEntries);
+        resultHandler.retryForEntries(failedRequestEntries);
     }
 
     private Optional<SendMessageBatchRequestEntry> getFailedRecord(
diff --git a/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkWriterTest.java b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkWriterTest.java
index 38c25f9..c8d7073 100644
--- a/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkWriterTest.java
+++ b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkWriterTest.java
@@ -21,10 +21,10 @@ import 
org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.ResultHandler;
 import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
 import org.apache.flink.connector.sqs.sink.client.SdkClientProvider;
 
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
 import software.amazon.awssdk.core.exception.SdkClientException;
@@ -48,7 +48,6 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
-import java.util.function.Consumer;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
@@ -71,12 +70,10 @@ public class SqsSinkWriterTest {
         TestSqsAsyncClientProvider testSqsAsyncClientProvider =
                 new TestSqsAsyncClientProvider(sqsAsyncClient);
         sinkWriter = getSqsSinkWriter(false, testSqsAsyncClientProvider);
-        CompletableFuture<List<SendMessageBatchRequestEntry>> failedRequests =
-                new CompletableFuture<>();
-        Consumer<List<SendMessageBatchRequestEntry>> failedRequestConsumer =
-                failedRequests::complete;
-        sinkWriter.submitRequestEntries(getDefaultInputRequests(), 
failedRequestConsumer);
-        assertThat(failedRequests).isNotCompleted();
+        TestingResultHandler resultHandler = new TestingResultHandler();
+        sinkWriter.submitRequestEntries(getDefaultInputRequests(), 
resultHandler);
+        assertThat(resultHandler.isComplete()).isFalse();
+        assertThat(resultHandler.getExceptionThrown()).isNotNull();
     }
 
     @Test
@@ -87,12 +84,10 @@ public class SqsSinkWriterTest {
                 new TestSqsAsyncClientProvider(sqsAsyncClient);
         sinkWriter = getSqsSinkWriter(true, testSqsAsyncClientProvider);
 
-        CompletableFuture<List<SendMessageBatchRequestEntry>> failedRequests =
-                new CompletableFuture<>();
-        Consumer<List<SendMessageBatchRequestEntry>> failedRequestConsumer =
-                failedRequests::complete;
-        sinkWriter.submitRequestEntries(getDefaultInputRequests(), 
failedRequestConsumer);
-        assertThat(failedRequests).isNotCompleted();
+        TestingResultHandler resultHandler = new TestingResultHandler();
+        sinkWriter.submitRequestEntries(getDefaultInputRequests(), 
resultHandler);
+        assertThat(resultHandler.isComplete()).isFalse();
+        assertThat(resultHandler.getExceptionThrown()).isNotNull();
     }
 
     @Test
@@ -103,12 +98,10 @@ public class SqsSinkWriterTest {
                 new TestSqsAsyncClientProvider(sqsAsyncClient);
         sinkWriter = getSqsSinkWriter(false, testSqsAsyncClientProvider);
 
-        CompletableFuture<List<SendMessageBatchRequestEntry>> failedRequests =
-                new CompletableFuture<>();
-        Consumer<List<SendMessageBatchRequestEntry>> failedRequestConsumer =
-                failedRequests::complete;
-        sinkWriter.submitRequestEntries(getDefaultInputRequests(), 
failedRequestConsumer);
-        assertThat(failedRequests).isCompleted();
+        TestingResultHandler resultHandler = new TestingResultHandler();
+        sinkWriter.submitRequestEntries(getDefaultInputRequests(), 
resultHandler);
+        assertThat(resultHandler.isComplete()).isFalse();
+        assertThat(resultHandler.getFailedRequests()).hasSize(2);
     }
 
     @Test
@@ -118,13 +111,12 @@ public class SqsSinkWriterTest {
                 new TestSqsAsyncClientProvider(sqsAsyncClient);
         sinkWriter = getSqsSinkWriter(false, testSqsAsyncClientProvider);
 
-        CompletableFuture<List<SendMessageBatchRequestEntry>> requests = new 
CompletableFuture<>();
-        Consumer<List<SendMessageBatchRequestEntry>> requestResult = 
requests::complete;
+        TestingResultHandler resultHandler = new TestingResultHandler();
 
-        sinkWriter.submitRequestEntries(getDefaultInputRequests(), 
requestResult);
+        sinkWriter.submitRequestEntries(getDefaultInputRequests(), 
resultHandler);
 
-        List<SendMessageBatchRequestEntry> result = requests.join();
-        Assertions.assertTrue(result.isEmpty());
+        assertThat(resultHandler.isComplete()).isTrue();
+        assertThat(resultHandler.getFailedRequests()).isEmpty();
     }
 
     @Test
@@ -135,13 +127,10 @@ public class SqsSinkWriterTest {
                 new TestSqsAsyncClientProvider(sqsAsyncClient);
         sinkWriter = getSqsSinkWriter(false, testSqsAsyncClientProvider);
 
-        CompletableFuture<List<SendMessageBatchRequestEntry>> requests = new 
CompletableFuture<>();
-        Consumer<List<SendMessageBatchRequestEntry>> requestResult = 
requests::complete;
+        TestingResultHandler resultHandler = new TestingResultHandler();
+        sinkWriter.submitRequestEntries(getDefaultInputRequests(), 
resultHandler);
 
-        sinkWriter.submitRequestEntries(getDefaultInputRequests(), 
requestResult);
-
-        List<SendMessageBatchRequestEntry> result = requests.join();
-        Assertions.assertEquals(1, result.size());
+        assertThat(resultHandler.getFailedRequests()).hasSize(1);
     }
 
     @Test
@@ -153,11 +142,11 @@ public class SqsSinkWriterTest {
                 new TestSqsAsyncClientProvider(sqsAsyncClient);
         sinkWriter = getSqsSinkWriter(false, testSqsAsyncClientProvider);
 
-        CompletableFuture<List<SendMessageBatchRequestEntry>> requests = new 
CompletableFuture<>();
-        Consumer<List<SendMessageBatchRequestEntry>> requestResult = 
requests::complete;
+        TestingResultHandler resultHandler = new TestingResultHandler();
 
-        sinkWriter.submitRequestEntries(getDefaultInputRequests(), 
requestResult);
-        assertThat(requests).isNotCompleted();
+        sinkWriter.submitRequestEntries(getDefaultInputRequests(), 
resultHandler);
+        assertThat(resultHandler.isComplete()).isFalse();
+        assertThat(resultHandler.getExceptionThrown()).isNotNull();
     }
 
     @Test
@@ -169,11 +158,11 @@ public class SqsSinkWriterTest {
                 new TestSqsAsyncClientProvider(sqsAsyncClient);
         sinkWriter = getSqsSinkWriter(true, testSqsAsyncClientProvider);
 
-        CompletableFuture<List<SendMessageBatchRequestEntry>> requests = new 
CompletableFuture<>();
-        Consumer<List<SendMessageBatchRequestEntry>> requestResult = 
requests::complete;
+        TestingResultHandler resultHandler = new TestingResultHandler();
 
-        sinkWriter.submitRequestEntries(getDefaultInputRequests(), 
requestResult);
-        assertThat(requests).isNotCompleted();
+        sinkWriter.submitRequestEntries(getDefaultInputRequests(), 
resultHandler);
+        assertThat(resultHandler.isComplete()).isFalse();
+        assertThat(resultHandler.getExceptionThrown()).isNotNull();
     }
 
     @Test
@@ -283,6 +272,40 @@ public class SqsSinkWriterTest {
                         .build());
     }
 
+    private static class TestingResultHandler
+            implements ResultHandler<SendMessageBatchRequestEntry> {
+        private boolean isComplete = false;
+        private Exception exceptionThrown = null;
+        private final List<SendMessageBatchRequestEntry> failedRequests = new 
ArrayList<>();
+
+        @Override
+        public void complete() {
+            isComplete = true;
+        }
+
+        @Override
+        public void completeExceptionally(Exception e) {
+            exceptionThrown = e;
+        }
+
+        @Override
+        public void retryForEntries(List<SendMessageBatchRequestEntry> list) {
+            failedRequests.addAll(list);
+        }
+
+        public boolean isComplete() {
+            return isComplete;
+        }
+
+        public Exception getExceptionThrown() {
+            return exceptionThrown;
+        }
+
+        public List<SendMessageBatchRequestEntry> getFailedRequests() {
+            return failedRequests;
+        }
+    }
+
     private static class ThrowingSqsAsyncClient<T extends Throwable> 
implements SqsAsyncClient {
 
         private final Optional<T> errorToReturn;
diff --git a/pom.xml b/pom.xml
index 5f7c556..189921a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,7 +56,7 @@ under the License.
         <aws.sdkv1.version>1.12.439</aws.sdkv1.version>
         <aws.sdkv2.version>2.26.19</aws.sdkv2.version>
         <netty.version>4.1.86.Final</netty.version>
-        <flink.version>1.19.0</flink.version>
+        <flink.version>1.20.0</flink.version>
         <jackson-bom.version>2.14.3</jackson-bom.version>
         <glue.schema.registry.version>1.1.18</glue.schema.registry.version>
         <guava.version>32.1.3-jre</guava.version>


Reply via email to