This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
     new 94277e18746 [FLINK-32907][Tests] Fix CheckpointAfterAllTasksFinishedITCase hangs on AZP
94277e18746 is described below

commit 94277e187464c56be1144a183dab0756d5f1800b
Author: JiangXin <[email protected]>
AuthorDate: Tue Aug 29 16:42:40 2023 +0800

    [FLINK-32907][Tests] Fix CheckpointAfterAllTasksFinishedITCase hangs on AZP
    
    This closes #23256.
---
 .../checkpointing/CheckpointAfterAllTasksFinishedITCase.java   | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAfterAllTasksFinishedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAfterAllTasksFinishedITCase.java
index d61a48b9c38..442fe87ca15 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAfterAllTasksFinishedITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAfterAllTasksFinishedITCase.java
@@ -79,7 +79,7 @@ public class CheckpointAfterAllTasksFinishedITCase extends AbstractTestBase {
     @Test
     public void testImmediateCheckpointing() throws Exception {
         env.enableCheckpointing(Long.MAX_VALUE - 1);
-        StreamGraph streamGraph = getStreamGraph(env);
+        StreamGraph streamGraph = getStreamGraph(env, false);
         env.execute(streamGraph);
         assertThat(smallResult.get().size()).isEqualTo(SMALL_SOURCE_NUM_RECORDS);
         assertThat(bigResult.get().size()).isEqualTo(BIG_SOURCE_NUM_RECORDS);
@@ -98,7 +98,7 @@ public class CheckpointAfterAllTasksFinishedITCase extends AbstractTestBase {
 
             env.enableCheckpointing(100);
             IntegerStreamSource.latch = new CountDownLatch(1);
-            JobGraph jobGraph = getStreamGraph(env).getJobGraph();
+            JobGraph jobGraph = getStreamGraph(env, true).getJobGraph();
             miniCluster.submitJob(jobGraph).get();
 
             CommonTestUtils.waitForSubtasksToFinish(
@@ -118,7 +118,7 @@ public class CheckpointAfterAllTasksFinishedITCase extends AbstractTestBase {
             bigResult.get().clear();
 
             env.enableCheckpointing(Long.MAX_VALUE - 1);
-            JobGraph restoredJobGraph = getStreamGraph(env).getJobGraph();
+            JobGraph restoredJobGraph = getStreamGraph(env, true).getJobGraph();
             restoredJobGraph.setSavepointRestoreSettings(
                     SavepointRestoreSettings.forPath(savepointPath, false));
             miniCluster.submitJob(restoredJobGraph).get();
@@ -131,13 +131,13 @@ public class CheckpointAfterAllTasksFinishedITCase extends AbstractTestBase {
         }
     }
 
-    private StreamGraph getStreamGraph(StreamExecutionEnvironment env) {
+    private StreamGraph getStreamGraph(StreamExecutionEnvironment env, boolean block) {
         env.addSource(new IntegerStreamSource(SMALL_SOURCE_NUM_RECORDS, false))
                 .transform("passA", Types.INT, new PassThroughOperator())
                 .addSink(new CollectSink(smallResult))
                 .name("sinkA");
 
-        env.addSource(new IntegerStreamSource(BIG_SOURCE_NUM_RECORDS, true))
+        env.addSource(new IntegerStreamSource(BIG_SOURCE_NUM_RECORDS, block))
                 .transform("passB", Types.INT, new PassThroughOperator())
                 .addSink(new CollectSink(bigResult))
                 .name("sinkB");

Reply via email to