dawidwys commented on a change in pull request #17968:
URL: https://github.com/apache/flink/pull/17968#discussion_r763706426



##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java
##########
@@ -483,130 +496,6 @@ public void acknowledgeCheckpoint(
         }
     }
 
-    @Test
-    public void 
testTriggerStopWithSavepointNoDrainWhenWaitingForFinalCheckpointOnSourceTask()
-            throws Exception {
-        int finalCheckpointId = 6;
-        int syncSavepointId = 7;
-        CompletingCheckpointResponder checkpointResponder =
-                new CompletingCheckpointResponder() {
-
-                    private CheckpointMetrics metrics;
-                    private TaskStateSnapshot stateSnapshot;
-
-                    @Override
-                    public void acknowledgeCheckpoint(
-                            JobID jobID,
-                            ExecutionAttemptID executionAttemptID,
-                            long checkpointId,
-                            CheckpointMetrics checkpointMetrics,
-                            TaskStateSnapshot subtaskState) {
-                        // do not acknowledge any checkpoints straightaway
-                        if (checkpointId == finalCheckpointId) {
-                            metrics = checkpointMetrics;
-                            stateSnapshot = subtaskState;
-                        }
-                    }
-
-                    @Override
-                    public void declineCheckpoint(
-                            JobID jobID,
-                            ExecutionAttemptID executionAttemptID,
-                            long checkpointId,
-                            CheckpointException checkpointException) {
-                        // acknowledge the last pending checkpoint once the 
synchronous savepoint is
-                        // declined.
-                        if (syncSavepointId == checkpointId) {
-                            super.acknowledgeCheckpoint(
-                                    jobID,
-                                    executionAttemptID,
-                                    finalCheckpointId,
-                                    metrics,
-                                    stateSnapshot);
-                        }
-                    }
-                };
-
-        try (StreamTaskMailboxTestHarness<String> testHarness =
-                new 
StreamTaskMailboxTestHarnessBuilder<>(SourceStreamTask::new, STRING_TYPE_INFO)
-                        .modifyStreamConfig(
-                                config -> {
-                                    config.setCheckpointingEnabled(true);
-                                    config.getConfiguration()
-                                            .set(
-                                                    
ExecutionCheckpointingOptions
-                                                            
.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH,
-                                                    true);
-                                })
-                        .setCheckpointResponder(checkpointResponder)
-                        .setupOutputForSingletonOperatorChain(
-                                new StreamSource<>(new 
ImmediatelyFinishingSource()))
-                        .build()) {
-            checkpointResponder.setHandlers(
-                    testHarness.streamTask::notifyCheckpointCompleteAsync,
-                    testHarness.streamTask::notifyCheckpointAbortAsync);
-
-            // start task thread
-            testHarness.streamTask.runMailboxLoop();
-
-            // trigger the final checkpoint
-            CompletableFuture<Boolean> checkpointFuture =
-                    triggerCheckpoint(testHarness, finalCheckpointId);
-
-            // trigger the synchronous savepoint w/o drain, which should be 
declined
-            CompletableFuture<Boolean> savepointFuture =
-                    triggerStopWithSavepointNoDrain(testHarness, 
syncSavepointId);
-
-            // The checkpoint 6 would be triggered successfully.
-            testHarness.finishProcessing();
-            assertTrue(checkpointFuture.isDone());
-            assertTrue(savepointFuture.isDone());
-            assertFalse(savepointFuture.get());
-            testHarness.getTaskStateManager().getWaitForReportLatch().await();
-            assertEquals(
-                    finalCheckpointId, 
testHarness.getTaskStateManager().getReportedCheckpointId());
-            assertEquals(
-                    finalCheckpointId,
-                    
testHarness.getTaskStateManager().getNotifiedCompletedCheckpointId());
-        }
-    }
-
-    @Test
-    public void 
testTriggerSourceFinishesWhileStoppingWithSavepointWithoutDrain() throws 
Exception {

Review comment:
       Test removal explanation: this scenario is no longer possible, because we 
now stop sources when triggering synchronous checkpoints.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to