lincoln-lil commented on code in PR #19983:
URL: https://github.com/apache/flink/pull/19983#discussion_r908601534


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +392,143 @@ private void outputCompletedElement() {
         }
     }
 
+    private void doRetry(RetryableResultHandlerDelegator 
resultHandlerDelegator) throws Exception {
+        // increment current attempt number
+        resultHandlerDelegator.incrementAttempts();
+
+        // fire a new attempt
+        userFunction.asyncInvoke(resultHandlerDelegator.getInputRecord(), 
resultHandlerDelegator);
+    }
+
+    /** A delegator holds the real {@link ResultHandler} to handle retries. */
+    private class RetryableResultHandlerDelegator implements ResultFuture<OUT> 
{
+
+        private final ResultHandler resultHandler;
+        private final ProcessingTimeService processingTimeService;
+
+        private ScheduledFuture<?> delayedRetryTimer;
+
+        /** start from 1, when this entry created, the first attempt will 
happen. */
+        private int currentAttempts = 1;
+
+        private long backoffTimeMillis;
+
+        private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+        public RetryableResultHandlerDelegator(
+                StreamRecord<IN> inputRecord,
+                ResultFuture<OUT> resultFuture,
+                ProcessingTimeService processingTimeService) {
+            this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+            this.processingTimeService = processingTimeService;
+        }
+
+        public void registerTimeout(long timeout) {
+            resultHandler.registerTimeout(processingTimeService, timeout);
+        }
+
+        @Override
+        public void complete(Collection<OUT> results) {

Review Comment:
   Yes, this should be done.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +392,143 @@ private void outputCompletedElement() {
         }
     }
 
+    private void doRetry(RetryableResultHandlerDelegator 
resultHandlerDelegator) throws Exception {
+        // increment current attempt number
+        resultHandlerDelegator.incrementAttempts();
+
+        // fire a new attempt
+        userFunction.asyncInvoke(resultHandlerDelegator.getInputRecord(), 
resultHandlerDelegator);
+    }
+
+    /** A delegator holds the real {@link ResultHandler} to handle retries. */
+    private class RetryableResultHandlerDelegator implements ResultFuture<OUT> 
{
+
+        private final ResultHandler resultHandler;
+        private final ProcessingTimeService processingTimeService;
+
+        private ScheduledFuture<?> delayedRetryTimer;
+
+        /** start from 1, when this entry created, the first attempt will 
happen. */
+        private int currentAttempts = 1;
+
+        private long backoffTimeMillis;
+
+        private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+        public RetryableResultHandlerDelegator(
+                StreamRecord<IN> inputRecord,
+                ResultFuture<OUT> resultFuture,
+                ProcessingTimeService processingTimeService) {
+            this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+            this.processingTimeService = processingTimeService;
+        }
+
+        public void registerTimeout(long timeout) {
+            resultHandler.registerTimeout(processingTimeService, timeout);
+        }
+
+        @Override
+        public void complete(Collection<OUT> results) {
+            if (retryEnabled) {
+                mailboxExecutor.submit(() -> cleanupLastRetryInMailbox(), 
"cleanup last retry");
+                if (!resultHandler.completed.get() && 
ifRetryOrCompleted(results, null)) {
+                    return;
+                }
+            }
+            resultHandler.complete(results);
+        }
+
+        @Override
+        public void completeExceptionally(Throwable error) {
+            if (retryEnabled) {
+                mailboxExecutor.submit(() -> cleanupLastRetryInMailbox(), 
"cleanup last retry");
+                if (ifRetryOrCompleted(null, error)) {
+                    return;
+                }
+            }
+            resultHandler.completeExceptionally(error);
+        }
+
+        private void cleanupLastRetryInMailbox() {
+            if (retryInFlight.compareAndSet(true, false)) {
+                assert incompleteDelayRetryHandlers.contains(this);
+                // remove from delayed retry queue
+                incompleteDelayRetryHandlers.remove(this);
+                delayedRetryTimer = null;
+            }
+        }
+
+        private boolean ifRetryOrCompleted(Collection<OUT> results, Throwable 
error) {
+            if (delayedRetryAvailable.get() && 
resultHandler.inputRecord.isRecord()) {
+                boolean satisfy = false;
+                if (null != results && null != retryResultPredicate) {
+                    satisfy = (satisfy || retryResultPredicate.test(results));
+                }
+                if (null != error && null != retryExceptionPredicate) {
+                    satisfy = (satisfy || retryExceptionPredicate.test(error));
+                }
+                if (satisfy) {
+                    if (asyncRetryStrategy.canRetry(getCurrentAttempts())) {
+                        long nextBackoffTimeMillis =
+                                
asyncRetryStrategy.getBackoffTimeMillis(getCurrentAttempts());
+                        setBackoffTimeMillis(nextBackoffTimeMillis);
+                        if (delayedRetryAvailable.get()) {
+                            // timer thread will finally dispatch the task to 
mailbox executor
+                            mailboxExecutor.submit(
+                                    () -> 
trySubmitRetryInMailboxOrComplete(this, results, error),
+                                    "delayed retry or give up retry");
+                            return true;
+                        }
+                    }
+                }
+            }
+            return false;
+        }
+
+        private void trySubmitRetryInMailboxOrComplete(
+                RetryableResultHandlerDelegator resultHandlerDelegator,
+                Collection<OUT> results,
+                Throwable error) {
+            if (delayedRetryAvailable.get()) {
+                incompleteDelayRetryHandlers.add(resultHandlerDelegator);
+                retryInFlight.set(true);
+                final long delayedRetry =
+                        resultHandlerDelegator.getBackoffTimeMillis()
+                                + 
getProcessingTimeService().getCurrentProcessingTime();
+                delayedRetryTimer =
+                        processingTimeService.registerTimer(
+                                delayedRetry, timestamp -> 
doRetry(resultHandlerDelegator));
+            } else {
+                // give up retry and complete immediately
+                if (null != results) {
+                    resultHandler.complete(results);
+                } else {
+                    resultHandler.completeExceptionally(error);
+                }
+            }
+        }
+
+        public long getBackoffTimeMillis() {

Review Comment:
   Makes sense, will update.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to