This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 9412c99 [FLINK-25811][connector/base] changing failed requests
handler to accept list in AsyncSinkWriter
9412c99 is described below
commit 9412c992e50004eb2412530a3b1c0a288fc5fc64
Author: Ahmed Hamdy <[email protected]>
AuthorDate: Tue Jan 25 15:34:52 2022 +0000
[FLINK-25811][connector/base] changing failed requests handler to accept
list in AsyncSinkWriter
---
.../kinesis/sink/KinesisDataStreamsSinkWriter.java | 7 +++----
.../connector/base/sink/writer/AsyncSinkWriter.java | 16 +++++++++-------
.../flink/connector/base/sink/ArrayListAsyncSink.java | 2 +-
.../connector/base/sink/writer/AsyncSinkWriterTest.java | 5 ++---
4 files changed, 15 insertions(+), 15 deletions(-)
diff --git a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
index 984998f..b720e5e 100644
--- a/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
+++ b/flink-connectors/flink-connector-aws-kinesis-data-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisDataStreamsSinkWriter.java
@@ -36,7 +36,6 @@ import
software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
@@ -112,7 +111,7 @@ class KinesisDataStreamsSinkWriter<InputT> extends
AsyncSinkWriter<InputT, PutRe
@Override
protected void submitRequestEntries(
List<PutRecordsRequestEntry> requestEntries,
- Consumer<Collection<PutRecordsRequestEntry>> requestResult) {
+ Consumer<List<PutRecordsRequestEntry>> requestResult) {
PutRecordsRequest batchRequest =
PutRecordsRequest.builder().records(requestEntries).streamName(streamName).build();
@@ -141,7 +140,7 @@ class KinesisDataStreamsSinkWriter<InputT> extends
AsyncSinkWriter<InputT, PutRe
private void handleFullyFailedRequest(
Throwable err,
List<PutRecordsRequestEntry> requestEntries,
- Consumer<Collection<PutRecordsRequestEntry>> requestResult) {
+ Consumer<List<PutRecordsRequestEntry>> requestResult) {
LOG.warn("KDS Sink failed to persist {} entries to KDS",
requestEntries.size(), err);
numRecordsOutErrorsCounter.inc(requestEntries.size());
@@ -153,7 +152,7 @@ class KinesisDataStreamsSinkWriter<InputT> extends
AsyncSinkWriter<InputT, PutRe
private void handlePartiallyFailedRequest(
PutRecordsResponse response,
List<PutRecordsRequestEntry> requestEntries,
- Consumer<Collection<PutRecordsRequestEntry>> requestResult) {
+ Consumer<List<PutRecordsRequestEntry>> requestResult) {
LOG.warn("KDS Sink failed to persist {} entries to KDS",
response.failedRecordCount());
numRecordsOutErrorsCounter.inc(response.failedRecordCount());
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
index fa158fe..263164e 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
@@ -34,6 +34,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
+import java.util.ListIterator;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@@ -186,7 +187,7 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT
extends Serializable
* method. All other elements are assumed to have been successfully
persisted.
*/
protected abstract void submitRequestEntries(
- List<RequestEntryT> requestEntries, Consumer<Collection<RequestEntryT>> requestResult);
+ List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestResult);
/**
* This method allows the getting of the size of a {@code RequestEntryT}
in bytes. The size in
@@ -300,7 +301,7 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT
extends Serializable
}
long timestampOfRequest = System.currentTimeMillis();
- Consumer<Collection<RequestEntryT>> requestResult =
+ Consumer<List<RequestEntryT>> requestResult =
failedRequestEntries ->
mailboxExecutor.execute(
() -> completeRequest(failedRequestEntries,
timestampOfRequest),
@@ -343,14 +344,15 @@ public abstract class AsyncSinkWriter<InputT,
RequestEntryT extends Serializable
*
* @param failedRequestEntries requestEntries that need to be retried
*/
- private void completeRequest(
- Collection<RequestEntryT> failedRequestEntries, long requestStartTime) {
+ private void completeRequest(List<RequestEntryT> failedRequestEntries, long requestStartTime) {
lastSendTimestamp = requestStartTime;
ackTime = System.currentTimeMillis();
inFlightRequestsCount--;
- List<RequestEntryT> requestsList = new ArrayList<>(failedRequestEntries);
- Collections.reverse(requestsList);
- requestsList.forEach(failedEntry -> addEntryToBuffer(failedEntry, true));
+ ListIterator<RequestEntryT> iterator =
+ failedRequestEntries.listIterator(failedRequestEntries.size());
+ while (iterator.hasPrevious()) {
+ addEntryToBuffer(iterator.previous(), true);
+ }
}
private void addEntryToBuffer(RequestEntryT entry, boolean insertAtHead) {
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/ArrayListAsyncSink.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/ArrayListAsyncSink.java
index dc48db8..946e976 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/ArrayListAsyncSink.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/ArrayListAsyncSink.java
@@ -69,7 +69,7 @@ public class ArrayListAsyncSink extends AsyncSinkBase<String,
Integer> {
@Override
protected void submitRequestEntries(
- List<Integer> requestEntries, Consumer<Collection<Integer>> requestResult) {
+ List<Integer> requestEntries, Consumer<List<Integer>> requestResult) {
try {
ArrayListDestination.putRecords(requestEntries);
} catch (RuntimeException e) {
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
index 4352305..b508159 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
@@ -40,7 +40,6 @@ import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
@@ -886,7 +885,7 @@ public class AsyncSinkWriterTest {
*/
@Override
protected void submitRequestEntries(
- List<Integer> requestEntries, Consumer<Collection<Integer>> requestResult) {
+ List<Integer> requestEntries, Consumer<List<Integer>> requestResult) {
maybeDelay();
if (requestEntries.stream().anyMatch(val -> val > 100 && val <=
200)) {
@@ -1146,7 +1145,7 @@ public class AsyncSinkWriterTest {
@Override
protected void submitRequestEntries(
- List<Integer> requestEntries, Consumer<Collection<Integer>> requestResult) {
+ List<Integer> requestEntries, Consumer<List<Integer>> requestResult) {
if (requestEntries.size() == 3) {
try {
delayedStartLatch.countDown();