liuml07 commented on code in PR #24839:
URL: https://github.com/apache/flink/pull/24839#discussion_r1628815201


##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -533,4 +607,77 @@ private int getNextBatchSizeLimit() {
     protected Consumer<Exception> getFatalExceptionCons() {
         return fatalExceptionCons;
     }
+
+    /** An implementation of {@link ResultHandler} that supports timeout. */
+    private class AsyncSinkWriterResultHandler implements 
ResultHandler<RequestEntryT> {
+        private final ScheduledFuture<?> scheduledFuture;
+        private final long requestTimestamp;
+        private final int batchSize;
+        private final AtomicBoolean isCompleted = new AtomicBoolean(false);
+        private final List<RequestEntryT> batchEntries;
+
+        public AsyncSinkWriterResultHandler(
+                long requestTimestamp, List<RequestEntryT> batchEntries, 
RequestInfo requestInfo) {
+            this.scheduledFuture =
+                    timeService.registerTimer(
+                            timeService.getCurrentProcessingTime() + 
requestTimeoutMS,
+                            instant -> this.timeout());
+            this.requestTimestamp = requestTimestamp;
+            this.batchEntries = batchEntries;
+            this.batchSize = requestInfo.getBatchSize();
+        }
+
+        @Override
+        public void complete() {
+            if (isCompleted.compareAndSet(false, true)) {
+                scheduledFuture.cancel(false);
+                mailboxExecutor.execute(
+                        () -> completeRequest(Collections.emptyList(), 
batchSize, requestTimestamp),
+                        "Mark in-flight request as completed successfully",
+                        batchSize);
+            }
+        }
+
+        @Override
+        public void completeExceptionally(Exception e) {
+            if (isCompleted.compareAndSet(false, true)) {
+                scheduledFuture.cancel(false);
+                mailboxExecutor.execute(
+                        () -> getFatalExceptionCons().accept(e),
+                        "Mark in-flight request as failed with fatal 
exception",

Review Comment:
   Should this description include a `%s` placeholder for the exception message?



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -533,4 +607,77 @@ private int getNextBatchSizeLimit() {
     protected Consumer<Exception> getFatalExceptionCons() {
         return fatalExceptionCons;
     }
+
+    /** An implementation of {@link ResultHandler} that supports timeout. */
+    private class AsyncSinkWriterResultHandler implements 
ResultHandler<RequestEntryT> {
+        private final ScheduledFuture<?> scheduledFuture;
+        private final long requestTimestamp;
+        private final int batchSize;
+        private final AtomicBoolean isCompleted = new AtomicBoolean(false);
+        private final List<RequestEntryT> batchEntries;
+
+        public AsyncSinkWriterResultHandler(
+                long requestTimestamp, List<RequestEntryT> batchEntries, 
RequestInfo requestInfo) {
+            this.scheduledFuture =
+                    timeService.registerTimer(
+                            timeService.getCurrentProcessingTime() + 
requestTimeoutMS,
+                            instant -> this.timeout());
+            this.requestTimestamp = requestTimestamp;
+            this.batchEntries = batchEntries;
+            this.batchSize = requestInfo.getBatchSize();
+        }
+
+        @Override
+        public void complete() {
+            if (isCompleted.compareAndSet(false, true)) {
+                scheduledFuture.cancel(false);
+                mailboxExecutor.execute(
+                        () -> completeRequest(Collections.emptyList(), 
batchSize, requestTimestamp),
+                        "Mark in-flight request as completed successfully",

Review Comment:
   Should this description include a `%d` placeholder for the `batchSize`?



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -181,15 +187,88 @@ public abstract class AsyncSinkWriter<InputT, 
RequestEntryT extends Serializable
      * <p>During checkpointing, the sink needs to ensure that there are no 
outstanding in-flight
      * requests.
      *
+     * <p>This method is deprecated in favor of {@code 
submitRequestEntries(List<RequestEntryT>

Review Comment:
   nit: use `@deprecated` in Javadoc for this?



##########
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java:
##########
@@ -533,4 +607,77 @@ private int getNextBatchSizeLimit() {
     protected Consumer<Exception> getFatalExceptionCons() {
         return fatalExceptionCons;
     }
+
+    /** An implementation of {@link ResultHandler} that supports timeout. */
+    private class AsyncSinkWriterResultHandler implements 
ResultHandler<RequestEntryT> {
+        private final ScheduledFuture<?> scheduledFuture;
+        private final long requestTimestamp;
+        private final int batchSize;
+        private final AtomicBoolean isCompleted = new AtomicBoolean(false);
+        private final List<RequestEntryT> batchEntries;
+
+        public AsyncSinkWriterResultHandler(
+                long requestTimestamp, List<RequestEntryT> batchEntries, 
RequestInfo requestInfo) {
+            this.scheduledFuture =
+                    timeService.registerTimer(
+                            timeService.getCurrentProcessingTime() + 
requestTimeoutMS,
+                            instant -> this.timeout());
+            this.requestTimestamp = requestTimestamp;
+            this.batchEntries = batchEntries;
+            this.batchSize = requestInfo.getBatchSize();
+        }
+
+        @Override
+        public void complete() {
+            if (isCompleted.compareAndSet(false, true)) {
+                scheduledFuture.cancel(false);
+                mailboxExecutor.execute(
+                        () -> completeRequest(Collections.emptyList(), 
batchSize, requestTimestamp),
+                        "Mark in-flight request as completed successfully",
+                        batchSize);
+            }
+        }
+
+        @Override
+        public void completeExceptionally(Exception e) {
+            if (isCompleted.compareAndSet(false, true)) {
+                scheduledFuture.cancel(false);
+                mailboxExecutor.execute(
+                        () -> getFatalExceptionCons().accept(e),
+                        "Mark in-flight request as failed with fatal 
exception",
+                        e.getMessage());
+            }
+        }
+
+        @Override
+        public void retryForEntries(List<RequestEntryT> requestEntriesToRetry) 
{
+            if (isCompleted.compareAndSet(false, true)) {
+                scheduledFuture.cancel(false);
+                mailboxExecutor.execute(
+                        () -> completeRequest(requestEntriesToRetry, 
batchSize, requestTimestamp),
+                        "Mark in-flight request as completed with %d failed 
request entries",
+                        requestEntriesToRetry.size());
+            }
+        }
+
+        public void timeout() {
+            if (isCompleted.compareAndSet(false, true)) {
+                mailboxExecutor.execute(
+                        () -> {
+                            if (failOnTimeout) {
+                                getFatalExceptionCons()
+                                        .accept(
+                                                new TimeoutException(
+                                                        "Request timed out 
after "
+                                                                + 
requestTimeoutMS
+                                                                + "ms with 
failOnTimeout set to true."));
+                            } else {
+                                // Retry the request on timeout
+                                completeRequest(batchEntries, batchSize, 
requestTimestamp);
+                            }
+                        },
+                        "Mark in-flight request as completed with timeout");

Review Comment:
   nit: add `requestTimeoutMS` to the description?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to