AlanConfluent commented on code in PR #26779:
URL: https://github.com/apache/flink/pull/26779#discussion_r2198899881


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java:
##########
@@ -1333,6 +1333,50 @@ private void 
testProcessingTimeAlwaysTimeoutFunctionWithRetry(AsyncDataStream.Ou
         }
     }
 
+    @Test
+    void testProcessingTimeWithAlwaysTimeoutFunctionUnorderedWithRetry() 
throws Exception {
+        
testProcessingTimeAlwaysTimeoutFunction(AsyncDataStream.OutputMode.UNORDERED);
+    }
+
+    @Test
+    void testProcessingTimeWithAlwaysTimeoutFunctionOrderedWithRetry() throws 
Exception {
+        
testProcessingTimeAlwaysTimeoutFunction(AsyncDataStream.OutputMode.ORDERED);
+    }
+
+    private void 
testProcessingTimeAlwaysTimeoutFunction(AsyncDataStream.OutputMode mode)
+            throws Exception {
+        StreamTaskMailboxTestHarnessBuilder<Integer> builder =
+                new StreamTaskMailboxTestHarnessBuilder<>(
+                                OneInputStreamTask::new, 
BasicTypeInfo.INT_TYPE_INFO)
+                        .addInput(BasicTypeInfo.INT_TYPE_INFO);
+
+        try (StreamTaskMailboxTestHarness<Integer> testHarness =
+                builder.setupOutputForSingletonOperatorChain(
+                                new AsyncWaitOperatorFactory<Integer, Integer>(
+                                        new AlwaysTimeoutAsyncFunction(),
+                                        TIMEOUT,
+                                        10,
+                                        mode,
+                                        exceptionRetryStrategy))
+                        .build()) {
+
+            final long initialTime = 0L;
+            AtomicReference<Throwable> error = new AtomicReference<>();
+            
testHarness.getStreamMockEnvironment().setExternalExceptionHandler(error::set);
+
+            try {
+                testHarness.processElement(new StreamRecord<>(1, initialTime + 
1));
+                testHarness.processElement(new StreamRecord<>(2, initialTime + 
2));
+                while (error.get() == null) {
+                    testHarness.processAll();
+                }
+            } catch (Exception e) {
+                error.set(e);
+            }
+            ExceptionUtils.assertThrowableWithMessage(error.get(), "Dummy 
timeout error");

Review Comment:
   If you counted the number of invocations, you could verify that it only ever 
tries once, even after the timeout exception.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to