This is an automated email from the ASF dual-hosted git repository.
hong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
The following commit(s) were added to refs/heads/main by this push:
new d22078f [FLINK-36770] make AWS sink writers use ResultHandler, move
flink version to 1.20
d22078f is described below
commit d22078fc784ef3a13c2cf0ca1cf7777f7fa6a5ac
Author: Ahmed Hamdy <[email protected]>
AuthorDate: Fri Dec 20 12:22:47 2024 +0000
[FLINK-36770] make AWS sink writers use ResultHandler, move flink version
to 1.20
---
.github/workflows/nightly.yml | 4 +-
.github/workflows/push_pr.yml | 4 +-
.../firehose/sink/KinesisFirehoseSinkWriter.java | 30 +++---
.../kinesis/sink/KinesisStreamsSinkWriter.java | 34 +++----
.../dynamodb/sink/DynamoDbSinkWriter.java | 30 +++---
.../dynamodb/sink/DynamoDbSinkWriterTest.java | 99 +++++++++++++-------
.../flink/connector/sqs/sink/SqsSinkWriter.java | 42 ++++-----
.../connector/sqs/sink/SqsSinkWriterTest.java | 101 +++++++++++++--------
pom.xml | 2 +-
9 files changed, 204 insertions(+), 142 deletions(-)
diff --git a/.github/workflows/nightly.yml b/.github/workflows/nightly.yml
index 1eb0f2e..e167e2e 100644
--- a/.github/workflows/nightly.yml
+++ b/.github/workflows/nightly.yml
@@ -25,7 +25,7 @@ jobs:
if: github.repository_owner == 'apache'
strategy:
matrix:
- flink: [1.19-SNAPSHOT, 1.20-SNAPSHOT]
+ flink: [1.20-SNAPSHOT]
java: [ '8, 11, 17']
uses: ./.github/workflows/common.yml
with:
@@ -38,7 +38,7 @@ jobs:
python_test:
strategy:
matrix:
- flink: [1.19-SNAPSHOT, 1.20-SNAPSHOT]
+ flink: [1.20-SNAPSHOT]
uses:
apache/flink-connector-shared-utils/.github/workflows/python_ci.yml@ci_utils
with:
flink_version: ${{ matrix.flink }}
diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml
index b47e29f..759d3ae 100644
--- a/.github/workflows/push_pr.yml
+++ b/.github/workflows/push_pr.yml
@@ -26,7 +26,7 @@ jobs:
uses: ./.github/workflows/common.yml
strategy:
matrix:
- flink: [1.19.1, 1.20.0]
+ flink: [1.20.0]
java: [ '8, 11, 17']
with:
flink_version: ${{ matrix.flink }}
@@ -38,7 +38,7 @@ jobs:
python_test:
strategy:
matrix:
- flink: [1.19.0, 1.20.0]
+ flink: [1.20.0]
uses:
apache/flink-connector-shared-utils/.github/workflows/python_ci.yml@ci_utils
with:
flink_version: ${{ matrix.flink }}
diff --git
a/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
b/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
index 6cfa607..7660a4d 100644
---
a/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
+++
b/flink-connector-aws/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java
@@ -26,6 +26,7 @@ import
org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.ResultHandler;
import
org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
@@ -45,7 +46,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
import static
org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier;
import static
org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier;
@@ -171,7 +171,7 @@ class KinesisFirehoseSinkWriter<InputT> extends
AsyncSinkWriter<InputT, Record>
@Override
protected void submitRequestEntries(
- List<Record> requestEntries, Consumer<List<Record>> requestResult)
{
+ List<Record> requestEntries, ResultHandler<Record> resultHandler) {
PutRecordBatchRequest batchRequest =
PutRecordBatchRequest.builder()
@@ -185,11 +185,11 @@ class KinesisFirehoseSinkWriter<InputT> extends
AsyncSinkWriter<InputT, Record>
future.whenComplete(
(response, err) -> {
if (err != null) {
- handleFullyFailedRequest(err, requestEntries,
requestResult);
+ handleFullyFailedRequest(err, requestEntries,
resultHandler);
} else if (response.failedPutCount() > 0) {
- handlePartiallyFailedRequest(response, requestEntries,
requestResult);
+ handlePartiallyFailedRequest(response, requestEntries,
resultHandler);
} else {
- requestResult.accept(Collections.emptyList());
+ resultHandler.complete();
}
});
}
@@ -205,17 +205,19 @@ class KinesisFirehoseSinkWriter<InputT> extends
AsyncSinkWriter<InputT, Record>
}
private void handleFullyFailedRequest(
- Throwable err, List<Record> requestEntries, Consumer<List<Record>>
requestResult) {
+ Throwable err, List<Record> requestEntries, ResultHandler<Record>
resultHandler) {
numRecordsOutErrorsCounter.inc(requestEntries.size());
- boolean isFatal = FIREHOSE_EXCEPTION_HANDLER.consumeIfFatal(err,
getFatalExceptionCons());
+ boolean isFatal =
+ FIREHOSE_EXCEPTION_HANDLER.consumeIfFatal(
+ err, resultHandler::completeExceptionally);
if (isFatal) {
return;
}
if (failOnError) {
- getFatalExceptionCons()
- .accept(new
KinesisFirehoseException.KinesisFirehoseFailFastException(err));
+ resultHandler.completeExceptionally(
+ new
KinesisFirehoseException.KinesisFirehoseFailFastException(err));
return;
}
@@ -223,17 +225,17 @@ class KinesisFirehoseSinkWriter<InputT> extends
AsyncSinkWriter<InputT, Record>
"KDF Sink failed to write and will retry {} entries to KDF",
requestEntries.size(),
err);
- requestResult.accept(requestEntries);
+ resultHandler.retryForEntries(requestEntries);
}
private void handlePartiallyFailedRequest(
PutRecordBatchResponse response,
List<Record> requestEntries,
- Consumer<List<Record>> requestResult) {
+ ResultHandler<Record> resultHandler) {
numRecordsOutErrorsCounter.inc(response.failedPutCount());
if (failOnError) {
- getFatalExceptionCons()
- .accept(new
KinesisFirehoseException.KinesisFirehoseFailFastException());
+ resultHandler.completeExceptionally(
+ new
KinesisFirehoseException.KinesisFirehoseFailFastException());
return;
}
@@ -248,6 +250,6 @@ class KinesisFirehoseSinkWriter<InputT> extends
AsyncSinkWriter<InputT, Record>
}
}
- requestResult.accept(failedRequestEntries);
+ resultHandler.retryForEntries(failedRequestEntries);
}
}
diff --git
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java
index 684d018..aa900d5 100644
---
a/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java
+++
b/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java
@@ -25,6 +25,7 @@ import
org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.ResultHandler;
import
org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
import
org.apache.flink.connector.base.sink.writer.strategy.AIMDScalingStrategy;
import
org.apache.flink.connector.base.sink.writer.strategy.CongestionControlRateLimitingStrategy;
@@ -48,7 +49,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
import static
org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier;
import static
org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier;
@@ -199,7 +199,7 @@ class KinesisStreamsSinkWriter<InputT> extends
AsyncSinkWriter<InputT, PutRecord
@Override
protected void submitRequestEntries(
List<PutRecordsRequestEntry> requestEntries,
- Consumer<List<PutRecordsRequestEntry>> requestResult) {
+ ResultHandler<PutRecordsRequestEntry> resultHandler) {
PutRecordsRequest batchRequest =
PutRecordsRequest.builder()
@@ -213,11 +213,11 @@ class KinesisStreamsSinkWriter<InputT> extends
AsyncSinkWriter<InputT, PutRecord
future.whenComplete(
(response, err) -> {
if (err != null) {
- handleFullyFailedRequest(err, requestEntries,
requestResult);
+ handleFullyFailedRequest(err, requestEntries,
resultHandler);
} else if (response.failedRecordCount() > 0) {
- handlePartiallyFailedRequest(response, requestEntries,
requestResult);
+ handlePartiallyFailedRequest(response, requestEntries,
resultHandler);
} else {
- requestResult.accept(Collections.emptyList());
+ resultHandler.complete();
}
});
}
@@ -230,15 +230,15 @@ class KinesisStreamsSinkWriter<InputT> extends
AsyncSinkWriter<InputT, PutRecord
private void handleFullyFailedRequest(
Throwable err,
List<PutRecordsRequestEntry> requestEntries,
- Consumer<List<PutRecordsRequestEntry>> requestResult) {
+ ResultHandler<PutRecordsRequestEntry> resultHandler) {
LOG.warn(
"KDS Sink failed to write and will retry {} entries to KDS",
requestEntries.size(),
err);
numRecordsOutErrorsCounter.inc(requestEntries.size());
- if (isRetryable(err)) {
- requestResult.accept(requestEntries);
+ if (isRetryable(err, resultHandler)) {
+ resultHandler.retryForEntries(requestEntries);
}
}
@@ -250,15 +250,15 @@ class KinesisStreamsSinkWriter<InputT> extends
AsyncSinkWriter<InputT, PutRecord
private void handlePartiallyFailedRequest(
PutRecordsResponse response,
List<PutRecordsRequestEntry> requestEntries,
- Consumer<List<PutRecordsRequestEntry>> requestResult) {
+ ResultHandler<PutRecordsRequestEntry> resultHandler) {
LOG.warn(
"KDS Sink failed to write and will retry {} entries to KDS",
response.failedRecordCount());
numRecordsOutErrorsCounter.inc(response.failedRecordCount());
if (failOnError) {
- getFatalExceptionCons()
- .accept(new
KinesisStreamsException.KinesisStreamsFailFastException());
+ resultHandler.completeExceptionally(
+ new
KinesisStreamsException.KinesisStreamsFailFastException());
return;
}
List<PutRecordsRequestEntry> failedRequestEntries =
@@ -271,17 +271,19 @@ class KinesisStreamsSinkWriter<InputT> extends
AsyncSinkWriter<InputT, PutRecord
}
}
- requestResult.accept(failedRequestEntries);
+ resultHandler.retryForEntries(failedRequestEntries);
}
- private boolean isRetryable(Throwable err) {
+ private boolean isRetryable(
+ Throwable err, ResultHandler<PutRecordsRequestEntry>
resultHandler) {
- if (!KINESIS_FATAL_EXCEPTION_CLASSIFIER.isFatal(err,
getFatalExceptionCons())) {
+ if (!KINESIS_FATAL_EXCEPTION_CLASSIFIER.isFatal(
+ err, resultHandler::completeExceptionally)) {
return false;
}
if (failOnError) {
- getFatalExceptionCons()
- .accept(new
KinesisStreamsException.KinesisStreamsFailFastException(err));
+ resultHandler.completeExceptionally(
+ new
KinesisStreamsException.KinesisStreamsFailFastException(err));
return false;
}
diff --git
a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java
b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java
index e0ef4c4..7a4e229 100644
---
a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java
+++
b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java
@@ -25,6 +25,7 @@ import
org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.ResultHandler;
import org.apache.flink.connector.dynamodb.sink.client.SdkClientProvider;
import org.apache.flink.connector.dynamodb.util.PrimaryKeyBuilder;
import org.apache.flink.metrics.Counter;
@@ -45,12 +46,10 @@ import
software.amazon.awssdk.services.dynamodb.model.WriteRequest;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
import static java.util.Collections.singletonMap;
import static
org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier;
@@ -160,7 +159,7 @@ class DynamoDbSinkWriter<InputT> extends
AsyncSinkWriter<InputT, DynamoDbWriteRe
@Override
protected void submitRequestEntries(
List<DynamoDbWriteRequest> requestEntries,
- Consumer<List<DynamoDbWriteRequest>> requestResultConsumer) {
+ ResultHandler<DynamoDbWriteRequest> resultHandler) {
List<WriteRequest> items = new ArrayList<>();
@@ -190,17 +189,17 @@ class DynamoDbSinkWriter<InputT> extends
AsyncSinkWriter<InputT, DynamoDbWriteRe
future.whenComplete(
(response, err) -> {
if (err != null) {
- handleFullyFailedRequest(err, requestEntries,
requestResultConsumer);
+ handleFullyFailedRequest(err, requestEntries,
resultHandler);
} else if
(!CollectionUtil.isNullOrEmpty(response.unprocessedItems())) {
- handlePartiallyUnprocessedRequest(response,
requestResultConsumer);
+ handlePartiallyUnprocessedRequest(response,
resultHandler);
} else {
- requestResultConsumer.accept(Collections.emptyList());
+ resultHandler.complete();
}
});
}
private void handlePartiallyUnprocessedRequest(
- BatchWriteItemResponse response,
Consumer<List<DynamoDbWriteRequest>> requestResult) {
+ BatchWriteItemResponse response,
ResultHandler<DynamoDbWriteRequest> resultHandler) {
List<DynamoDbWriteRequest> unprocessed = new ArrayList<>();
for (WriteRequest writeRequest :
response.unprocessedItems().get(tableName)) {
@@ -211,32 +210,33 @@ class DynamoDbSinkWriter<InputT> extends
AsyncSinkWriter<InputT, DynamoDbWriteRe
numRecordsSendErrorsCounter.inc(unprocessed.size());
numRecordsSendPartialFailure.inc(unprocessed.size());
- requestResult.accept(unprocessed);
+ resultHandler.retryForEntries(unprocessed);
}
private void handleFullyFailedRequest(
Throwable err,
List<DynamoDbWriteRequest> requestEntries,
- Consumer<List<DynamoDbWriteRequest>> requestResult) {
+ ResultHandler<DynamoDbWriteRequest> resultHandler) {
LOG.warn(
"DynamoDB Sink failed to persist and will retry {} entries.",
requestEntries.size(),
err);
numRecordsSendErrorsCounter.inc(requestEntries.size());
- if (isRetryable(err.getCause())) {
- requestResult.accept(requestEntries);
+ if (isRetryable(err.getCause(), resultHandler)) {
+ resultHandler.retryForEntries(requestEntries);
}
}
- private boolean isRetryable(Throwable err) {
+ private boolean isRetryable(Throwable err,
ResultHandler<DynamoDbWriteRequest> resultHandler) {
// isFatal() is really isNotFatal()
- if (!DYNAMODB_FATAL_EXCEPTION_CLASSIFIER.isFatal(err,
getFatalExceptionCons())) {
+ if (!DYNAMODB_FATAL_EXCEPTION_CLASSIFIER.isFatal(
+ err, resultHandler::completeExceptionally)) {
return false;
}
if (failOnError) {
- getFatalExceptionCons()
- .accept(new
DynamoDbSinkException.DynamoDbSinkFailFastException(err));
+ resultHandler.completeExceptionally(
+ new
DynamoDbSinkException.DynamoDbSinkFailFastException(err));
return false;
}
diff --git
a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java
b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java
index f0d139f..44adf5c 100644
---
a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java
+++
b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriterTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.dynamodb.sink;
+import org.apache.flink.connector.base.sink.writer.ResultHandler;
import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
import org.apache.flink.connector.dynamodb.sink.client.SdkClientProvider;
@@ -48,8 +49,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
@@ -64,7 +63,6 @@ public class DynamoDbSinkWriterTest {
private static final String PARTITION_KEY = "partition_key";
private static final String SORT_KEY = "sort_key";
private static final String TABLE_NAME = "table_name";
- private static final long FUTURE_TIMEOUT_MS = 1000;
@Test
public void testSuccessfulRequestWithNoDeduplication() throws Exception {
@@ -86,14 +84,13 @@ public class DynamoDbSinkWriterTest {
DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
getDefaultSinkWriter(
true, overwriteByPartitionKeys, () ->
trackingDynamoDbAsyncClient);
- CompletableFuture<List<DynamoDbWriteRequest>> failedRequests = new
CompletableFuture<>();
- Consumer<List<DynamoDbWriteRequest>> failedRequestConsumer =
failedRequests::complete;
+ TestingResultHandler resultHandler = new TestingResultHandler();
- dynamoDbSinkWriter.submitRequestEntries(inputRequests,
failedRequestConsumer);
+ dynamoDbSinkWriter.submitRequestEntries(inputRequests, resultHandler);
assertThat(trackingDynamoDbAsyncClient.getRequestHistory())
.isNotEmpty()
.containsExactly(expectedClientRequests);
- assertThat(failedRequests.get(FUTURE_TIMEOUT_MS,
TimeUnit.MILLISECONDS)).isEmpty();
+ assertThat(resultHandler.isComplete()).isTrue();
}
@Test
@@ -108,14 +105,13 @@ public class DynamoDbSinkWriterTest {
DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
getDefaultSinkWriter(
true, overwriteByPartitionKeys, () ->
trackingDynamoDbAsyncClient);
- CompletableFuture<List<DynamoDbWriteRequest>> failedRequests = new
CompletableFuture<>();
- Consumer<List<DynamoDbWriteRequest>> failedRequestConsumer =
failedRequests::complete;
- dynamoDbSinkWriter.submitRequestEntries(inputRequests,
failedRequestConsumer);
+ TestingResultHandler resultHandler = new TestingResultHandler();
+ dynamoDbSinkWriter.submitRequestEntries(inputRequests, resultHandler);
assertThat(trackingDynamoDbAsyncClient.getRequestHistory())
.isNotEmpty()
.containsExactly(expectedClientRequests);
- assertThat(failedRequests.get(FUTURE_TIMEOUT_MS,
TimeUnit.MILLISECONDS)).isEmpty();
+ assertThat(resultHandler.isComplete()).isTrue();
}
@Test
@@ -131,14 +127,13 @@ public class DynamoDbSinkWriterTest {
DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
getDefaultSinkWriter(
true, overwriteByPartitionKeys, () ->
trackingDynamoDbAsyncClient);
- CompletableFuture<List<DynamoDbWriteRequest>> failedRequests = new
CompletableFuture<>();
- Consumer<List<DynamoDbWriteRequest>> failedRequestConsumer =
failedRequests::complete;
+ TestingResultHandler resultHandler = new TestingResultHandler();
- dynamoDbSinkWriter.submitRequestEntries(inputRequests,
failedRequestConsumer);
+ dynamoDbSinkWriter.submitRequestEntries(inputRequests, resultHandler);
assertThat(trackingDynamoDbAsyncClient.getRequestHistory())
.isNotEmpty()
.containsExactly(expectedClientRequests);
- assertThat(failedRequests.get(FUTURE_TIMEOUT_MS,
TimeUnit.MILLISECONDS)).isEmpty();
+ assertThat(resultHandler.isComplete()).isTrue();
}
@Test
@@ -159,10 +154,9 @@ public class DynamoDbSinkWriterTest {
DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
getDefaultSinkWriter(
true, overwriteByPartitionKeys, () ->
trackingDynamoDbAsyncClient);
- CompletableFuture<List<DynamoDbWriteRequest>> failedRequests = new
CompletableFuture<>();
- Consumer<List<DynamoDbWriteRequest>> failedRequestConsumer =
failedRequests::complete;
+ TestingResultHandler resultHandler = new TestingResultHandler();
- dynamoDbSinkWriter.submitRequestEntries(inputRequests,
failedRequestConsumer);
+ dynamoDbSinkWriter.submitRequestEntries(inputRequests, resultHandler);
// Order does not matter in a batch write request
assertThat(trackingDynamoDbAsyncClient.getRequestHistory())
.isNotEmpty()
@@ -171,7 +165,7 @@ public class DynamoDbSinkWriterTest {
assertThat(clientBatchRequest)
.containsExactlyInAnyOrderElementsOf(
expectedClientRequests));
- assertThat(failedRequests.get(FUTURE_TIMEOUT_MS,
TimeUnit.MILLISECONDS)).isEmpty();
+ assertThat(resultHandler.isComplete()).isTrue();
}
@Test
@@ -193,12 +187,12 @@ public class DynamoDbSinkWriterTest {
DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
getDefaultSinkWriter(
failOnError, Collections.emptyList(), () ->
throwingDynamoDbAsyncClient);
- CompletableFuture<List<DynamoDbWriteRequest>> failedRequests = new
CompletableFuture<>();
- Consumer<List<DynamoDbWriteRequest>> failedRequestConsumer =
failedRequests::complete;
+ TestingResultHandler resultHandler = new TestingResultHandler();
- dynamoDbSinkWriter.submitRequestEntries(inputRequests,
failedRequestConsumer);
+ dynamoDbSinkWriter.submitRequestEntries(inputRequests, resultHandler);
- assertThat(failedRequests.get(FUTURE_TIMEOUT_MS,
TimeUnit.MILLISECONDS))
+ assertThat(resultHandler.isComplete()).isFalse();
+ assertThat(resultHandler.getRetryEntries())
.containsExactlyInAnyOrderElementsOf(inputRequests);
}
@@ -224,12 +218,12 @@ public class DynamoDbSinkWriterTest {
failOnError,
Collections.emptyList(),
() -> failingRecordsDynamoDbAsyncClient);
- CompletableFuture<List<DynamoDbWriteRequest>> failedRequests = new
CompletableFuture<>();
- Consumer<List<DynamoDbWriteRequest>> failedRequestConsumer =
failedRequests::complete;
+ TestingResultHandler resultHandler = new TestingResultHandler();
- dynamoDbSinkWriter.submitRequestEntries(inputRequests,
failedRequestConsumer);
+ dynamoDbSinkWriter.submitRequestEntries(inputRequests, resultHandler);
- assertThat(failedRequests.get(FUTURE_TIMEOUT_MS,
TimeUnit.MILLISECONDS))
+ assertThat(resultHandler.isComplete()).isFalse();
+ assertThat(resultHandler.getRetryEntries())
.usingRecursiveComparison()
.isEqualTo(expectedRetriedRecords);
}
@@ -334,11 +328,19 @@ public class DynamoDbSinkWriterTest {
DynamoDbSinkWriter<Map<String, AttributeValue>> dynamoDbSinkWriter =
getDefaultSinkWriter(
failOnError, Collections.emptyList(), () ->
throwingDynamoDbAsyncClient);
- CompletableFuture<List<DynamoDbWriteRequest>> failedRequests = new
CompletableFuture<>();
- Consumer<List<DynamoDbWriteRequest>> failedRequestConsumer =
failedRequests::complete;
+ TestingResultHandler resultHandler = new TestingResultHandler();
- dynamoDbSinkWriter.submitRequestEntries(getDefaultInputRequests(),
failedRequestConsumer);
- assertThat(failedRequests).isNotCompleted();
+ dynamoDbSinkWriter.submitRequestEntries(getDefaultInputRequests(),
resultHandler);
+ assertThat(resultHandler.isComplete()).isFalse();
+ assertThat(resultHandler.getRetryEntries()).isEmpty();
+ // assert exceptionToThrow is thrown or wrapped
+ assertThat(resultHandler.getException())
+ .satisfies(
+ e ->
+ assertThat(
+ (e == exceptionToThrow.get()
+ || e.getCause() ==
exceptionToThrow.get()))
+ .isTrue());
}
private DynamoDbSinkWriter<Map<String, AttributeValue>>
getDefaultSinkWriter(
@@ -442,6 +444,41 @@ public class DynamoDbSinkWriterTest {
return item;
}
+ private static class TestingResultHandler implements
ResultHandler<DynamoDbWriteRequest> {
+
+ private boolean isComplete = false;
+ private Exception exception;
+
+ private List<DynamoDbWriteRequest> retryEntries = new ArrayList<>();
+
+ @Override
+ public void complete() {
+ isComplete = true;
+ }
+
+ @Override
+ public void completeExceptionally(Exception e) {
+ exception = e;
+ }
+
+ @Override
+ public void retryForEntries(List<DynamoDbWriteRequest> list) {
+ retryEntries.addAll(list);
+ }
+
+ public boolean isComplete() {
+ return isComplete;
+ }
+
+ public Exception getException() {
+ return exception;
+ }
+
+ public List<DynamoDbWriteRequest> getRetryEntries() {
+ return retryEntries;
+ }
+ }
+
private static class TestAsyncDynamoDbClientProvider
implements SdkClientProvider<DynamoDbAsyncClient> {
diff --git
a/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkWriter.java
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkWriter.java
index 459e068..dfb845e 100644
---
a/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkWriter.java
+++
b/flink-connector-aws/flink-connector-sqs/src/main/java/org/apache/flink/connector/sqs/sink/SqsSinkWriter.java
@@ -25,6 +25,7 @@ import
org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.ResultHandler;
import
org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
import org.apache.flink.connector.sqs.sink.client.SdkClientProvider;
import org.apache.flink.metrics.Counter;
@@ -41,11 +42,9 @@ import
software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
import static
org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier;
import static
org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier;
@@ -123,7 +122,7 @@ class SqsSinkWriter<InputT> extends AsyncSinkWriter<InputT,
SendMessageBatchRequ
@Override
protected void submitRequestEntries(
List<SendMessageBatchRequestEntry> requestEntries,
- Consumer<List<SendMessageBatchRequestEntry>> requestResult) {
+ ResultHandler<SendMessageBatchRequestEntry> resultHandler) {
final SendMessageBatchRequest batchRequest =
SendMessageBatchRequest.builder().entries(requestEntries).queueUrl(sqsUrl).build();
@@ -134,20 +133,19 @@ class SqsSinkWriter<InputT> extends
AsyncSinkWriter<InputT, SendMessageBatchRequ
future.whenComplete(
(response, err) -> {
if (err != null) {
- handleFullyFailedRequest(err, requestEntries,
requestResult);
- } else if (response.failed() != null &&
response.failed().size() > 0) {
+ handleFullyFailedRequest(err, requestEntries,
resultHandler);
+ } else if (response.failed() != null &&
!response.failed().isEmpty()) {
handlePartiallyFailedRequest(
- response, requestEntries,
requestResult);
+ response, requestEntries,
resultHandler);
} else {
- requestResult.accept(Collections.emptyList());
+ resultHandler.complete();
}
})
.exceptionally(
ex -> {
- getFatalExceptionCons()
- .accept(
- new
SqsSinkException.SqsFailFastSinkException(
- ex.getMessage(), ex));
+ resultHandler.completeExceptionally(
+ new
SqsSinkException.SqsFailFastSinkException(
+ ex.getMessage(), ex));
return null;
});
}
@@ -165,16 +163,17 @@ class SqsSinkWriter<InputT> extends
AsyncSinkWriter<InputT, SendMessageBatchRequ
private void handleFullyFailedRequest(
Throwable err,
List<SendMessageBatchRequestEntry> requestEntries,
- Consumer<List<SendMessageBatchRequestEntry>> requestResult) {
+ ResultHandler<SendMessageBatchRequestEntry> resultHandler) {
numRecordsOutErrorsCounter.inc(requestEntries.size());
- boolean isFatal = SQS_EXCEPTION_HANDLER.consumeIfFatal(err,
getFatalExceptionCons());
+ boolean isFatal =
+ SQS_EXCEPTION_HANDLER.consumeIfFatal(err,
resultHandler::completeExceptionally);
if (isFatal) {
return;
}
if (failOnError) {
- getFatalExceptionCons().accept(new
SqsSinkException.SqsFailFastSinkException(err));
+ resultHandler.completeExceptionally(new
SqsSinkException.SqsFailFastSinkException(err));
return;
}
@@ -183,13 +182,13 @@ class SqsSinkWriter<InputT> extends
AsyncSinkWriter<InputT, SendMessageBatchRequ
requestEntries.size(),
requestEntries.get(0).toString(),
err);
- requestResult.accept(requestEntries);
+ resultHandler.retryForEntries(requestEntries);
}
private void handlePartiallyFailedRequest(
SendMessageBatchResponse response,
List<SendMessageBatchRequestEntry> requestEntries,
- Consumer<List<SendMessageBatchRequestEntry>> requestResult) {
+ ResultHandler<SendMessageBatchRequestEntry> resultHandler) {
LOG.warn(
"handlePartiallyFailedRequest: SQS Sink failed to write and
will retry {} entries to SQS",
@@ -197,7 +196,7 @@ class SqsSinkWriter<InputT> extends AsyncSinkWriter<InputT,
SendMessageBatchRequ
numRecordsOutErrorsCounter.inc(response.failed().size());
if (failOnError) {
- getFatalExceptionCons().accept(new
SqsSinkException.SqsFailFastSinkException());
+ resultHandler.completeExceptionally(new
SqsSinkException.SqsFailFastSinkException());
return;
}
@@ -212,15 +211,14 @@ class SqsSinkWriter<InputT> extends
AsyncSinkWriter<InputT, SendMessageBatchRequ
} else {
LOG.error(
"handlePartiallyFailedRequest: SQS Sink failed to
retry unsuccessful SQS publish request due to invalid failed requestId");
- getFatalExceptionCons()
- .accept(
- new SqsSinkException.SqsFailFastSinkException(
- "SQS Sink failed to retry unsuccessful
SQS publish request due to invalid failed requestId"));
+ resultHandler.completeExceptionally(
+ new SqsSinkException.SqsFailFastSinkException(
+ "SQS Sink failed to retry unsuccessful SQS
publish request due to invalid failed requestId"));
return;
}
}
- requestResult.accept(failedRequestEntries);
+ resultHandler.retryForEntries(failedRequestEntries);
}
private Optional<SendMessageBatchRequestEntry> getFailedRecord(
diff --git
a/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkWriterTest.java
b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkWriterTest.java
index 38c25f9..c8d7073 100644
---
a/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkWriterTest.java
+++
b/flink-connector-aws/flink-connector-sqs/src/test/java/org/apache/flink/connector/sqs/sink/SqsSinkWriterTest.java
@@ -21,10 +21,10 @@ import
org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.aws.testutils.AWSServicesTestUtils;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.ResultHandler;
import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
import org.apache.flink.connector.sqs.sink.client.SdkClientProvider;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
import software.amazon.awssdk.core.exception.SdkClientException;
@@ -48,7 +48,6 @@ import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
-import java.util.function.Consumer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
@@ -71,12 +70,10 @@ public class SqsSinkWriterTest {
TestSqsAsyncClientProvider testSqsAsyncClientProvider =
new TestSqsAsyncClientProvider(sqsAsyncClient);
sinkWriter = getSqsSinkWriter(false, testSqsAsyncClientProvider);
- CompletableFuture<List<SendMessageBatchRequestEntry>> failedRequests =
- new CompletableFuture<>();
- Consumer<List<SendMessageBatchRequestEntry>> failedRequestConsumer =
- failedRequests::complete;
- sinkWriter.submitRequestEntries(getDefaultInputRequests(), failedRequestConsumer);
- assertThat(failedRequests).isNotCompleted();
+ TestingResultHandler resultHandler = new TestingResultHandler();
+ sinkWriter.submitRequestEntries(getDefaultInputRequests(), resultHandler);
+ assertThat(resultHandler.isComplete()).isFalse();
+ assertThat(resultHandler.getExceptionThrown()).isNotNull();
}
@Test
@@ -87,12 +84,10 @@ public class SqsSinkWriterTest {
new TestSqsAsyncClientProvider(sqsAsyncClient);
sinkWriter = getSqsSinkWriter(true, testSqsAsyncClientProvider);
- CompletableFuture<List<SendMessageBatchRequestEntry>> failedRequests =
- new CompletableFuture<>();
- Consumer<List<SendMessageBatchRequestEntry>> failedRequestConsumer =
- failedRequests::complete;
- sinkWriter.submitRequestEntries(getDefaultInputRequests(), failedRequestConsumer);
- assertThat(failedRequests).isNotCompleted();
+ TestingResultHandler resultHandler = new TestingResultHandler();
+ sinkWriter.submitRequestEntries(getDefaultInputRequests(), resultHandler);
+ assertThat(resultHandler.isComplete()).isFalse();
+ assertThat(resultHandler.getExceptionThrown()).isNotNull();
}
@Test
@@ -103,12 +98,10 @@ public class SqsSinkWriterTest {
new TestSqsAsyncClientProvider(sqsAsyncClient);
sinkWriter = getSqsSinkWriter(false, testSqsAsyncClientProvider);
- CompletableFuture<List<SendMessageBatchRequestEntry>> failedRequests =
- new CompletableFuture<>();
- Consumer<List<SendMessageBatchRequestEntry>> failedRequestConsumer =
- failedRequests::complete;
- sinkWriter.submitRequestEntries(getDefaultInputRequests(), failedRequestConsumer);
- assertThat(failedRequests).isCompleted();
+ TestingResultHandler resultHandler = new TestingResultHandler();
+ sinkWriter.submitRequestEntries(getDefaultInputRequests(), resultHandler);
+ assertThat(resultHandler.isComplete()).isFalse();
+ assertThat(resultHandler.getFailedRequests()).hasSize(2);
}
@Test
@@ -118,13 +111,12 @@ public class SqsSinkWriterTest {
new TestSqsAsyncClientProvider(sqsAsyncClient);
sinkWriter = getSqsSinkWriter(false, testSqsAsyncClientProvider);
- CompletableFuture<List<SendMessageBatchRequestEntry>> requests = new CompletableFuture<>();
- Consumer<List<SendMessageBatchRequestEntry>> requestResult = requests::complete;
+ TestingResultHandler resultHandler = new TestingResultHandler();
- sinkWriter.submitRequestEntries(getDefaultInputRequests(), requestResult);
+ sinkWriter.submitRequestEntries(getDefaultInputRequests(), resultHandler);
- List<SendMessageBatchRequestEntry> result = requests.join();
- Assertions.assertTrue(result.isEmpty());
+ assertThat(resultHandler.isComplete()).isTrue();
+ assertThat(resultHandler.getFailedRequests()).isEmpty();
}
@Test
@@ -135,13 +127,10 @@ public class SqsSinkWriterTest {
new TestSqsAsyncClientProvider(sqsAsyncClient);
sinkWriter = getSqsSinkWriter(false, testSqsAsyncClientProvider);
- CompletableFuture<List<SendMessageBatchRequestEntry>> requests = new CompletableFuture<>();
- Consumer<List<SendMessageBatchRequestEntry>> requestResult = requests::complete;
+ TestingResultHandler resultHandler = new TestingResultHandler();
+ sinkWriter.submitRequestEntries(getDefaultInputRequests(), resultHandler);
- sinkWriter.submitRequestEntries(getDefaultInputRequests(), requestResult);
-
- List<SendMessageBatchRequestEntry> result = requests.join();
- Assertions.assertEquals(1, result.size());
+ assertThat(resultHandler.getFailedRequests()).hasSize(1);
}
@Test
@@ -153,11 +142,11 @@ public class SqsSinkWriterTest {
new TestSqsAsyncClientProvider(sqsAsyncClient);
sinkWriter = getSqsSinkWriter(false, testSqsAsyncClientProvider);
- CompletableFuture<List<SendMessageBatchRequestEntry>> requests = new CompletableFuture<>();
- Consumer<List<SendMessageBatchRequestEntry>> requestResult = requests::complete;
+ TestingResultHandler resultHandler = new TestingResultHandler();
- sinkWriter.submitRequestEntries(getDefaultInputRequests(), requestResult);
- assertThat(requests).isNotCompleted();
+ sinkWriter.submitRequestEntries(getDefaultInputRequests(), resultHandler);
+ assertThat(resultHandler.isComplete()).isFalse();
+ assertThat(resultHandler.getExceptionThrown()).isNotNull();
}
@Test
@@ -169,11 +158,11 @@ public class SqsSinkWriterTest {
new TestSqsAsyncClientProvider(sqsAsyncClient);
sinkWriter = getSqsSinkWriter(true, testSqsAsyncClientProvider);
- CompletableFuture<List<SendMessageBatchRequestEntry>> requests = new CompletableFuture<>();
- Consumer<List<SendMessageBatchRequestEntry>> requestResult = requests::complete;
+ TestingResultHandler resultHandler = new TestingResultHandler();
- sinkWriter.submitRequestEntries(getDefaultInputRequests(), requestResult);
- assertThat(requests).isNotCompleted();
+ sinkWriter.submitRequestEntries(getDefaultInputRequests(), resultHandler);
+ assertThat(resultHandler.isComplete()).isFalse();
+ assertThat(resultHandler.getExceptionThrown()).isNotNull();
}
@Test
@@ -283,6 +272,40 @@ public class SqsSinkWriterTest {
.build());
}
+ private static class TestingResultHandler
+ implements ResultHandler<SendMessageBatchRequestEntry> {
+ private boolean isComplete = false;
+ private Exception exceptionThrown = null;
+ private final List<SendMessageBatchRequestEntry> failedRequests = new ArrayList<>();
+
+ @Override
+ public void complete() {
+ isComplete = true;
+ }
+
+ @Override
+ public void completeExceptionally(Exception e) {
+ exceptionThrown = e;
+ }
+
+ @Override
+ public void retryForEntries(List<SendMessageBatchRequestEntry> list) {
+ failedRequests.addAll(list);
+ }
+
+ public boolean isComplete() {
+ return isComplete;
+ }
+
+ public Exception getExceptionThrown() {
+ return exceptionThrown;
+ }
+
+ public List<SendMessageBatchRequestEntry> getFailedRequests() {
+ return failedRequests;
+ }
+ }
+
private static class ThrowingSqsAsyncClient<T extends Throwable>
implements SqsAsyncClient {
private final Optional<T> errorToReturn;
diff --git a/pom.xml b/pom.xml
index 5f7c556..189921a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,7 +56,7 @@ under the License.
<aws.sdkv1.version>1.12.439</aws.sdkv1.version>
<aws.sdkv2.version>2.26.19</aws.sdkv2.version>
<netty.version>4.1.86.Final</netty.version>
- <flink.version>1.19.0</flink.version>
+ <flink.version>1.20.0</flink.version>
<jackson-bom.version>2.14.3</jackson-bom.version>
<glue.schema.registry.version>1.1.18</glue.schema.registry.version>
<guava.version>32.1.3-jre</guava.version>