This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.18 by this push:
new 94277e18746 [FLINK-32907][Tests] Fix CheckpointAfterAllTasksFinishedITCase hangs on AZP
94277e18746 is described below
commit 94277e187464c56be1144a183dab0756d5f1800b
Author: JiangXin <[email protected]>
AuthorDate: Tue Aug 29 16:42:40 2023 +0800
[FLINK-32907][Tests] Fix CheckpointAfterAllTasksFinishedITCase hangs on AZP
This closes #23256.
---
.../checkpointing/CheckpointAfterAllTasksFinishedITCase.java | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAfterAllTasksFinishedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAfterAllTasksFinishedITCase.java
index d61a48b9c38..442fe87ca15 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAfterAllTasksFinishedITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAfterAllTasksFinishedITCase.java
@@ -79,7 +79,7 @@ public class CheckpointAfterAllTasksFinishedITCase extends AbstractTestBase {
@Test
public void testImmediateCheckpointing() throws Exception {
env.enableCheckpointing(Long.MAX_VALUE - 1);
- StreamGraph streamGraph = getStreamGraph(env);
+ StreamGraph streamGraph = getStreamGraph(env, false);
env.execute(streamGraph);
assertThat(smallResult.get().size()).isEqualTo(SMALL_SOURCE_NUM_RECORDS);
assertThat(bigResult.get().size()).isEqualTo(BIG_SOURCE_NUM_RECORDS);
@@ -98,7 +98,7 @@ public class CheckpointAfterAllTasksFinishedITCase extends AbstractTestBase {
env.enableCheckpointing(100);
IntegerStreamSource.latch = new CountDownLatch(1);
- JobGraph jobGraph = getStreamGraph(env).getJobGraph();
+ JobGraph jobGraph = getStreamGraph(env, true).getJobGraph();
miniCluster.submitJob(jobGraph).get();
CommonTestUtils.waitForSubtasksToFinish(
@@ -118,7 +118,7 @@ public class CheckpointAfterAllTasksFinishedITCase extends AbstractTestBase {
bigResult.get().clear();
env.enableCheckpointing(Long.MAX_VALUE - 1);
- JobGraph restoredJobGraph = getStreamGraph(env).getJobGraph();
+ JobGraph restoredJobGraph = getStreamGraph(env, true).getJobGraph();
restoredJobGraph.setSavepointRestoreSettings(
SavepointRestoreSettings.forPath(savepointPath, false));
miniCluster.submitJob(restoredJobGraph).get();
@@ -131,13 +131,13 @@ public class CheckpointAfterAllTasksFinishedITCase extends AbstractTestBase {
}
}
- private StreamGraph getStreamGraph(StreamExecutionEnvironment env) {
+ private StreamGraph getStreamGraph(StreamExecutionEnvironment env, boolean block) {
env.addSource(new IntegerStreamSource(SMALL_SOURCE_NUM_RECORDS, false))
.transform("passA", Types.INT, new PassThroughOperator())
.addSink(new CollectSink(smallResult))
.name("sinkA");
- env.addSource(new IntegerStreamSource(BIG_SOURCE_NUM_RECORDS, true))
+ env.addSource(new IntegerStreamSource(BIG_SOURCE_NUM_RECORDS, block))
.transform("passB", Types.INT, new PassThroughOperator())
.addSink(new CollectSink(bigResult))
.name("sinkB");