dawidwys commented on a change in pull request #17968:
URL: https://github.com/apache/flink/pull/17968#discussion_r763706426
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java
##########
@@ -483,130 +496,6 @@ public void acknowledgeCheckpoint(
}
}
- @Test
- public void
testTriggerStopWithSavepointNoDrainWhenWaitingForFinalCheckpointOnSourceTask()
- throws Exception {
- int finalCheckpointId = 6;
- int syncSavepointId = 7;
- CompletingCheckpointResponder checkpointResponder =
- new CompletingCheckpointResponder() {
-
- private CheckpointMetrics metrics;
- private TaskStateSnapshot stateSnapshot;
-
- @Override
- public void acknowledgeCheckpoint(
- JobID jobID,
- ExecutionAttemptID executionAttemptID,
- long checkpointId,
- CheckpointMetrics checkpointMetrics,
- TaskStateSnapshot subtaskState) {
- // do not acknowledge any checkpoints straightaway
- if (checkpointId == finalCheckpointId) {
- metrics = checkpointMetrics;
- stateSnapshot = subtaskState;
- }
- }
-
- @Override
- public void declineCheckpoint(
- JobID jobID,
- ExecutionAttemptID executionAttemptID,
- long checkpointId,
- CheckpointException checkpointException) {
- // acknowledge the last pending checkpoint once the
synchronous savepoint is
- // declined.
- if (syncSavepointId == checkpointId) {
- super.acknowledgeCheckpoint(
- jobID,
- executionAttemptID,
- finalCheckpointId,
- metrics,
- stateSnapshot);
- }
- }
- };
-
- try (StreamTaskMailboxTestHarness<String> testHarness =
- new
StreamTaskMailboxTestHarnessBuilder<>(SourceStreamTask::new, STRING_TYPE_INFO)
- .modifyStreamConfig(
- config -> {
- config.setCheckpointingEnabled(true);
- config.getConfiguration()
- .set(
-
ExecutionCheckpointingOptions
-
.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH,
- true);
- })
- .setCheckpointResponder(checkpointResponder)
- .setupOutputForSingletonOperatorChain(
- new StreamSource<>(new
ImmediatelyFinishingSource()))
- .build()) {
- checkpointResponder.setHandlers(
- testHarness.streamTask::notifyCheckpointCompleteAsync,
- testHarness.streamTask::notifyCheckpointAbortAsync);
-
- // start task thread
- testHarness.streamTask.runMailboxLoop();
-
- // trigger the final checkpoint
- CompletableFuture<Boolean> checkpointFuture =
- triggerCheckpoint(testHarness, finalCheckpointId);
-
- // trigger the synchronous savepoint w/o drain, which should be
declined
- CompletableFuture<Boolean> savepointFuture =
- triggerStopWithSavepointNoDrain(testHarness,
syncSavepointId);
-
- // The checkpoint 6 would be triggered successfully.
- testHarness.finishProcessing();
- assertTrue(checkpointFuture.isDone());
- assertTrue(savepointFuture.isDone());
- assertFalse(savepointFuture.get());
- testHarness.getTaskStateManager().getWaitForReportLatch().await();
- assertEquals(
- finalCheckpointId,
testHarness.getTaskStateManager().getReportedCheckpointId());
- assertEquals(
- finalCheckpointId,
-
testHarness.getTaskStateManager().getNotifiedCompletedCheckpointId());
- }
- }
-
- @Test
- public void
testTriggerSourceFinishesWhileStoppingWithSavepointWithoutDrain() throws
Exception {
Review comment:
Test removal explanation: this scenario is no longer possible, because we stop
sources when triggering synchronous checkpoints.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]