lindong28 commented on code in PR #22670:
URL: https://github.com/apache/flink/pull/22670#discussion_r1243174207


##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ImmediateCheckpointingAfterAllTasksFinishedITCase.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSink;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/** Tests an immediate checkpoint should be triggered right after all tasks finished. */
+@RunWith(Parameterized.class)
+public class ImmediateCheckpointingAfterAllTasksFinishedITCase extends AbstractTestBase {
+    private static final int NUM_RECORDS = 100;
+
+    private StreamExecutionEnvironment env;
+    private int parallelism;
+
+    public ImmediateCheckpointingAfterAllTasksFinishedITCase(int parallelism) {
+        super();
+        this.parallelism = parallelism;
+    }
+
+    @Before
+    public void setUp() {
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        env.setParallelism(parallelism);
+        env.enableCheckpointing(Long.MAX_VALUE - 1);
+    }
+
+    @Parameterized.Parameters(name = "parallelism = {0}")
+    public static Collection<Integer> parameters() {
+        return Arrays.asList(1, 4);
+    }
+
+    @Test
+    public void testImmediateCheckpointing() throws Exception {
+        env.addSource(new IntegerStreamSource())
+                .transform("passA", Types.INT, new PassThroughOperator())
+                .sinkTo(new PrintSink<>());

Review Comment:
   Should we avoid using `PrintSink` in tests?



##########
docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md:
##########
@@ -277,11 +277,10 @@ task with the number of new subtasks equal to the number of running tasks.
 
 ### Waiting for the final checkpoint before task exit
 
-To ensure all the records could be committed for operators using the two-phase commit,
-the tasks would wait for the final checkpoint completed successfully after all the operators finished.
-It needs to be noted that this behavior would prolong the execution time of tasks.
-If the checkpoint interval is long, the execution time would also be prolonged largely.
-For the worst case, if the checkpoint interval is set to `Long.MAX_VALUE`,
-the tasks would in fact be blocked forever since the final checkpoint would never happen.
+To ensure all the records could be committed for operators using the two-phase commit,
+the tasks would wait for the final checkpoint completed successfully after all the operators finished.
+The final checkpoint would be triggered immediately after all operators finished, without waiting

Review Comment:
   How about making the following change:
   
   `after all operators finished` -> `after all operators have received end of data`



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##########
@@ -502,6 +503,20 @@ public CompletableFuture<Acknowledge> updateTaskExecutionState(
         return FutureUtils.completedExceptionally(taskExecutionException);
     }
 
+    @Override
+    public void notifyTaskFinishing(final ExecutionAttemptID executionAttempt) {
+        if (jobGraph.getJobType() == JobType.STREAMING

Review Comment:
   If the job runs in batch mode and checkpointing is enabled, is it true that 
the job will not wait for checkpointing?
   
   Would it be better to remove `jobGraph.getJobType() == JobType.STREAMING`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -716,6 +716,12 @@ public void acknowledgeCheckpoint(
                 "acknowledgeCheckpoint");
     }
 
+    @Override
+    public boolean notifyTaskFinishing(ExecutionAttemptID executionAttemptID) {
+        // TODO: support adaptive scheduler.

Review Comment:
   I am wondering how much extra work is needed to support adaptive scheduler.
   
   Can we support adaptive scheduler by re-using the logic from 
`SchedulerBase#notifyTaskFinishing`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java:
##########
@@ -1058,6 +1071,21 @@ public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoord
                 operator, request);
     }
 
+    @Override
+    public boolean notifyTaskFinishing(ExecutionAttemptID executionAttemptID) {
+        BitSet subtaskStatus = finishingTasks.get(executionAttemptID.getJobVertexId());
+        subtaskStatus.set(executionAttemptID.getSubtaskIndex());
+
+        for (Map.Entry<JobVertexID, BitSet> entry : finishingTasks.entrySet()) {
+            JobVertexID vertex = entry.getKey();
+            BitSet status = entry.getValue();
+            if (status.cardinality() != executionGraph.getJobVertex(vertex).getParallelism()) {

Review Comment:
   Can we avoid having to check every vertex each time a subtask reaches 
end-of-data?
   
   For example, maybe we can rename `finishingTasks` to 
`tasksWaitingEndOfData` and remove a job vertex from the map once all of its 
subtasks have reached end of data. Then trigger the checkpoint when the map is empty.
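
   A minimal sketch of that bookkeeping, under stated assumptions: `EndOfDataTracker` is a hypothetical stand-alone class, not part of the PR, and it uses plain `String` vertex ids and a parallelism map in place of `JobVertexID` and the execution graph. It only illustrates removing a vertex once all of its subtasks have reported, so that the "all done" check becomes a single `isEmpty()` call.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical tracker; String ids stand in for JobVertexID. */
class EndOfDataTracker {
    // Vertices that still have at least one subtask that has not reached end of data.
    private final Map<String, BitSet> tasksWaitingEndOfData = new HashMap<>();
    // Assumed to be known up front; the real scheduler would read it from the graph.
    private final Map<String, Integer> parallelism;

    EndOfDataTracker(Map<String, Integer> parallelismPerVertex) {
        this.parallelism = new HashMap<>(parallelismPerVertex);
        for (String vertex : parallelismPerVertex.keySet()) {
            tasksWaitingEndOfData.put(vertex, new BitSet());
        }
    }

    /** Returns true once every subtask of every vertex has reported end of data. */
    boolean notifyEndOfData(String vertex, int subtaskIndex) {
        BitSet reported = tasksWaitingEndOfData.get(vertex);
        if (reported != null) {
            reported.set(subtaskIndex);
            // Drop the vertex as soon as its last subtask reports.
            if (reported.cardinality() == parallelism.get(vertex)) {
                tasksWaitingEndOfData.remove(vertex);
            }
        }
        // No per-vertex loop: an empty map means nothing is left to wait for.
        return tasksWaitingEndOfData.isEmpty();
    }
}
```

   With this shape, each notification touches only the reporting vertex, and the caller could trigger the checkpoint when the method returns `true`.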



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java:
##########
@@ -316,4 +316,11 @@ CompletableFuture<?> stopTrackingAndReleasePartitions(
      */
     CompletableFuture<Acknowledge> updateJobResourceRequirements(
             JobResourceRequirements jobResourceRequirements);
+
+    /**
+     * Notifies that the execution has reached the end of data and is to be finished.
+     *
+     * @param executionAttempt The execution attempt id.
+     */
+    void notifyTaskFinishing(final ExecutionAttemptID executionAttempt);

Review Comment:
   Would it be simpler to name this method `notifyEndOfData` and update the 
doc to remove `is to be finished`? Then we would not need to additionally 
define what "finishing task" means.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##########
@@ -502,6 +503,20 @@ public CompletableFuture<Acknowledge> updateTaskExecutionState(
         return FutureUtils.completedExceptionally(taskExecutionException);
     }
 
+    @Override
+    public void notifyTaskFinishing(final ExecutionAttemptID executionAttempt) {
+        if (jobGraph.getJobType() == JobType.STREAMING
+                && jobGraph.isCheckpointingEnabled()

Review Comment:
   Would it be simpler to move this logic (e.g. `isCheckpointingEnabled()`, 
allTasksFinishing) into `SchedulerBase#notifyTaskFinishing`?
   
   Then we only need to call `schedulerNG.notifyTaskFinishing()` here.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java:
##########
@@ -165,6 +166,8 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling
 
     private final DeploymentStateTimeMetrics deploymentStateTimeMetrics;
 
+    private final Map<JobVertexID, BitSet> finishingTasks;

Review Comment:
   Currently the per-execution-vertex information is stored in ExecutionVertex. 
Would it be better to also place this information in `ExecutionVertex`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java:
##########
@@ -236,6 +239,16 @@ public SchedulerBase(
         this.exceptionHistory =
                 new BoundedFIFOQueue<>(
                         jobMasterConfiguration.getInteger(WebOptions.MAX_EXCEPTION_HISTORY_SIZE));
+
+        this.finishingTasks = createFinishingTasks();
+    }
+
+    private Map<JobVertexID, BitSet> createFinishingTasks() {
+        Map<JobVertexID, BitSet> map = new HashMap<>();
+        for (ExecutionJobVertex vertex : getExecutionGraph().getAllVertices().values()) {
+            map.put(vertex.getJobVertexId(), new BitSet());

Review Comment:
   Should we exclude any `ExecutionVertex` that has already been marked finished?
   
   Otherwise, suppose the job is restarted after some tasks have finished. Then 
the checkpoint will not be triggered even if all tasks have finished.
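
   One possible shape for that initialization, as a sketch only: `VertexInfo` and `TasksWaitingEndOfDataFactory` are hypothetical stand-ins, and how the scheduler actually learns which subtasks are already finished after a restart is not shown in this diff. The idea is to skip fully finished vertices and pre-mark the finished subtasks of partially finished ones, so they are not waited on again.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper; not part of the PR. */
class TasksWaitingEndOfDataFactory {

    /** Stand-in for what the scheduler is assumed to know about a vertex after a restart. */
    static final class VertexInfo {
        final String id;
        final int parallelism;
        final BitSet finishedSubtasks;

        VertexInfo(String id, int parallelism, BitSet finishedSubtasks) {
            this.id = id;
            this.parallelism = parallelism;
            this.finishedSubtasks = finishedSubtasks;
        }
    }

    static Map<String, BitSet> create(List<VertexInfo> vertices) {
        Map<String, BitSet> waiting = new HashMap<>();
        for (VertexInfo v : vertices) {
            // A fully finished vertex will never report end of data again, so skip it.
            if (v.finishedSubtasks.cardinality() == v.parallelism) {
                continue;
            }
            // Pre-mark subtasks that finished before the restart.
            waiting.put(v.id, (BitSet) v.finishedSubtasks.clone());
        }
        return waiting;
    }
}
```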



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
