AlanConfluent commented on code in PR #26779: URL: https://github.com/apache/flink/pull/26779#discussion_r2198899881
########## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java: ########## @@ -1333,6 +1333,50 @@ private void testProcessingTimeAlwaysTimeoutFunctionWithRetry(AsyncDataStream.Ou } } + @Test + void testProcessingTimeWithAlwaysTimeoutFunctionUnorderedWithRetry() throws Exception { + testProcessingTimeAlwaysTimeoutFunction(AsyncDataStream.OutputMode.UNORDERED); + } + + @Test + void testProcessingTimeWithAlwaysTimeoutFunctionOrderedWithRetry() throws Exception { + testProcessingTimeAlwaysTimeoutFunction(AsyncDataStream.OutputMode.ORDERED); + } + + private void testProcessingTimeAlwaysTimeoutFunction(AsyncDataStream.OutputMode mode) + throws Exception { + StreamTaskMailboxTestHarnessBuilder<Integer> builder = + new StreamTaskMailboxTestHarnessBuilder<>( + OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO) + .addInput(BasicTypeInfo.INT_TYPE_INFO); + + try (StreamTaskMailboxTestHarness<Integer> testHarness = + builder.setupOutputForSingletonOperatorChain( + new AsyncWaitOperatorFactory<Integer, Integer>( + new AlwaysTimeoutAsyncFunction(), + TIMEOUT, + 10, + mode, + exceptionRetryStrategy)) + .build()) { + + final long initialTime = 0L; + AtomicReference<Throwable> error = new AtomicReference<>(); + testHarness.getStreamMockEnvironment().setExternalExceptionHandler(error::set); + + try { + testHarness.processElement(new StreamRecord<>(1, initialTime + 1)); + testHarness.processElement(new StreamRecord<>(2, initialTime + 2)); + while (error.get() == null) { + testHarness.processAll(); + } + } catch (Exception e) { + error.set(e); + } + ExceptionUtils.assertThrowableWithMessage(error.get(), "Dummy timeout error"); Review Comment: If you counted the number of invocations, you could verify that it only ever tries once, even after the timeout exception. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org. For queries about this service, please contact Infrastructure at: users@infra.apache.org