This is an automated email from the ASF dual-hosted git repository. godfrey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3a2fc5ef34f563c906473cbe4bdd79a9d7eec48e
Author: lincoln lee <[email protected]>
AuthorDate: Tue Aug 9 17:53:04 2022 +0800

    [hotfix][runtime] Do last attempt without successfully canceling the retry timer to prevent unexpected incomplete element during finish phase in AsyncWaitOperator

    It is hard to reproduce this in runtime tests, but occasionally happens in AsyncLookupJoinITCase#testAsyncJoinTemporalTableWithLookupThresholdWithSufficientRetry of FLINK-28849. It's better to add a separate test in runtime.

    This closes #20482
---
 .../flink/streaming/api/operators/async/AsyncWaitOperator.java | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index ba3f1c3ad87..0d88943b21e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -359,10 +359,10 @@ public class AsyncWaitOperator<IN, OUT>
         if (inFlightDelayRetryHandlers.size() > 0) {
             for (RetryableResultHandlerDelegator delegator : inFlightDelayRetryHandlers) {
                 assert delegator.delayedRetryTimer != null;
-                // cancel retry timer, cancel failure means retry action already being executed
-                if (delegator.delayedRetryTimer.cancel(true)) {
-                    tryOnce(delegator);
-                }
+                // fire an attempt intermediately not rely on successfully canceling the retry
+                // timer for two reasons: 1. cancel retry timer can not be 100% safe 2. there's
+                // protection for repeated retries
+                tryOnce(delegator);
             }
             inFlightDelayRetryHandlers.clear();
         }
