This is an automated email from the ASF dual-hosted git repository. lincoln pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 647b02f7dcc [FLINK-38082][datastream] Avoid unexpected retries when user async function invokes completeExceptionally 647b02f7dcc is described below commit 647b02f7dcc24849418e0c8629b12b0eacd45716 Author: lincoln lee <lincoln.8...@gmail.com> AuthorDate: Sun Jul 13 10:01:08 2025 +0800 [FLINK-38082][datastream] Avoid unexpected retries when user async function invokes completeExceptionally This closes #26779 --- .../api/operators/async/AsyncWaitOperator.java | 9 ++- .../api/operators/async/AsyncWaitOperatorTest.java | 74 +++++++++++++++++++++- 2 files changed, 79 insertions(+), 4 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java index af418de2df2..323fe4106f3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java @@ -436,6 +436,7 @@ public class AsyncWaitOperator<IN, OUT> private final ResultHandler resultHandler; private final ProcessingTimeService processingTimeService; + private final long startTs; private ScheduledFuture<?> delayedRetryTimer; @@ -455,6 +456,7 @@ public class AsyncWaitOperator<IN, OUT> ProcessingTimeService processingTimeService) { this.resultHandler = new ResultHandler(inputRecord, resultFuture); this.processingTimeService = processingTimeService; + this.startTs = processingTimeService.getCurrentProcessingTime(); } private void registerTimeout(long timeout) { @@ -532,6 +534,10 @@ public class AsyncWaitOperator<IN, OUT> () -> processRetry(results, error), "delayed retry or complete"); } + private boolean isTimeout() { + return timeout > 0L && processingTimeService.getCurrentProcessingTime() - startTs >= timeout; + } + private void
processRetry(Collection<OUT> results, Throwable error) { // ignore repeated call(s) and only called in main thread can be safe if (!retryAwaiting.compareAndSet(false, true)) { @@ -542,7 +548,8 @@ public class AsyncWaitOperator<IN, OUT> (null != results && retryResultPredicate.test(results)) || (null != error && retryExceptionPredicate.test(error)); - if (satisfy + if (!isTimeout() + && satisfy && asyncRetryStrategy.canRetry(currentAttempts) && !retryDisabledOnFinish.get()) { long nextBackoffTimeMillis = diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java index b56cd968156..6ed1fa364ea 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java @@ -118,12 +118,12 @@ public class AsyncWaitOperatorTest { @RegisterExtension private final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create(); - private static AsyncRetryStrategy emptyResultFixedDelayRetryStrategy = + private static final AsyncRetryStrategy emptyResultFixedDelayRetryStrategy = new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(2, 10L) .ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE) .build(); - private static AsyncRetryStrategy exceptionRetryStrategy = + private static final AsyncRetryStrategy exceptionRetryStrategy = new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(2, 10L) .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE) .build(); @@ -1333,6 +1333,49 @@ public class AsyncWaitOperatorTest { } } + @Test + void testProcessingTimeWithAlwaysTimeoutFunctionUnorderedWithRetry() throws Exception { + testProcessingTimeAlwaysTimeoutFunction(AsyncDataStream.OutputMode.UNORDERED); + } + + @Test + void
testProcessingTimeWithAlwaysTimeoutFunctionOrderedWithRetry() throws Exception { + testProcessingTimeAlwaysTimeoutFunction(AsyncDataStream.OutputMode.ORDERED); + } + + private void testProcessingTimeAlwaysTimeoutFunction(AsyncDataStream.OutputMode mode) + throws Exception { + StreamTaskMailboxTestHarnessBuilder<Integer> builder = + new StreamTaskMailboxTestHarnessBuilder<>( + OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO) + .addInput(BasicTypeInfo.INT_TYPE_INFO); + + AlwaysTimeoutAsyncFunction asyncFunction = new AlwaysTimeoutAsyncFunction(); + try (StreamTaskMailboxTestHarness<Integer> testHarness = + builder.setupOutputForSingletonOperatorChain( + new AsyncWaitOperatorFactory<Integer, Integer>( + asyncFunction, TIMEOUT, 10, mode, exceptionRetryStrategy)) + .build()) { + + final long initialTime = 0L; + AtomicReference<Throwable> error = new AtomicReference<>(); + testHarness.getStreamMockEnvironment().setExternalExceptionHandler(error::set); + + try { + testHarness.processElement(new StreamRecord<>(1, initialTime + 1)); + testHarness.processElement(new StreamRecord<>(2, initialTime + 2)); + while (error.get() == null) { + testHarness.processAll(); + } + } catch (Exception e) { + error.set(e); + } + ExceptionUtils.assertThrowableWithMessage(error.get(), "Dummy timeout error"); + // verify the 1st element's try count is exactly 1 + assertThat(asyncFunction.getTryCount(1)).isEqualTo(1); + } + } + @Test public void testProcessingTimeWithMailboxThreadOrdered() throws Exception { testProcessingTimeWithCollectFromMailboxThread( @@ -1525,7 +1568,7 @@ public class AsyncWaitOperatorTest { private static final long serialVersionUID = 1L; - private static Map<Integer, Integer> tryCounts = new HashMap<>(); + protected static Map<Integer, Integer> tryCounts = new HashMap<>(); @VisibleForTesting public int getTryCount(Integer item) { @@ -1560,6 +1603,31 @@ public class AsyncWaitOperatorTest { } } + private static class AlwaysTimeoutAsyncFunction + extends
AlwaysTimeoutWithDefaultValueAsyncFunction { + + private final transient CountDownLatch latch = new CountDownLatch(1); + + @Override + public void asyncInvoke(Integer input, ResultFuture<Integer> resultFuture) { + tryCounts.merge(input, 1, Integer::sum); + CompletableFuture.runAsync( + () -> { + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + } + + @Override + public void timeout(Integer input, ResultFuture<Integer> resultFuture) { + // simulate the case reported in https://issues.apache.org/jira/browse/FLINK-38082 + resultFuture.completeExceptionally(new TimeoutException("Dummy timeout error")); + } + } + private static class CallThreadAsyncFunction extends MyAbstractAsyncFunction<Integer> { private static final long serialVersionUID = -1504699677704123889L;