This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 647b02f7dcc [FLINK-38082][datastream] Avoid unexpected retries when user async function invokes completeExceptionally
647b02f7dcc is described below

commit 647b02f7dcc24849418e0c8629b12b0eacd45716
Author: lincoln lee <lincoln.8...@gmail.com>
AuthorDate: Sun Jul 13 10:01:08 2025 +0800

    [FLINK-38082][datastream] Avoid unexpected retries when user async function invokes completeExceptionally
    
    This closes #26779
---
 .../api/operators/async/AsyncWaitOperator.java     |  9 ++-
 .../api/operators/async/AsyncWaitOperatorTest.java | 74 +++++++++++++++++++++-
 2 files changed, 79 insertions(+), 4 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index af418de2df2..323fe4106f3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -436,6 +436,7 @@ public class AsyncWaitOperator<IN, OUT>
 
         private final ResultHandler resultHandler;
         private final ProcessingTimeService processingTimeService;
+        private final long startTs;
 
         private ScheduledFuture<?> delayedRetryTimer;
 
@@ -455,6 +456,7 @@ public class AsyncWaitOperator<IN, OUT>
                 ProcessingTimeService processingTimeService) {
             this.resultHandler = new ResultHandler(inputRecord, resultFuture);
             this.processingTimeService = processingTimeService;
+            this.startTs = processingTimeService.getCurrentProcessingTime();
         }
 
         private void registerTimeout(long timeout) {
@@ -532,6 +534,10 @@ public class AsyncWaitOperator<IN, OUT>
                     () -> processRetry(results, error), "delayed retry or complete");
         }
 
+        private boolean isTimeout() {
+            return processingTimeService.getCurrentProcessingTime() - startTs > timeout;
+        }
+
         private void processRetry(Collection<OUT> results, Throwable error) {
             // ignore repeated call(s) and only called in main thread can be safe
             if (!retryAwaiting.compareAndSet(false, true)) {
@@ -542,7 +548,8 @@ public class AsyncWaitOperator<IN, OUT>
                     (null != results && retryResultPredicate.test(results))
                             || (null != error && retryExceptionPredicate.test(error));
 
-            if (satisfy
+            if (!isTimeout()
+                    && satisfy
                     && asyncRetryStrategy.canRetry(currentAttempts)
                     && !retryDisabledOnFinish.get()) {
                 long nextBackoffTimeMillis =
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index b56cd968156..6ed1fa364ea 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -118,12 +118,12 @@ public class AsyncWaitOperatorTest {
     @RegisterExtension
     private final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
 
-    private static AsyncRetryStrategy emptyResultFixedDelayRetryStrategy =
+    private static final AsyncRetryStrategy emptyResultFixedDelayRetryStrategy =
             new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(2, 10L)
                     .ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
                     .build();
 
-    private static AsyncRetryStrategy exceptionRetryStrategy =
+    private static final AsyncRetryStrategy exceptionRetryStrategy =
             new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(2, 10L)
                     .ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
                     .build();
@@ -1333,6 +1333,49 @@ public class AsyncWaitOperatorTest {
         }
     }
 
+    @Test
+    void testProcessingTimeWithAlwaysTimeoutFunctionUnorderedWithRetry() throws Exception {
+        testProcessingTimeAlwaysTimeoutFunction(AsyncDataStream.OutputMode.UNORDERED);
+    }
+
+    @Test
+    void testProcessingTimeWithAlwaysTimeoutFunctionOrderedWithRetry() throws Exception {
+        testProcessingTimeAlwaysTimeoutFunction(AsyncDataStream.OutputMode.ORDERED);
+    }
+
+    private void testProcessingTimeAlwaysTimeoutFunction(AsyncDataStream.OutputMode mode)
+            throws Exception {
+        StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO);
+
+        AlwaysTimeoutAsyncFunction asyncFunction = new AlwaysTimeoutAsyncFunction();
+        try (StreamTaskMailboxTestHarness<Integer> testHarness =
+                builder.setupOutputForSingletonOperatorChain(
+                                new AsyncWaitOperatorFactory<Integer, Integer>(
+                                        asyncFunction, TIMEOUT, 10, mode, exceptionRetryStrategy))
+                        .build()) {
+
+            final long initialTime = 0L;
+            AtomicReference<Throwable> error = new AtomicReference<>();
+            testHarness.getStreamMockEnvironment().setExternalExceptionHandler(error::set);
+
+            try {
+                testHarness.processElement(new StreamRecord<>(1, initialTime + 1));
+                testHarness.processElement(new StreamRecord<>(2, initialTime + 2));
+                while (error.get() == null) {
+                    testHarness.processAll();
+                }
+            } catch (Exception e) {
+                error.set(e);
+            }
+            ExceptionUtils.assertThrowableWithMessage(error.get(), "Dummy timeout error");
+            // verify the 1st element's try count is exactly 1
+            assertThat(asyncFunction.getTryCount(1)).isEqualTo(1);
+        }
+    }
+
     @Test
     public void testProcessingTimeWithMailboxThreadOrdered() throws Exception {
         testProcessingTimeWithCollectFromMailboxThread(
@@ -1525,7 +1568,7 @@ public class AsyncWaitOperatorTest {
 
         private static final long serialVersionUID = 1L;
 
-        private static Map<Integer, Integer> tryCounts = new HashMap<>();
+        protected static Map<Integer, Integer> tryCounts = new HashMap<>();
 
         @VisibleForTesting
         public int getTryCount(Integer item) {
@@ -1560,6 +1603,31 @@ public class AsyncWaitOperatorTest {
         }
     }
 
+    private static class AlwaysTimeoutAsyncFunction
+            extends AlwaysTimeoutWithDefaultValueAsyncFunction {
+
+        private final transient CountDownLatch latch = new CountDownLatch(1);
+
+        @Override
+        public void asyncInvoke(Integer input, ResultFuture<Integer> resultFuture) {
+            tryCounts.merge(input, 1, Integer::sum);
+            CompletableFuture.runAsync(
+                    () -> {
+                        try {
+                            latch.await();
+                        } catch (InterruptedException e) {
+                            throw new RuntimeException(e);
+                        }
+                    });
+        }
+
+        @Override
+        public void timeout(Integer input, ResultFuture<Integer> resultFuture) {
+            // simulate the case reported in https://issues.apache.org/jira/browse/FLINK-38082
+            resultFuture.completeExceptionally(new TimeoutException("Dummy timeout error"));
+        }
+    }
+
     private static class CallThreadAsyncFunction extends MyAbstractAsyncFunction<Integer> {
         private static final long serialVersionUID = -1504699677704123889L;
 

Reply via email to