lindong28 commented on code in PR #23331:
URL: https://github.com/apache/flink/pull/23331#discussion_r1312867815
##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointAfterAllTasksFinishedITCase.java:
##########
@@ -129,40 +131,56 @@ public void testRestoreAfterSomeTasksFinished() throws Exception {
miniCluster.requestJobResult(restoredJobGraph.getJobID()).get();
assertThat(smallResult.get().size()).isEqualTo(SMALL_SOURCE_NUM_RECORDS);
assertThat(bigResult.get().size()).isEqualTo(BIG_SOURCE_NUM_RECORDS);
- } finally {
- IntegerStreamSource.latch = null;
}
}
+ /**
+ * Tests the behaviors of the following two scenarios at which the subtasks that have finished
+ * should be restored to {@link org.apache.flink.runtime.scheduler.VertexEndOfDataListener}.
+ *
+ * <p>Some subtasks in the job have reached the end of data but failed due to their final
+ * checkpoint throwing exceptions.
+ *
+ * <p>Some pipelines of the job have been finished but the next checkpoint is not triggered, at
+ * this time a full-strategy failover happens.
+ */
@Test
public void testFailoverAfterSomeTasksFinished() throws Exception {
+ final Configuration config = new Configuration();
+ config.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "full");
+
final MiniClusterConfiguration cfg =
new MiniClusterConfiguration.Builder()
.withRandomPorts()
.setNumTaskManagers(1)
.setNumSlotsPerTaskManager(4)
+ .setConfiguration(config)
.build();
try (MiniCluster miniCluster = new MiniCluster(cfg)) {
miniCluster.start();
env.enableCheckpointing(100);
- IntegerStreamSource.latch = new CountDownLatch(1);
JobGraph jobGraph = getStreamGraph(env, true, true).getJobGraph();
miniCluster.submitJob(jobGraph).get();
CommonTestUtils.waitForSubtasksToFinish(
miniCluster,
jobGraph.getJobID(),
findVertexByName(jobGraph, "passA -> Sink: sinkA").getID(),
- false);
+ true);
bigResult.get().clear();
IntegerStreamSource.latch.countDown();
miniCluster.requestJobResult(jobGraph.getJobID()).get();
-
assertThat(smallResult.get().size()).isEqualTo(SMALL_SOURCE_NUM_RECORDS);
+
+ // We are expecting the source to be finished and then restart during failover, so the
+ // sink should receive records as much as double SMALL_SOURCE_NUM_RECORDS.
+ // However, in a few cases, a checkpoint happens to be triggered before failover, and
+ // the source would not restart so the sink will only receive SMALL_SOURCE_NUM_RECORDS
+ // records.
+ assertThat(smallResult.get().size())
Review Comment:
It would be better to make the test result deterministic, either by using a
source that supports checkpointing or by de-duplicating the result (for
example, via a HashSet). That would let the test mimic what users actually do
in production jobs.
That being said, I would merge this PR, since it fixes a flaky test and the
issue described above does not affect production code.
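A minimal sketch of the de-duplication option, for illustration only. It
assumes each record emitted by the source is a distinct integer, which this
thread does not confirm; DedupSketch, NUM_RECORDS, simulateSink and
distinctRecords are hypothetical names, not part of the Flink test. The idea
is that the distinct count is the same whether the source ran once or was
replayed after failover, so a single exact assertion becomes possible.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DedupSketch {
    // Hypothetical stand-in for SMALL_SOURCE_NUM_RECORDS.
    static final int NUM_RECORDS = 100;

    /** Collapses replayed records into a set of distinct values. */
    static Set<Integer> distinctRecords(List<Integer> received) {
        return new HashSet<>(received);
    }

    /**
     * Simulates what the sink collects when the source emits its full
     * range of records `emissions` times (1 = no restart, 2 = one restart).
     * Assumption: records are the distinct integers 0..NUM_RECORDS-1.
     */
    static List<Integer> simulateSink(int emissions) {
        List<Integer> out = new ArrayList<>();
        for (int run = 0; run < emissions; run++) {
            for (int i = 0; i < NUM_RECORDS; i++) {
                out.add(i);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Raw sizes differ between the two outcomes, distinct sizes do not.
        List<Integer> noRestart = simulateSink(1);
        List<Integer> oneRestart = simulateSink(2);
        System.out.println(noRestart.size() + " vs " + oneRestart.size());
        System.out.println(distinctRecords(noRestart).size()
                + " vs " + distinctRecords(oneRestart).size());
    }
}
```

In the test itself this would amount to asserting on the size of a set built
from smallResult.get() rather than on the raw list size, provided the
distinct-records assumption holds for the source used there.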