gaoyunhaii commented on code in PR #19464:
URL: https://github.com/apache/flink/pull/19464#discussion_r851886455


##########
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java:
##########
@@ -3688,6 +3693,92 @@ public void testTimeoutWhileCheckpointOperatorCoordinatorNotFinishing() throws E
         }
     }
 
+    @Test
+    public void testTimeoutWhileCheckpointOperatorCoordinatorNotFinishing1() throws Exception {
+        // Warn: The case is fragile since a specific order of executing the tasks is required to
+        // reproduce the issue.
+        JobVertexID jobVertexID = new JobVertexID();
+        ExecutionGraph graph =
+                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                        .addJobVertex(jobVertexID)
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        ManuallyTriggeredScheduledExecutor manuallyTriggeredOperatorCoordinatorExecutor =
+                new ManuallyTriggeredScheduledExecutor();
+        final OperatorID operatorID = new OperatorID();
+        OperatorCoordinatorHolder context =
+                OperatorCoordinatorHolder.create(
+                        new SerializedValue<>(new TestingOperatorCoordinator.Provider(operatorID)),
+                        graph.getJobVertex(jobVertexID),
+                        getClass().getClassLoader(),
+                        new CoordinatorStoreImpl());
+        context.lazyInitialize(
+                ExceptionUtils::rethrow,
+                new ComponentMainThreadExecutorServiceAdapter(
+                        manuallyTriggeredOperatorCoordinatorExecutor, Thread.currentThread()));
+
+        ManuallyTriggeredScheduledExecutorService manuallyTriggeredIOExecutor =
+                new ManuallyTriggeredScheduledExecutorService();
+        CheckpointCoordinator checkpointCoordinator =
+                new CheckpointCoordinatorBuilder()
+                        .setCheckpointCoordinatorConfiguration(
+                                CheckpointCoordinatorConfiguration.builder()
+                                        .setCheckpointTimeout(10)
+                                        .build())
+                        .setIoExecutor(manuallyTriggeredIOExecutor)
+                        .setTimer(manuallyTriggeredScheduledExecutor)
+                        .setCoordinatorsToCheckpoint(Collections.singleton(context))
+                        .build(graph);
+        try {
+            checkpointCoordinator.triggerCheckpoint(false);
+            // create checkpoint plan

Review Comment:
   I think the comment here should be `get checkpoint id`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.