gaoyunhaii commented on code in PR #19983: URL: https://github.com/apache/flink/pull/19983#discussion_r907988175
########## docs/content.zh/docs/dev/datastream/operators/asyncio.md: ########## @@ -30,6 +30,8 @@ under the License. 对于不熟悉异步或者事件驱动编程的用户,建议先储备一些关于 Future 和事件驱动编程的知识。 提示:这篇文档 [FLIP-12: 异步 I/O 的设计和实现](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673)介绍了关于设计和实现异步 I/O 功能的细节。 +对于新增的重试支持涉及和实现细节可以参考[FLIP-232: 为 DataStream API 异步 I/O 操作增加重试支持](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963) Review Comment: Maybe `对于新增的重试支持的实现细节可以参考`? Also add a period at the end of the sentence. ########## docs/content/docs/dev/datastream/operators/asyncio.md: ########## @@ -32,7 +32,8 @@ event-driven programming may be useful preparation. Note: Details about the design and implementation of the asynchronous I/O utility can be found in the proposal and design document [FLIP-12: Asynchronous I/O Design and Implementation](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673). - +Details about the new retry support can be found in document +[FLIP-232: Add Retry Support For Async I/O In DataStream API](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883963)。 Review Comment: Use an English period. ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java: ########## @@ -66,7 +70,29 @@ private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator( long timeout, int bufSize, OutputMode mode) { + return addOperator(in, func, timeout, bufSize, mode, NO_RETRY_STRATEGY); + } + /** + * Add an AsyncWaitOperator. Review Comment: Following the newer convention, method comments usually use the third person singular, namely `Adds ...`. The same might also apply to the other existing and newly added methods.
########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java: ########## @@ -66,7 +70,29 @@ private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator( long timeout, int bufSize, OutputMode mode) { + return addOperator(in, func, timeout, bufSize, mode, NO_RETRY_STRATEGY); Review Comment: I would lean toward removing this version of `addOperator` directly: it is a private method, and there seems to be little need to keep the duplication just to avoid the extra parameter in the following four calls. ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java: ########## @@ -163,4 +193,134 @@ public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait( return addOperator( in, func, timeUnit.toMillis(timeout), DEFAULT_QUEUE_CAPACITY, OutputMode.ORDERED); } + + // ======= retryable ======== Review Comment: I tend to think we might not need this comment line. ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java: ########## @@ -163,4 +193,134 @@ public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait( return addOperator( in, func, timeUnit.toMillis(timeout), DEFAULT_QUEUE_CAPACITY, OutputMode.ORDERED); } + + // ======= retryable ======== + + /** + * Add an AsyncWaitOperator with an AsyncRetryStrategy to support retry of AsyncFunction. The + * order of output stream records may be reordered. + * + * @param in Input {@link DataStream} + * @param func {@link AsyncFunction} + * @param timeout from first invoke to final completion of asynchronous operation, may include + * multiple retries, and will be reset in case of restart + * @param timeUnit of the given timeout + * @param asyncRetryStrategy The strategy of reattempt async i/o operation that can be triggered + * @param <IN> Type of input record + * @param <OUT> Type of output record + * @return A new {@link SingleOutputStreamOperator}. 
+ */ + public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWaitWithRetry( + DataStream<IN> in, + AsyncFunction<IN, OUT> func, + long timeout, + TimeUnit timeUnit, + AsyncRetryStrategy asyncRetryStrategy) { + Preconditions.checkArgument( Review Comment: Perhaps we could move the check into `addOperator` so that it is done in one place? ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java: ########## @@ -318,6 +392,143 @@ private void outputCompletedElement() { } } + private void doRetry(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception { + // increment current attempt number + resultHandlerDelegator.incrementAttempts(); + + // fire a new attempt + userFunction.asyncInvoke(resultHandlerDelegator.getInputRecord(), resultHandlerDelegator); + } + + /** A delegator holds the real {@link ResultHandler} to handle retries. */ + private class RetryableResultHandlerDelegator implements ResultFuture<OUT> { + + private final ResultHandler resultHandler; + private final ProcessingTimeService processingTimeService; + + private ScheduledFuture<?> delayedRetryTimer; + + /** start from 1, when this entry created, the first attempt will happen. */ + private int currentAttempts = 1; + + private long backoffTimeMillis; + + private final AtomicBoolean retryInFlight = new AtomicBoolean(false); + + public RetryableResultHandlerDelegator( + StreamRecord<IN> inputRecord, + ResultFuture<OUT> resultFuture, + ProcessingTimeService processingTimeService) { + this.resultHandler = new ResultHandler(inputRecord, resultFuture); + this.processingTimeService = processingTimeService; + } + + public void registerTimeout(long timeout) { + resultHandler.registerTimeout(processingTimeService, timeout); + } + + @Override + public void complete(Collection<OUT> results) { Review Comment: Should we also add some checks to prevent users from calling `complete` incorrectly? 
For example, users might call `complete` multiple times in a single attempt; the check could be similar to the one done in the `ResultFuture`. ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AsyncDataStream.java: ########## @@ -163,4 +193,134 @@ public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait( return addOperator( in, func, timeUnit.toMillis(timeout), DEFAULT_QUEUE_CAPACITY, OutputMode.ORDERED); } + + // ======= retryable ======== + + /** + * Add an AsyncWaitOperator with an AsyncRetryStrategy to support retry of AsyncFunction. The + * order of output stream records may be reordered. + * + * @param in Input {@link DataStream} + * @param func {@link AsyncFunction} + * @param timeout from first invoke to final completion of asynchronous operation, may include + * multiple retries, and will be reset in case of restart + * @param timeUnit of the given timeout + * @param asyncRetryStrategy The strategy of reattempt async i/o operation that can be triggered + * @param <IN> Type of input record + * @param <OUT> Type of output record + * @return A new {@link SingleOutputStreamOperator}. + */ + public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWaitWithRetry( + DataStream<IN> in, + AsyncFunction<IN, OUT> func, + long timeout, + TimeUnit timeUnit, + AsyncRetryStrategy asyncRetryStrategy) { Review Comment: `AsyncRetryStrategy` -> `AsyncRetryStrategy<OUT>`. The same applies to the parameters of the following methods. 
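On the earlier question about guarding `complete` against repeated calls within one attempt, a minimal sketch of such a check could look like the following. It assumes a compare-and-set flag in the spirit of the `completed` flag that the diff already reads on `ResultHandler`; `OncePerAttemptGuard`, `startAttempt` and `tryComplete` are hypothetical names used only for illustration and are not part of the PR.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper for illustration only; not a class from the PR.
final class OncePerAttemptGuard {

    // true once the current attempt has delivered a result or an error
    private final AtomicBoolean attemptCompleted = new AtomicBoolean(false);

    /** Re-arms the guard; meant to be called right before each (re)invocation. */
    void startAttempt() {
        attemptCompleted.set(false);
    }

    /**
     * Returns true for the first completion of the current attempt and false for
     * every further call, so the caller can ignore or report the duplicates.
     */
    boolean tryComplete() {
        return attemptCompleted.compareAndSet(false, true);
    }

    public static void main(String[] args) {
        OncePerAttemptGuard guard = new OncePerAttemptGuard();

        guard.startAttempt();
        System.out.println(guard.tryComplete()); // first completion is accepted
        System.out.println(guard.tryComplete()); // duplicate in the same attempt is rejected

        guard.startAttempt(); // a retry starts a new attempt
        System.out.println(guard.tryComplete()); // accepted again
    }
}
```

With something like this, both `complete` and `completeExceptionally` could call `tryComplete()` first and return early when it yields `false`.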
########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java: ########## @@ -318,6 +392,143 @@ private void outputCompletedElement() { } } + private void doRetry(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception { + // increment current attempt number + resultHandlerDelegator.incrementAttempts(); + + // fire a new attempt + userFunction.asyncInvoke(resultHandlerDelegator.getInputRecord(), resultHandlerDelegator); + } + + /** A delegator holds the real {@link ResultHandler} to handle retries. */ + private class RetryableResultHandlerDelegator implements ResultFuture<OUT> { + + private final ResultHandler resultHandler; + private final ProcessingTimeService processingTimeService; + + private ScheduledFuture<?> delayedRetryTimer; + + /** start from 1, when this entry created, the first attempt will happen. */ + private int currentAttempts = 1; + + private long backoffTimeMillis; + + private final AtomicBoolean retryInFlight = new AtomicBoolean(false); + + public RetryableResultHandlerDelegator( + StreamRecord<IN> inputRecord, + ResultFuture<OUT> resultFuture, + ProcessingTimeService processingTimeService) { + this.resultHandler = new ResultHandler(inputRecord, resultFuture); + this.processingTimeService = processingTimeService; + } + + public void registerTimeout(long timeout) { + resultHandler.registerTimeout(processingTimeService, timeout); + } + + @Override + public void complete(Collection<OUT> results) { + if (retryEnabled) { + mailboxExecutor.submit(() -> cleanupLastRetryInMailbox(), "cleanup last retry"); + if (!resultHandler.completed.get() && ifRetryOrCompleted(results, null)) { + return; + } + } + resultHandler.complete(results); + } + + @Override + public void completeExceptionally(Throwable error) { + if (retryEnabled) { + mailboxExecutor.submit(() -> cleanupLastRetryInMailbox(), "cleanup last retry"); + if (ifRetryOrCompleted(null, error)) { + return; + } + } + 
resultHandler.completeExceptionally(error); + } + + private void cleanupLastRetryInMailbox() { + if (retryInFlight.compareAndSet(true, false)) { + assert incompleteDelayRetryHandlers.contains(this); + // remove from delayed retry queue + incompleteDelayRetryHandlers.remove(this); + delayedRetryTimer = null; + } + } + + private boolean ifRetryOrCompleted(Collection<OUT> results, Throwable error) { Review Comment: Would it be possible to also move `ifRetryOrCompleted` into the main thread? This method does not seem to contain any slow logic; if so, we might be able to simplify logic such as `delayedRetryAvailable`. ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java: ########## @@ -197,11 +248,12 @@ public void processElement(StreamRecord<IN> record) throws Exception { // add element first to the queue final ResultFuture<OUT> entry = addToWorkQueue(element); - final ResultHandler resultHandler = new ResultHandler(element, entry); + final RetryableResultHandlerDelegator resultHandler = Review Comment: Should we directly distinguish the two cases based on `retryEnabled`? ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java: ########## @@ -318,6 +392,143 @@ private void outputCompletedElement() { } } + private void doRetry(RetryableResultHandlerDelegator resultHandlerDelegator) throws Exception { + // increment current attempt number + resultHandlerDelegator.incrementAttempts(); + + // fire a new attempt + userFunction.asyncInvoke(resultHandlerDelegator.getInputRecord(), resultHandlerDelegator); + } + + /** A delegator holds the real {@link ResultHandler} to handle retries. 
*/ + private class RetryableResultHandlerDelegator implements ResultFuture<OUT> { + + private final ResultHandler resultHandler; + private final ProcessingTimeService processingTimeService; + + private ScheduledFuture<?> delayedRetryTimer; + + /** start from 1, when this entry created, the first attempt will happen. */ + private int currentAttempts = 1; + + private long backoffTimeMillis; + + private final AtomicBoolean retryInFlight = new AtomicBoolean(false); + + public RetryableResultHandlerDelegator( + StreamRecord<IN> inputRecord, + ResultFuture<OUT> resultFuture, + ProcessingTimeService processingTimeService) { + this.resultHandler = new ResultHandler(inputRecord, resultFuture); + this.processingTimeService = processingTimeService; + } + + public void registerTimeout(long timeout) { + resultHandler.registerTimeout(processingTimeService, timeout); + } + + @Override + public void complete(Collection<OUT> results) { + if (retryEnabled) { + mailboxExecutor.submit(() -> cleanupLastRetryInMailbox(), "cleanup last retry"); + if (!resultHandler.completed.get() && ifRetryOrCompleted(results, null)) { + return; + } + } + resultHandler.complete(results); + } + + @Override + public void completeExceptionally(Throwable error) { + if (retryEnabled) { + mailboxExecutor.submit(() -> cleanupLastRetryInMailbox(), "cleanup last retry"); + if (ifRetryOrCompleted(null, error)) { + return; + } + } + resultHandler.completeExceptionally(error); + } + + private void cleanupLastRetryInMailbox() { + if (retryInFlight.compareAndSet(true, false)) { + assert incompleteDelayRetryHandlers.contains(this); + // remove from delayed retry queue + incompleteDelayRetryHandlers.remove(this); + delayedRetryTimer = null; + } + } + + private boolean ifRetryOrCompleted(Collection<OUT> results, Throwable error) { + if (delayedRetryAvailable.get() && resultHandler.inputRecord.isRecord()) { + boolean satisfy = false; + if (null != results && null != retryResultPredicate) { + satisfy = (satisfy || 
retryResultPredicate.test(results)); + } + if (null != error && null != retryExceptionPredicate) { + satisfy = (satisfy || retryExceptionPredicate.test(error)); + } + if (satisfy) { + if (asyncRetryStrategy.canRetry(getCurrentAttempts())) { + long nextBackoffTimeMillis = + asyncRetryStrategy.getBackoffTimeMillis(getCurrentAttempts()); + setBackoffTimeMillis(nextBackoffTimeMillis); + if (delayedRetryAvailable.get()) { + // timer thread will finally dispatch the task to mailbox executor + mailboxExecutor.submit( + () -> trySubmitRetryInMailboxOrComplete(this, results, error), + "delayed retry or give up retry"); + return true; + } + } + } + } + return false; + } + + private void trySubmitRetryInMailboxOrComplete( + RetryableResultHandlerDelegator resultHandlerDelegator, + Collection<OUT> results, + Throwable error) { + if (delayedRetryAvailable.get()) { + incompleteDelayRetryHandlers.add(resultHandlerDelegator); + retryInFlight.set(true); + final long delayedRetry = + resultHandlerDelegator.getBackoffTimeMillis() + + getProcessingTimeService().getCurrentProcessingTime(); + delayedRetryTimer = + processingTimeService.registerTimer( + delayedRetry, timestamp -> doRetry(resultHandlerDelegator)); + } else { + // give up retry and complete immediately + if (null != results) { + resultHandler.complete(results); + } else { + resultHandler.completeExceptionally(error); + } + } + } + + public long getBackoffTimeMillis() { Review Comment: We could probably remove some of the getters/setters, since they are only used in this class?
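To illustrate why `ifRetryOrCompleted` looks cheap enough to run in the main thread: the decision itself boils down to two predicate tests and two strategy calls. A rough sketch follows, with `RetryStrategySketch` and `RetryDecisionSketch` as hypothetical stand-ins that mirror only the `canRetry` and `getBackoffTimeMillis` calls visible in the diff. It is not the PR's code, and it leaves out the mailbox and timer handling entirely.

```java
import java.util.Collection;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;

// Hypothetical stand-in for the retry strategy; only the two calls that
// appear in the diff are mirrored here.
interface RetryStrategySketch {
    boolean canRetry(int currentAttempts);

    long getBackoffTimeMillis(int currentAttempts);
}

final class RetryDecisionSketch {

    /**
     * Returns the backoff in milliseconds if another attempt should be scheduled,
     * or an empty value if the result or error should be treated as final.
     */
    static <OUT> OptionalLong decide(
            Collection<OUT> results,
            Throwable error,
            Predicate<Collection<OUT>> retryResultPredicate,
            Predicate<Throwable> retryExceptionPredicate,
            RetryStrategySketch strategy,
            int currentAttempts) {
        boolean satisfy = false;
        if (results != null && retryResultPredicate != null) {
            satisfy = retryResultPredicate.test(results);
        }
        if (error != null && retryExceptionPredicate != null) {
            satisfy = satisfy || retryExceptionPredicate.test(error);
        }
        if (satisfy && strategy.canRetry(currentAttempts)) {
            return OptionalLong.of(strategy.getBackoffTimeMillis(currentAttempts));
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        // Assumed policy for the demo: at most three attempts, linear backoff.
        RetryStrategySketch upToThree = new RetryStrategySketch() {
            @Override
            public boolean canRetry(int currentAttempts) {
                return currentAttempts <= 3;
            }

            @Override
            public long getBackoffTimeMillis(int currentAttempts) {
                return 100L * currentAttempts;
            }
        };
        Predicate<Collection<String>> retryOnEmpty = Collection::isEmpty;
        Predicate<Throwable> retryOnTimeout = t -> t instanceof TimeoutException;

        // Empty result on attempt 1: a retry with a backoff is requested.
        System.out.println(
                decide(List.<String>of(), null, retryOnEmpty, retryOnTimeout, upToThree, 1));
        // Non-empty result: no retry.
        System.out.println(
                decide(List.of("ok"), null, retryOnEmpty, retryOnTimeout, upToThree, 1));
        // Timeout on attempt 4: attempts are exhausted, no retry.
        System.out.println(
                decide(null, new TimeoutException(), retryOnEmpty, retryOnTimeout, upToThree, 4));
    }
}
```

Since nothing in this decision blocks, running it from a single thread would avoid re-checking a shared flag such as `delayedRetryAvailable` around it, assuming the predicates supplied by the user are themselves cheap.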
