lincoln-lil commented on code in PR #19983:
URL: https://github.com/apache/flink/pull/19983#discussion_r908429783
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java:
##########
@@ -318,6 +392,143 @@ private void outputCompletedElement() {
}
}
+ private void doRetry(RetryableResultHandlerDelegator
resultHandlerDelegator) throws Exception {
+ // increment current attempt number
+ resultHandlerDelegator.incrementAttempts();
+
+ // fire a new attempt
+ userFunction.asyncInvoke(resultHandlerDelegator.getInputRecord(),
resultHandlerDelegator);
+ }
+
+ /** A delegator holds the real {@link ResultHandler} to handle retries. */
+ private class RetryableResultHandlerDelegator implements ResultFuture<OUT>
{
+
+ private final ResultHandler resultHandler;
+ private final ProcessingTimeService processingTimeService;
+
+ private ScheduledFuture<?> delayedRetryTimer;
+
+ /** start from 1, when this entry created, the first attempt will
happen. */
+ private int currentAttempts = 1;
+
+ private long backoffTimeMillis;
+
+ private final AtomicBoolean retryInFlight = new AtomicBoolean(false);
+
+ public RetryableResultHandlerDelegator(
+ StreamRecord<IN> inputRecord,
+ ResultFuture<OUT> resultFuture,
+ ProcessingTimeService processingTimeService) {
+ this.resultHandler = new ResultHandler(inputRecord, resultFuture);
+ this.processingTimeService = processingTimeService;
+ }
+
+ public void registerTimeout(long timeout) {
+ resultHandler.registerTimeout(processingTimeService, timeout);
+ }
+
+ @Override
+ public void complete(Collection<OUT> results) {
+ if (retryEnabled) {
+ mailboxExecutor.submit(() -> cleanupLastRetryInMailbox(),
"cleanup last retry");
+ if (!resultHandler.completed.get() &&
ifRetryOrCompleted(results, null)) {
+ return;
+ }
+ }
+ resultHandler.complete(results);
+ }
+
+ @Override
+ public void completeExceptionally(Throwable error) {
+ if (retryEnabled) {
+ mailboxExecutor.submit(() -> cleanupLastRetryInMailbox(),
"cleanup last retry");
+ if (ifRetryOrCompleted(null, error)) {
+ return;
+ }
+ }
+ resultHandler.completeExceptionally(error);
+ }
+
+ private void cleanupLastRetryInMailbox() {
+ if (retryInFlight.compareAndSet(true, false)) {
+ assert incompleteDelayRetryHandlers.contains(this);
+ // remove from delayed retry queue
+ incompleteDelayRetryHandlers.remove(this);
+ delayedRetryTimer = null;
+ }
+ }
+
+ private boolean ifRetryOrCompleted(Collection<OUT> results, Throwable
error) {
Review Comment:
I'm not sure that all users' retry predicate will not slow down the main
thread here, so I tend to keep it stays here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]