lindong28 commented on code in PR #23331:
URL: https://github.com/apache/flink/pull/23331#discussion_r1312867815


##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAfterAllTasksFinishedITCase.java:
##########
@@ -129,40 +131,56 @@ public void testRestoreAfterSomeTasksFinished() throws Exception {
             miniCluster.requestJobResult(restoredJobGraph.getJobID()).get();
             assertThat(smallResult.get().size()).isEqualTo(SMALL_SOURCE_NUM_RECORDS);
             assertThat(bigResult.get().size()).isEqualTo(BIG_SOURCE_NUM_RECORDS);
-        } finally {
-            IntegerStreamSource.latch = null;
         }
     }
 
+    /**
+     * Tests the behaviors of the following two scenarios, in which the finished subtasks
+     * should be restored in {@link org.apache.flink.runtime.scheduler.VertexEndOfDataListener}:
+     *
+     * <p>Some subtasks in the job have reached the end of data but fail because their final
+     * checkpoint throws an exception.
+     *
+     * <p>Some pipelines of the job have finished, but before the next checkpoint is
+     * triggered a full-strategy failover happens.
+     */
     @Test
     public void testFailoverAfterSomeTasksFinished() throws Exception {
+        final Configuration config = new Configuration();
+        config.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "full");
+
         final MiniClusterConfiguration cfg =
                 new MiniClusterConfiguration.Builder()
                         .withRandomPorts()
                         .setNumTaskManagers(1)
                         .setNumSlotsPerTaskManager(4)
+                        .setConfiguration(config)
                         .build();
         try (MiniCluster miniCluster = new MiniCluster(cfg)) {
             miniCluster.start();
 
             env.enableCheckpointing(100);
-            IntegerStreamSource.latch = new CountDownLatch(1);
             JobGraph jobGraph = getStreamGraph(env, true, true).getJobGraph();
             miniCluster.submitJob(jobGraph).get();
 
             CommonTestUtils.waitForSubtasksToFinish(
                     miniCluster,
                     jobGraph.getJobID(),
                     findVertexByName(jobGraph, "passA -> Sink: sinkA").getID(),
-                    false);
+                    true);
             bigResult.get().clear();
             IntegerStreamSource.latch.countDown();
 
             miniCluster.requestJobResult(jobGraph.getJobID()).get();
-            assertThat(smallResult.get().size()).isEqualTo(SMALL_SOURCE_NUM_RECORDS);
+
+            // We expect the source to finish and then restart during the failover, so the
+            // sink should receive up to twice SMALL_SOURCE_NUM_RECORDS records.
+            // However, in a few cases a checkpoint happens to be triggered before the
+            // failover, and the source does not restart, so the sink will only receive
+            // SMALL_SOURCE_NUM_RECORDS records.
+            assertThat(smallResult.get().size())

Review Comment:
   It would be better to make the test result deterministic, either by using a source that supports checkpointing, or by de-duplicating the result via a HashSet, etc. That would allow the test to mimic what users actually do in production jobs.
   
   That being said, I would merge this PR since it fixes the flaky test and the issue described above does not affect production code.
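   The de-duplication idea can be sketched as follows. This is a hypothetical standalone helper, not code from the PR: `distinctCount` and the sample lists are illustrative, with the lists standing in for the records collected in `smallResult.get()` with and without a source restart.
   
   ```java
   import java.util.Arrays;
   import java.util.HashSet;
   import java.util.List;
   
   public class DedupAssertionSketch {
       // Count distinct records so the assertion no longer depends on whether
       // the source restarted during the failover (which would replay records).
       static int distinctCount(List<Integer> records) {
           return new HashSet<>(records).size();
       }
   
       public static void main(String[] args) {
           // If the source restarts after failover, each record may arrive twice.
           List<Integer> withRestart = Arrays.asList(0, 1, 2, 3, 4, 0, 1, 2, 3, 4);
           // If a checkpoint fires before the failover, each record arrives once.
           List<Integer> withoutRestart = Arrays.asList(0, 1, 2, 3, 4);
   
           // After de-duplication both runs yield the same value, so an
           // assertion on the distinct count is deterministic.
           if (distinctCount(withRestart) != distinctCount(withoutRestart)) {
               throw new AssertionError("distinct counts should match");
           }
           System.out.println(distinctCount(withRestart)); // prints 5
       }
   }
   ```
   
   Asserting on the distinct count tolerates at-least-once replay without weakening the check on which records were produced.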



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
