This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a1d74c131b0 [FLINK-29038][runtime] Fix unstable case 
AsyncWaitOperatorTest#testProcessingTimeRepeatedCompleteOrderedWithRetry
a1d74c131b0 is described below

commit a1d74c131b0fd10f34436463077fd5c7a7984a2b
Author: lincoln.lil <[email protected]>
AuthorDate: Mon Aug 29 17:27:34 2022 +0800

    [FLINK-29038][runtime] Fix unstable case 
AsyncWaitOperatorTest#testProcessingTimeRepeatedCompleteOrderedWithRetry
    
    This closes #20702.
---
 .../api/operators/async/AsyncWaitOperatorTest.java   | 20 +++++++++-----------
 1 file changed, 9 insertions(+), 11 deletions(-)

diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index dfa1624b30b..dd81885d277 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.operators.async;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
@@ -71,6 +72,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
 
+import java.time.Duration;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -1233,22 +1235,18 @@ public class AsyncWaitOperatorTest extends TestLogger {
             testHarness.processElement(new StreamRecord<>(5, initialTime + 5));
             testHarness.processElement(new StreamRecord<>(6, initialTime + 6));
 
-            ScheduledFuture<?> testTimer =
-                    testHarness
-                            .getTimerService()
-                            .registerTimer(
-                                    
testHarness.getTimerService().getCurrentProcessingTime()
-                                            + TIMEOUT,
-                                    ts -> {});
-
             expectedOutput.add(new StreamRecord<>(4, initialTime + 2));
             expectedOutput.add(new StreamRecord<>(8, initialTime + 4));
             expectedOutput.add(new StreamRecord<>(12, initialTime + 6));
 
-            // wait until all timers have been processed
-            testTimer.get();
+            Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10));
+            while (testHarness.getOutput().size() < expectedOutput.size()
+                    && deadline.hasTimeLeft()) {
+                testHarness.processAll();
+                //noinspection BusyWait
+                Thread.sleep(100);
+            }
 
-            testHarness.processAll();
             if (mode == AsyncDataStream.OutputMode.ORDERED) {
                 TestHarnessUtil.assertOutputEquals(
                         "ORDERED Output was not correct.", expectedOutput, 
testHarness.getOutput());

Reply via email to