lindong28 commented on code in PR #22670:
URL: https://github.com/apache/flink/pull/22670#discussion_r1243174207
##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ImmediateCheckpointingAfterAllTasksFinishedITCase.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSink;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/** Tests an immediate checkpoint should be triggered right after all tasks finished. */
+@RunWith(Parameterized.class)
+public class ImmediateCheckpointingAfterAllTasksFinishedITCase extends AbstractTestBase {
+    private static final int NUM_RECORDS = 100;
+
+    private StreamExecutionEnvironment env;
+    private int parallelism;
+
+    public ImmediateCheckpointingAfterAllTasksFinishedITCase(int parallelism) {
+        super();
+        this.parallelism = parallelism;
+    }
+
+    @Before
+    public void setUp() {
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        env.setParallelism(parallelism);
+        env.enableCheckpointing(Long.MAX_VALUE - 1);
+    }
+
+    @Parameterized.Parameters(name = "parallelism = {0}")
+    public static Collection<Integer> parameters() {
+        return Arrays.asList(1, 4);
+    }
+
+    @Test
+    public void testImmediateCheckpointing() throws Exception {
+        env.addSource(new IntegerStreamSource())
+                .transform("passA", Types.INT, new PassThroughOperator())
+                .sinkTo(new PrintSink<>());

Review Comment:
   Should we avoid using `PrintSink` in tests?

##########
docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md:
##########
@@ -277,11 +277,10 @@ task with the number of new subtasks equal to the number of running tasks.
 
 ### Waiting for the final checkpoint before task exit
 
-To ensure all the records could be committed for operators using the two-phase commit,
-the tasks would wait for the final checkpoint completed successfully after all the operators finished.
-It needs to be noted that this behavior would prolong the execution time of tasks.
-If the checkpoint interval is long, the execution time would also be prolonged largely.
-For the worst case, if the checkpoint interval is set to `Long.MAX_VALUE`,
-the tasks would in fact be blocked forever since the final checkpoint would never happen.
+To ensure all the records could be committed for operators using the two-phase commit,
+the tasks would wait for the final checkpoint completed successfully after all the operators finished.
+The final checkpoint would be triggered immediately after all operators finished, without waiting

Review Comment:
   How about making the following change: "after all operators finished" -> "after all operators have received end of data"

##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##########
@@ -502,6 +503,20 @@ public CompletableFuture<Acknowledge> updateTaskExecutionState(
         return FutureUtils.completedExceptionally(taskExecutionException);
     }
 
+    @Override
+    public void notifyTaskFinishing(final ExecutionAttemptID executionAttempt) {
+        if (jobGraph.getJobType() == JobType.STREAMING

Review Comment:
   If the job is in batch mode and checkpointing is enabled, is it true that the job will not wait for checkpointing? Would it be better to remove `jobGraph.getJobType() == JobType.STREAMING`?

##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -716,6 +716,12 @@ public void acknowledgeCheckpoint(
                 "acknowledgeCheckpoint");
     }
 
+    @Override
+    public boolean notifyTaskFinishing(ExecutionAttemptID executionAttemptID) {
+        // TODO: support adaptive scheduler.

Review Comment:
   I am wondering how much extra work is needed to support the adaptive scheduler. Can we support it by re-using the logic from `SchedulerBase#notifyTaskFinishing`?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java:
##########
@@ -1058,6 +1071,21 @@ public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoord
                 operator, request);
     }
 
+    @Override
+    public boolean notifyTaskFinishing(ExecutionAttemptID executionAttemptID) {
+        BitSet subtaskStatus = finishingTasks.get(executionAttemptID.getJobVertexId());
+        subtaskStatus.set(executionAttemptID.getSubtaskIndex());
+
+        for (Map.Entry<JobVertexID, BitSet> entry : finishingTasks.entrySet()) {
+            JobVertexID vertex = entry.getKey();
+            BitSet status = entry.getValue();
+            if (status.cardinality() != executionGraph.getJobVertex(vertex).getParallelism()) {

Review Comment:
   Can we avoid having to check every vertex each time a subtask reaches end-of-data? For example, maybe we can rename `finishingTasks` to `tasksWaitingEndOfData` and remove a job vertex every time all subtasks of that vertex have reached end of data. Then trigger the checkpoint when the map is empty.

##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java:
##########
@@ -316,4 +316,11 @@ CompletableFuture<?> stopTrackingAndReleasePartitions(
      */
     CompletableFuture<Acknowledge> updateJobResourceRequirements(
             JobResourceRequirements jobResourceRequirements);
+
+    /**
+     * Notifies that the execution has reached the end of data and is to be finished.
+     *
+     * @param executionAttempt The execution attempt id.
+     */
+    void notifyTaskFinishing(final ExecutionAttemptID executionAttempt);

Review Comment:
   Would it be a bit simpler to name this method `notifyEndOfData` and update the doc to remove `is to be finished`? The reason is that we would not need to additionally define what a "finishing task" means.
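The map-emptiness idea from the review comment on `SchedulerBase#notifyTaskFinishing` can be sketched in isolation. This is a minimal, hypothetical model — plain `String` vertex ids stand in for Flink's `JobVertexID`/`ExecutionAttemptID`, and the class name and methods are illustrative, not actual Flink API: each vertex tracks its finished subtasks in a `BitSet`, a fully finished vertex is dropped from the map, and the final checkpoint can be triggered as soon as the map is empty, which is an O(1) check instead of scanning every vertex on each notification.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

public class EndOfDataTracker {
    // Vertices that still have subtasks waiting to reach end of data.
    private final Map<String, BitSet> tasksWaitingEndOfData = new HashMap<>();
    private final Map<String, Integer> parallelism = new HashMap<>();

    public void registerVertex(String vertexId, int numSubtasks) {
        tasksWaitingEndOfData.put(vertexId, new BitSet(numSubtasks));
        parallelism.put(vertexId, numSubtasks);
    }

    /** Returns true once every subtask of every vertex has reached end of data. */
    public boolean notifyEndOfData(String vertexId, int subtaskIndex) {
        BitSet status = tasksWaitingEndOfData.get(vertexId);
        if (status != null) {
            status.set(subtaskIndex);
            if (status.cardinality() == parallelism.get(vertexId)) {
                // All subtasks of this vertex are done; stop tracking it.
                tasksWaitingEndOfData.remove(vertexId);
            }
        }
        // O(1) emptiness check instead of iterating over all vertices.
        return tasksWaitingEndOfData.isEmpty();
    }
}
```

The caller would trigger the final checkpoint exactly when `notifyEndOfData` first returns true.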
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##########
@@ -502,6 +503,20 @@ public CompletableFuture<Acknowledge> updateTaskExecutionState(
         return FutureUtils.completedExceptionally(taskExecutionException);
     }
 
+    @Override
+    public void notifyTaskFinishing(final ExecutionAttemptID executionAttempt) {
+        if (jobGraph.getJobType() == JobType.STREAMING
+                && jobGraph.isCheckpointingEnabled()

Review Comment:
   Would it be simpler to move this logic (e.g. `isCheckpointingEnabled()`, allTasksFinishing) into `SchedulerBase#notifyTaskFinishing`? Then we only need to call `schedulerNG.notifyTaskFinishing()` here.

##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java:
##########
@@ -165,6 +166,8 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling
 
     private final DeploymentStateTimeMetrics deploymentStateTimeMetrics;
 
+    private final Map<JobVertexID, BitSet> finishingTasks;

Review Comment:
   Currently the per-execution-vertex information is stored in `ExecutionVertex`. Would it be better to also place this information in `ExecutionVertex`?

##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java:
##########
@@ -236,6 +239,16 @@ public SchedulerBase(
         this.exceptionHistory =
                 new BoundedFIFOQueue<>(
                         jobMasterConfiguration.getInteger(WebOptions.MAX_EXCEPTION_HISTORY_SIZE));
+
+        this.finishingTasks = createFinishingTasks();
+    }
+
+    private Map<JobVertexID, BitSet> createFinishingTasks() {
+        Map<JobVertexID, BitSet> map = new HashMap<>();
+        for (ExecutionJobVertex vertex : getExecutionGraph().getAllVertices().values()) {
+            map.put(vertex.getJobVertexId(), new BitSet());

Review Comment:
   Should we exclude an ExecutionVertex that has already been marked finished? Otherwise, suppose the job is restarted after some tasks finished: the checkpoint would not be triggered even if all tasks have finished.
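The restart concern in the last review comment can be illustrated with a small, hypothetical sketch. A simple enum stands in for Flink's execution state and `String` for `JobVertexID`; the names are illustrative, not Flink API. The idea is that when the tracking map is (re)built, subtasks already in a FINISHED state are pre-marked, so a job restarted after partial completion is not left waiting for end-of-data notifications that will never arrive.

```java
import java.util.BitSet;
import java.util.Map;
import java.util.HashMap;

public class FinishingTasksBuilder {
    /** Simplified stand-in for an execution vertex's state. */
    public enum State { RUNNING, FINISHED }

    /**
     * Builds the per-vertex BitSet, pre-marking subtasks that are already
     * FINISHED (e.g. after a restart) so they are not waited on again.
     */
    public static Map<String, BitSet> build(Map<String, State[]> vertices) {
        Map<String, BitSet> map = new HashMap<>();
        for (Map.Entry<String, State[]> entry : vertices.entrySet()) {
            State[] subtasks = entry.getValue();
            BitSet status = new BitSet(subtasks.length);
            for (int i = 0; i < subtasks.length; i++) {
                if (subtasks[i] == State.FINISHED) {
                    status.set(i); // already finished: count it as done up front
                }
            }
            map.put(entry.getKey(), status);
        }
        return map;
    }
}
```

With this, a vertex whose subtasks all finished before the restart starts out fully marked, and the all-tasks-finished condition can still be reached.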
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
