This is an automated email from the ASF dual-hosted git repository.
hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a1d74c131b0 [FLINK-29038][runtime] Fix unstable case AsyncWaitOperatorTest#testProcessingTimeRepeatedCompleteOrderedWithRetry
a1d74c131b0 is described below
commit a1d74c131b0fd10f34436463077fd5c7a7984a2b
Author: lincoln.lil <[email protected]>
AuthorDate: Mon Aug 29 17:27:34 2022 +0800
[FLINK-29038][runtime] Fix unstable case AsyncWaitOperatorTest#testProcessingTimeRepeatedCompleteOrderedWithRetry
This closes #20702.
---
.../api/operators/async/AsyncWaitOperatorTest.java | 20 +++++++++-----------
1 file changed, 9 insertions(+), 11 deletions(-)
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index dfa1624b30b..dd81885d277 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.operators.async;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
@@ -71,6 +72,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
+import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
@@ -1233,22 +1235,18 @@ public class AsyncWaitOperatorTest extends TestLogger {
testHarness.processElement(new StreamRecord<>(5, initialTime + 5));
testHarness.processElement(new StreamRecord<>(6, initialTime + 6));
- ScheduledFuture<?> testTimer =
- testHarness
- .getTimerService()
- .registerTimer(
- testHarness.getTimerService().getCurrentProcessingTime()
- + TIMEOUT,
- ts -> {});
-
expectedOutput.add(new StreamRecord<>(4, initialTime + 2));
expectedOutput.add(new StreamRecord<>(8, initialTime + 4));
expectedOutput.add(new StreamRecord<>(12, initialTime + 6));
- // wait until all timers have been processed
- testTimer.get();
+ Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10));
+ while (testHarness.getOutput().size() < expectedOutput.size()
+ && deadline.hasTimeLeft()) {
+ testHarness.processAll();
+ //noinspection BusyWait
+ Thread.sleep(100);
+ }
- testHarness.processAll();
if (mode == AsyncDataStream.OutputMode.ORDERED) {
TestHarnessUtil.assertOutputEquals(
"ORDERED Output was not correct.", expectedOutput, testHarness.getOutput());