lincoln-lil commented on code in PR #19983:
URL: https://github.com/apache/flink/pull/19983#discussion_r919683662


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -109,11 +130,18 @@
     /** Whether object reuse has been enabled or disabled. */
     private transient boolean isObjectReuseEnabled;
 
+    private transient Predicate<Collection<OUT>> retryResultPredicate;
+
+    private transient Predicate<Throwable> retryExceptionPredicate;
+
+    private transient AtomicBoolean delayedRetryAvailable;

Review Comment:
   `retryDisabledOnFinish` seems a better choice!



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -154,6 +190,14 @@ public void setup(
             default:
                 throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
         }
+        if (asyncRetryStrategy.getRetryPredicate().resultPredicate().isPresent()) {

Review Comment:
   This simplification is great!



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -128,6 +156,15 @@ public AsyncWaitOperator(
 
         this.timeout = timeout;
 
+        this.asyncRetryStrategy = asyncRetryStrategy;
+
+        this.retryEnabled =
+                // construct from utility class
+                asyncRetryStrategy != NO_RETRY_STRATEGY
+                        // construct from api
+                        || asyncRetryStrategy.getRetryPredicate().resultPredicate().isPresent()

Review Comment:
   You're right, the top-level `||` should be `&&`.
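   A minimal sketch of the corrected condition, assuming the second operand stays as quoted above (the full expression in the PR may also consult the exception predicate, which is not shown here):
   ```
   // hedged sketch, not the final PR code: retry is enabled only when a real
   // strategy is configured AND it actually exposes a retry predicate
   this.retryEnabled =
           // construct from utility class
           asyncRetryStrategy != NO_RETRY_STRATEGY
                   // construct from api
                   && asyncRetryStrategy.getRetryPredicate().resultPredicate().isPresent();
   ```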



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +389,143 @@ private void outputCompletedElement() {
         }
     }
 
+    /**
+     * Besides doRetry, the cleanup work will be done after retry fired, includes reset retry
+     * in-flight flag and remove retry handler from the incomplete retry handlers.
+     */
+    private void doRetryWithCleanup(RetryableResultHandlerDelegator resultHandlerDelegator)
+            throws Exception {
+        doRetry(resultHandlerDelegator);
+
+        // reset retryInFlight for next possible retry
+        resultHandlerDelegator.retryInFlight.set(false);
+        // remove from incomplete retry handlers
+        inFlightDelayRetryHandlers.remove(resultHandlerDelegator);
+    }
+
+    /** Increments number of attempts and fire the attempt. */
+    private void doRetry(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception {
+        // increment current attempt number
+        resultHandlerDelegator.currentAttempts++;
+
+        // fire a new attempt
+        userFunction.asyncInvoke(
+                resultHandlerDelegator.resultHandler.inputRecord.getValue(),
+                resultHandlerDelegator);
+    }
+
+    /** A delegator holds the real {@link ResultHandler} to handle retries. */
+    private class RetryableResultHandlerDelegator implements ResultFuture<OUT> {
+
+        private final ResultHandler resultHandler;
+        private final ProcessingTimeService processingTimeService;
+
+        private ScheduledFuture<?> delayedRetryTimer;
+
+        /** start from 1, when this entry created, the first attempt will happen. */
+        private int currentAttempts = 1;
+
+        /**
+         * A guard similar to ResultHandler.complete to prevent repeated complete calls from
+         * ill-written AsyncFunction. This flag indicates a retry is in-flight, will reject new
+         * retry request if true. And wil be reset to false after the retry fired.
+         */
+        private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+        public RetryableResultHandlerDelegator(
+                StreamRecord<IN> inputRecord,
+                ResultFuture<OUT> resultFuture,
+                ProcessingTimeService processingTimeService) {
+            this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+            this.processingTimeService = processingTimeService;
+        }
+
+        public void registerTimeout(long timeout) {
+            resultHandler.registerTimeout(processingTimeService, timeout);
+        }
+
+        @Override
+        public void complete(Collection<OUT> results) {
+            Preconditions.checkNotNull(
+                    results, "Results must not be null, use empty collection to emit nothing");
+            if (retryEnabled
+                    && delayedRetryAvailable.get()
+                    && resultHandler.inputRecord.isRecord()) {
+                // ignore repeated call(s)
+                if (!retryInFlight.compareAndSet(false, true)) {
+                    return;
+                }
+
+                processRetryInMailBox(results, null);
+            } else {
+                resultHandler.complete(results);
+            }
+        }
+
+        private void processRetryInMailBox(Collection<OUT> results, Throwable error) {
+            mailboxExecutor.submit(
+                    () -> processRetry(this, results, error), "delayed retry or complete");

Review Comment:
   This is debt from a previous refactor; it should indeed be cleaned up.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +389,143 @@ private void outputCompletedElement() {
         }
     }
 
+    /**
+     * Besides doRetry, the cleanup work will be done after retry fired, includes reset retry
+     * in-flight flag and remove retry handler from the incomplete retry handlers.
+     */
+    private void doRetryWithCleanup(RetryableResultHandlerDelegator resultHandlerDelegator)
+            throws Exception {
+        doRetry(resultHandlerDelegator);
+
+        // reset retryInFlight for next possible retry
+        resultHandlerDelegator.retryInFlight.set(false);
+        // remove from incomplete retry handlers
+        inFlightDelayRetryHandlers.remove(resultHandlerDelegator);
+    }
+
+    /** Increments number of attempts and fire the attempt. */
+    private void doRetry(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception {
+        // increment current attempt number
+        resultHandlerDelegator.currentAttempts++;
+
+        // fire a new attempt
+        userFunction.asyncInvoke(
+                resultHandlerDelegator.resultHandler.inputRecord.getValue(),
+                resultHandlerDelegator);
+    }
+
+    /** A delegator holds the real {@link ResultHandler} to handle retries. */
+    private class RetryableResultHandlerDelegator implements ResultFuture<OUT> {
+
+        private final ResultHandler resultHandler;
+        private final ProcessingTimeService processingTimeService;
+
+        private ScheduledFuture<?> delayedRetryTimer;
+
+        /** start from 1, when this entry created, the first attempt will happen. */
+        private int currentAttempts = 1;
+
+        /**
+         * A guard similar to ResultHandler.complete to prevent repeated complete calls from
+         * ill-written AsyncFunction. This flag indicates a retry is in-flight, will reject new
+         * retry request if true. And wil be reset to false after the retry fired.
+         */
+        private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+        public RetryableResultHandlerDelegator(
+                StreamRecord<IN> inputRecord,
+                ResultFuture<OUT> resultFuture,
+                ProcessingTimeService processingTimeService) {
+            this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+            this.processingTimeService = processingTimeService;
+        }
+
+        public void registerTimeout(long timeout) {
+            resultHandler.registerTimeout(processingTimeService, timeout);
+        }
+
+        @Override
+        public void complete(Collection<OUT> results) {
+            Preconditions.checkNotNull(
+                    results, "Results must not be null, use empty collection to emit nothing");
+            if (retryEnabled
+                    && delayedRetryAvailable.get()
+                    && resultHandler.inputRecord.isRecord()) {
+                // ignore repeated call(s)
+                if (!retryInFlight.compareAndSet(false, true)) {

Review Comment:
   I struggled with the variable name for a while; how about naming it `retryAwaiting`? It mainly indicates that the handler is waiting for a retry that has not been triggered yet.
   The difference from `lastRetryCompleted` is that it does not mean the retryHandler is completed/finished; it just indicates the waiting-for-retry status. WDYT?
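   For illustration only, a hedged sketch (not the PR code) of the semantics the name is meant to convey:
   ```
   // set when a retry has been requested and is still awaiting dispatch
   private final AtomicBoolean retryAwaiting = new AtomicBoolean(false);

   // in complete()/completeExceptionally(): repeated calls are ignored while a retry is pending
   if (!retryAwaiting.compareAndSet(false, true)) {
       return;
   }

   // ... and later, after the retry has actually been fired (e.g. in doRetryWithCleanup):
   retryAwaiting.set(false);
   ```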



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -197,11 +248,12 @@ public void processElement(StreamRecord<IN> record) throws Exception {
         // add element first to the queue
         final ResultFuture<OUT> entry = addToWorkQueue(element);
 
-        final ResultHandler resultHandler = new ResultHandler(element, entry);
+        final RetryableResultHandlerDelegator resultHandler =

Review Comment:
   From a performance perspective, I accept this approach. The `retryEnabled` flag can then be removed from `RetryableResultHandlerDelegator`.
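   To make the idea concrete, a hedged sketch of what `processElement` could look like when the decision is made once up front (names follow the quoted diff; this is not the final PR code):
   ```
   final ResultFuture<OUT> entry = addToWorkQueue(element);

   // choose the handler once here, so the delegator no longer needs a retryEnabled check
   final ResultFuture<OUT> resultHandler =
           retryEnabled
                   ? new RetryableResultHandlerDelegator(element, entry, getProcessingTimeService())
                   : new ResultHandler(element, entry);

   userFunction.asyncInvoke(element.getValue(), resultHandler);
   ```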



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +389,143 @@ private void outputCompletedElement() {
         }
     }
 
+    /**
+     * Besides doRetry, the cleanup work will be done after retry fired, includes reset retry
+     * in-flight flag and remove retry handler from the incomplete retry handlers.
+     */
+    private void doRetryWithCleanup(RetryableResultHandlerDelegator resultHandlerDelegator)
+            throws Exception {
+        doRetry(resultHandlerDelegator);
+
+        // reset retryInFlight for next possible retry
+        resultHandlerDelegator.retryInFlight.set(false);
+        // remove from incomplete retry handlers
+        inFlightDelayRetryHandlers.remove(resultHandlerDelegator);
+    }
+
+    /** Increments number of attempts and fire the attempt. */
+    private void doRetry(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception {
+        // increment current attempt number
+        resultHandlerDelegator.currentAttempts++;
+
+        // fire a new attempt
+        userFunction.asyncInvoke(
+                resultHandlerDelegator.resultHandler.inputRecord.getValue(),
+                resultHandlerDelegator);
+    }
+
+    /** A delegator holds the real {@link ResultHandler} to handle retries. */
+    private class RetryableResultHandlerDelegator implements ResultFuture<OUT> {
+
+        private final ResultHandler resultHandler;
+        private final ProcessingTimeService processingTimeService;
+
+        private ScheduledFuture<?> delayedRetryTimer;
+
+        /** start from 1, when this entry created, the first attempt will happen. */
+        private int currentAttempts = 1;
+
+        /**
+         * A guard similar to ResultHandler.complete to prevent repeated complete calls from
+         * ill-written AsyncFunction. This flag indicates a retry is in-flight, will reject new
+         * retry request if true. And wil be reset to false after the retry fired.
+         */
+        private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+        public RetryableResultHandlerDelegator(
+                StreamRecord<IN> inputRecord,
+                ResultFuture<OUT> resultFuture,
+                ProcessingTimeService processingTimeService) {
+            this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+            this.processingTimeService = processingTimeService;
+        }
+
+        public void registerTimeout(long timeout) {
+            resultHandler.registerTimeout(processingTimeService, timeout);
+        }
+
+        @Override
+        public void complete(Collection<OUT> results) {
+            Preconditions.checkNotNull(
+                    results, "Results must not be null, use empty collection to emit nothing");
+            if (retryEnabled
+                    && delayedRetryAvailable.get()
+                    && resultHandler.inputRecord.isRecord()) {
+                // ignore repeated call(s)
+                if (!retryInFlight.compareAndSet(false, true)) {
+                    return;
+                }
+
+                processRetryInMailBox(results, null);
+            } else {
+                resultHandler.complete(results);
+            }
+        }
+
+        private void processRetryInMailBox(Collection<OUT> results, Throwable error) {
+            mailboxExecutor.submit(
+                    () -> processRetry(this, results, error), "delayed retry or complete");
+        }
+
+        private void processRetry(
+                RetryableResultHandlerDelegator resultHandlerDelegator,
+                Collection<OUT> results,
+                Throwable error) {
+            boolean satisfy = false;
+            if (null != results && null != retryResultPredicate) {
+                satisfy = (satisfy || retryResultPredicate.test(results));
+            }
+            if (null != error && null != retryExceptionPredicate) {
+                satisfy = (satisfy || retryExceptionPredicate.test(error));
+            }
+            if (satisfy) {
+                if (asyncRetryStrategy.canRetry(currentAttempts)) {
+                    long nextBackoffTimeMillis =
+                            asyncRetryStrategy.getBackoffTimeMillis(currentAttempts);
+                    if (delayedRetryAvailable.get()) {
+                        final long delayedRetry =
+                                nextBackoffTimeMillis
+                                        + getProcessingTimeService().getCurrentProcessingTime();
+
+                        // timer thread will finally dispatch the task to mailbox executor,
+                        // and it can only be submitted once for one attempt.
+                        delayedRetryTimer =
+                                processingTimeService.registerTimer(
+                                        delayedRetry,
+                                        timestamp -> doRetryWithCleanup(resultHandlerDelegator));
+
+                        // add to incomplete retry handlers, will remove it after retry fired
+                        inFlightDelayRetryHandlers.add(resultHandlerDelegator);
+
+                        return;
+                    }
+                }
+            }
+            // retry unsatisfied, complete it
+            if (null != results) {
+                resultHandlerDelegator.resultHandler.complete(results);
+            } else {
+                resultHandlerDelegator.resultHandler.completeExceptionally(error);
+            }
+        }
+
+        @Override
+        public void completeExceptionally(Throwable error) {

Review Comment:
   Yes, it looks more intuitive to keep them together.
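   As a hedged illustration of keeping the two paths together, both could funnel through one shared helper (`handledByRetry` is a hypothetical name, not from the PR; `retryEnabled` is omitted here, assuming the check moves to `processElement` as discussed above):
   ```
   @Override
   public void complete(Collection<OUT> results) {
       Preconditions.checkNotNull(
               results, "Results must not be null, use empty collection to emit nothing");
       if (!handledByRetry(results, null)) {
           resultHandler.complete(results);
       }
   }

   @Override
   public void completeExceptionally(Throwable error) {
       if (!handledByRetry(null, error)) {
           resultHandler.completeExceptionally(error);
       }
   }

   private boolean handledByRetry(Collection<OUT> results, Throwable error) {
       if (delayedRetryAvailable.get() && resultHandler.inputRecord.isRecord()) {
           // repeated calls while a retry is pending are silently ignored
           if (retryInFlight.compareAndSet(false, true)) {
               processRetryInMailBox(results, error);
           }
           return true;
       }
       return false;
   }
   ```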



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +389,143 @@ private void outputCompletedElement() {
         }
     }
 
+    /**
+     * Besides doRetry, the cleanup work will be done after retry fired, includes reset retry
+     * in-flight flag and remove retry handler from the incomplete retry handlers.
+     */
+    private void doRetryWithCleanup(RetryableResultHandlerDelegator resultHandlerDelegator)
+            throws Exception {
+        doRetry(resultHandlerDelegator);
+
+        // reset retryInFlight for next possible retry
+        resultHandlerDelegator.retryInFlight.set(false);
+        // remove from incomplete retry handlers
+        inFlightDelayRetryHandlers.remove(resultHandlerDelegator);
+    }
+
+    /** Increments number of attempts and fire the attempt. */
+    private void doRetry(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception {
+        // increment current attempt number
+        resultHandlerDelegator.currentAttempts++;
+
+        // fire a new attempt
+        userFunction.asyncInvoke(
+                resultHandlerDelegator.resultHandler.inputRecord.getValue(),
+                resultHandlerDelegator);
+    }
+
+    /** A delegator holds the real {@link ResultHandler} to handle retries. */
+    private class RetryableResultHandlerDelegator implements ResultFuture<OUT> {
+
+        private final ResultHandler resultHandler;
+        private final ProcessingTimeService processingTimeService;
+
+        private ScheduledFuture<?> delayedRetryTimer;
+
+        /** start from 1, when this entry created, the first attempt will happen. */
+        private int currentAttempts = 1;
+
+        /**
+         * A guard similar to ResultHandler.complete to prevent repeated complete calls from
+         * ill-written AsyncFunction. This flag indicates a retry is in-flight, will reject new
+         * retry request if true. And wil be reset to false after the retry fired.
+         */
+        private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+        public RetryableResultHandlerDelegator(
+                StreamRecord<IN> inputRecord,
+                ResultFuture<OUT> resultFuture,
+                ProcessingTimeService processingTimeService) {
+            this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+            this.processingTimeService = processingTimeService;
+        }
+
+        public void registerTimeout(long timeout) {
+            resultHandler.registerTimeout(processingTimeService, timeout);
+        }
+
+        @Override
+        public void complete(Collection<OUT> results) {
+            Preconditions.checkNotNull(
+                    results, "Results must not be null, use empty collection to emit nothing");
+            if (retryEnabled
+                    && delayedRetryAvailable.get()
+                    && resultHandler.inputRecord.isRecord()) {
+                // ignore repeated call(s)
+                if (!retryInFlight.compareAndSet(false, true)) {
+                    return;
+                }
+
+                processRetryInMailBox(results, null);
+            } else {
+                resultHandler.complete(results);
+            }
+        }
+
+        private void processRetryInMailBox(Collection<OUT> results, Throwable error) {
+            mailboxExecutor.submit(
+                    () -> processRetry(this, results, error), "delayed retry or complete");
+        }
+
+        private void processRetry(
+                RetryableResultHandlerDelegator resultHandlerDelegator,
+                Collection<OUT> results,
+                Throwable error) {
+            boolean satisfy = false;
+            if (null != results && null != retryResultPredicate) {
+                satisfy = (satisfy || retryResultPredicate.test(results));
+            }
+            if (null != error && null != retryExceptionPredicate) {
+                satisfy = (satisfy || retryExceptionPredicate.test(error));
+            }
+            if (satisfy) {
+                if (asyncRetryStrategy.canRetry(currentAttempts)) {
+                    long nextBackoffTimeMillis =
+                            asyncRetryStrategy.getBackoffTimeMillis(currentAttempts);
+                    if (delayedRetryAvailable.get()) {
+                        final long delayedRetry =
+                                nextBackoffTimeMillis
+                                        + getProcessingTimeService().getCurrentProcessingTime();
+
+                        // timer thread will finally dispatch the task to mailbox executor,
+                        // and it can only be submitted once for one attempt.
+                        delayedRetryTimer =
+                                processingTimeService.registerTimer(
+                                        delayedRetry,
+                                        timestamp -> doRetryWithCleanup(resultHandlerDelegator));
+
+                        // add to incomplete retry handlers, will remove it after retry fired

Review Comment:
   Do you mean that we add it at the first attempt (in `processElement`)? I tried that, and gave it up considering that the possibility of a retry is relatively small compared to normal processing.
   But for the missing removal when the retry finally completes, I'd like to change it like this:
   ```
   if (satisfy retry) {
       ...
       if (currentAttempts == 1) {
           // add to incomplete retry handlers only for the first time
           inFlightDelayRetryHandlers.add(this);
       }
   } else {
       // remove handler that has been retried from incomplete retry handlers
       if (currentAttempts > 1) {
           inFlightDelayRetryHandlers.remove(this);
       }
       // resultHandler complete...
   }
   ```
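   Spelled out as a hedged sketch of `processRetry` inside the delegator (assumption: the predicate checks and timer registration stay as in the quoted diff, only the add/remove bookkeeping changes as proposed above; not the final PR code):
   ```
   private void processRetry(
           RetryableResultHandlerDelegator resultHandlerDelegator,
           Collection<OUT> results,
           Throwable error) {
       // a retry is warranted if either predicate matches the outcome of this attempt
       boolean satisfy =
               (results != null && retryResultPredicate != null && retryResultPredicate.test(results))
                       || (error != null && retryExceptionPredicate != null && retryExceptionPredicate.test(error));

       if (satisfy
               && asyncRetryStrategy.canRetry(currentAttempts)
               && delayedRetryAvailable.get()) {
           final long delayedRetry =
                   asyncRetryStrategy.getBackoffTimeMillis(currentAttempts)
                           + getProcessingTimeService().getCurrentProcessingTime();

           // timer thread will finally dispatch the task to the mailbox executor,
           // and it can only be submitted once for one attempt
           delayedRetryTimer =
                   processingTimeService.registerTimer(
                           delayedRetry, timestamp -> doRetryWithCleanup(resultHandlerDelegator));

           if (currentAttempts == 1) {
               // register in the incomplete retry handlers only when the first retry is scheduled
               inFlightDelayRetryHandlers.add(resultHandlerDelegator);
           }
       } else {
           if (currentAttempts > 1) {
               // this handler had been retried before; drop it before completing
               inFlightDelayRetryHandlers.remove(resultHandlerDelegator);
           }
           if (results != null) {
               resultHandlerDelegator.resultHandler.complete(results);
           } else {
               resultHandlerDelegator.resultHandler.completeExceptionally(error);
           }
       }
   }
   ```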
   
   



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +389,143 @@ private void outputCompletedElement() {
         }
     }
 
+    /**
+     * Besides doRetry, the cleanup work will be done after retry fired, includes reset retry
+     * in-flight flag and remove retry handler from the incomplete retry handlers.
+     */
+    private void doRetryWithCleanup(RetryableResultHandlerDelegator resultHandlerDelegator)
+            throws Exception {
+        doRetry(resultHandlerDelegator);
+
+        // reset retryInFlight for next possible retry
+        resultHandlerDelegator.retryInFlight.set(false);
+        // remove from incomplete retry handlers
+        inFlightDelayRetryHandlers.remove(resultHandlerDelegator);
+    }
+
+    /** Increments number of attempts and fire the attempt. */
+    private void doRetry(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception {
+        // increment current attempt number
+        resultHandlerDelegator.currentAttempts++;
+
+        // fire a new attempt
+        userFunction.asyncInvoke(
+                resultHandlerDelegator.resultHandler.inputRecord.getValue(),
+                resultHandlerDelegator);
+    }
+
+    /** A delegator holds the real {@link ResultHandler} to handle retries. */
+    private class RetryableResultHandlerDelegator implements ResultFuture<OUT> {
+
+        private final ResultHandler resultHandler;
+        private final ProcessingTimeService processingTimeService;
+
+        private ScheduledFuture<?> delayedRetryTimer;
+
+        /** start from 1, when this entry created, the first attempt will happen. */
+        private int currentAttempts = 1;
+
+        /**
+         * A guard similar to ResultHandler.complete to prevent repeated complete calls from
+         * ill-written AsyncFunction. This flag indicates a retry is in-flight, will reject new
+         * retry request if true. And wil be reset to false after the retry fired.
+         */
+        private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+        public RetryableResultHandlerDelegator(
+                StreamRecord<IN> inputRecord,
+                ResultFuture<OUT> resultFuture,
+                ProcessingTimeService processingTimeService) {
+            this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+            this.processingTimeService = processingTimeService;
+        }
+
+        public void registerTimeout(long timeout) {
+            resultHandler.registerTimeout(processingTimeService, timeout);
+        }
+
+        @Override
+        public void complete(Collection<OUT> results) {
+            Preconditions.checkNotNull(
+                    results, "Results must not be null, use empty collection to emit nothing");
+            if (retryEnabled
+                    && delayedRetryAvailable.get()
+                    && resultHandler.inputRecord.isRecord()) {
+                // ignore repeated call(s)
+                if (!retryInFlight.compareAndSet(false, true)) {
+                    return;
+                }
+
+                processRetryInMailBox(results, null);
+            } else {
+                resultHandler.complete(results);
+            }
+        }
+
+        private void processRetryInMailBox(Collection<OUT> results, Throwable error) {
+            mailboxExecutor.submit(
+                    () -> processRetry(this, results, error), "delayed retry or complete");
+        }
+
+        private void processRetry(
+                RetryableResultHandlerDelegator resultHandlerDelegator,
+                Collection<OUT> results,
+                Throwable error) {
+            boolean satisfy = false;
+            if (null != results && null != retryResultPredicate) {
+                satisfy = (satisfy || retryResultPredicate.test(results));
+            }
+            if (null != error && null != retryExceptionPredicate) {
+                satisfy = (satisfy || retryExceptionPredicate.test(error));
+            }
+            if (satisfy) {
+                if (asyncRetryStrategy.canRetry(currentAttempts)) {
+                    long nextBackoffTimeMillis =
+                            asyncRetryStrategy.getBackoffTimeMillis(currentAttempts);
+                    if (delayedRetryAvailable.get()) {
+                        final long delayedRetry =
+                                nextBackoffTimeMillis
+                                        + getProcessingTimeService().getCurrentProcessingTime();
+
+                        // timer thread will finally dispatch the task to mailbox executor,
+                        // and it can only be submitted once for one attempt.
+                        delayedRetryTimer =
+                                processingTimeService.registerTimer(
+                                        delayedRetry,
+                                        timestamp -> doRetryWithCleanup(resultHandlerDelegator));
+
+                        // add to incomplete retry handlers, will remove it after retry fired
+                        inFlightDelayRetryHandlers.add(resultHandlerDelegator);
+
+                        return;

Review Comment:
   Makes sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
