rkhachatryan commented on a change in pull request #14734: URL: https://github.com/apache/flink/pull/14734#discussion_r569486717
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointBrief.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The brief of one checkpoint, indicating which tasks to trigger, waiting for acknowledge or commit
+ * for one specific checkpoint.
+ */
+public class CheckpointBrief {

Review comment:
   To me, the name `CheckpointBrief` is a bit ambiguous. How about `CheckpointTasks`, `CheckpointPlan`, or `CheckpointSpec` (though the latter overlaps with options)? Ditto for `CheckpointBriefCalculator`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointBriefCalculator.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Computes the tasks to trigger, wait or commit for each checkpoint. */
+public class CheckpointBriefCalculator {
+    private static final Logger LOG = LoggerFactory.getLogger(CheckpointBriefCalculator.class);
+
+    private final JobID jobId;
+
+    private final List<ExecutionVertex> tasksToTrigger;
+
+    private final List<ExecutionVertex> tasksToWait;
+
+    private final List<ExecutionVertex> tasksToCommitTo;
+
+    public CheckpointBriefCalculator(
+            JobID jobId,
+            List<ExecutionVertex> tasksToTrigger,
+            List<ExecutionVertex> tasksToWait,
+            List<ExecutionVertex> tasksToCommitTo) {
+
+        this.jobId = jobId;
+        this.tasksToTrigger = Collections.unmodifiableList(tasksToTrigger);
+        this.tasksToWait = Collections.unmodifiableList(tasksToWait);
+        this.tasksToCommitTo = Collections.unmodifiableList(tasksToCommitTo);
+    }
+
+    public CheckpointBrief calculateCheckpointBrief() throws CheckpointException {
+        List<Execution> tasksToTrigger = getTriggerExecutions();
+        Map<ExecutionAttemptID, ExecutionVertex> tasksToWait = getAckTasks();
+
+        return new CheckpointBrief(
+                Collections.unmodifiableList(tasksToTrigger), tasksToWait, tasksToCommitTo);

Review comment:
   I think we should do that (make it unmodifiable here and copy in `PendingCheckpoint`). I expect `CheckpointBrief` to be immutable.
   Besides, this class isn't concerned with the mutability of these three structures: two are newly created, and one (`tasksToCommitTo`) is already immutable. So I'd move the `unmodifiable` wrapping into the `CheckpointBrief` constructor.

   nit: inline variables?
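A minimal sketch of the split suggested above, assuming simplified stand-in types: `CheckpointPlanSketch`, `CheckpointPlanCalculatorSketch` and `PendingCheckpointSketch` are hypothetical names, and plain `String`s replace `Execution`, `ExecutionAttemptID` and `ExecutionVertex`. The plan wraps its collections once, in its own constructor; the calculator can then inline the call; and the pending checkpoint takes its own mutable copy of the tasks it still waits for.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical stand-in for CheckpointBrief. Strings replace Execution, ExecutionAttemptID and
 * ExecutionVertex. The constructor is the only place that wraps the collections as unmodifiable
 * views; a view is not a copy, so callers are expected to hand over collections they no longer mutate.
 */
final class CheckpointPlanSketch {
    private final List<String> tasksToTrigger;
    private final Map<String, String> tasksToWaitFor;
    private final List<String> tasksToCommitTo;

    CheckpointPlanSketch(
            List<String> tasksToTrigger,
            Map<String, String> tasksToWaitFor,
            List<String> tasksToCommitTo) {
        this.tasksToTrigger = Collections.unmodifiableList(tasksToTrigger);
        this.tasksToWaitFor = Collections.unmodifiableMap(tasksToWaitFor);
        this.tasksToCommitTo = Collections.unmodifiableList(tasksToCommitTo);
    }

    List<String> getTasksToTrigger() {
        return tasksToTrigger;
    }

    Map<String, String> getTasksToWaitFor() {
        return tasksToWaitFor;
    }

    List<String> getTasksToCommitTo() {
        return tasksToCommitTo;
    }
}

/** Hypothetical calculator: no longer concerned with mutability, so the call is inlined. */
final class CheckpointPlanCalculatorSketch {
    private final List<String> tasksToCommitTo;

    CheckpointPlanCalculatorSketch(List<String> tasksToCommitTo) {
        this.tasksToCommitTo = tasksToCommitTo;
    }

    CheckpointPlanSketch calculate() {
        return new CheckpointPlanSketch(getTriggerExecutions(), getAckTasks(), tasksToCommitTo);
    }

    // Placeholder results standing in for getTriggerExecutions()/getAckTasks() from the PR.
    private List<String> getTriggerExecutions() {
        List<String> trigger = new ArrayList<>();
        trigger.add("source-attempt");
        return trigger;
    }

    private Map<String, String> getAckTasks() {
        Map<String, String> ack = new HashMap<>();
        ack.put("source-attempt", "source-vertex");
        ack.put("map-attempt", "map-vertex");
        return ack;
    }
}

/** Hypothetical PendingCheckpoint stand-in: copies the collection it needs to mutate. */
final class PendingCheckpointSketch {
    private final Map<String, String> notYetAcknowledgedTasks;

    PendingCheckpointSketch(CheckpointPlanSketch plan) {
        // Defensive mutable copy; the shared plan itself stays untouched.
        this.notYetAcknowledgedTasks = new HashMap<>(plan.getTasksToWaitFor());
    }

    boolean acknowledge(String attemptId) {
        return notYetAcknowledgedTasks.remove(attemptId) != null;
    }

    boolean isFullyAcknowledged() {
        return notYetAcknowledgedTasks.isEmpty();
    }
}
```

With this split, removing acknowledged tasks inside the pending checkpoint never leaks back into the plan, and any accidental mutation of the plan fails fast with `UnsupportedOperationException`.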
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -350,13 +340,15 @@ public CheckpointCoordinator(
                 this.minPauseBetweenCheckpoints,
                 this.pendingCheckpoints::size,
                 this.checkpointsCleaner::getNumberOfCheckpointsToClean);
+
         this.cachedTasksById =
-                new LinkedHashMap<ExecutionAttemptID, ExecutionVertex>(tasksToWaitFor.length) {
+                new LinkedHashMap<ExecutionAttemptID, ExecutionVertex>(
+                        attemptMappingProvider.getNumberOfTasks()) {
                     @Override
                     protected boolean removeEldestEntry(
                             Map.Entry<ExecutionAttemptID, ExecutionVertex> eldest) {
-                        return size() > CheckpointCoordinator.this.tasksToWaitFor.length;
+                        return size() > attemptMappingProvider.getNumberOfTasks();

Review comment:
   The interaction between the cache in `CheckpointCoordinator` and `ExecutionAttemptMappingProvider` seems a bit fragile to me, and their responsibilities partly overlap. What do you think about moving the cache itself to the new `ExecutionAttemptMappingProvider`?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
##########
@@ -464,7 +459,13 @@ public void failJobDueToTaskFailure(
                 checkpointsCleaner,
                 new ScheduledExecutorServiceAdapter(checkpointCoordinatorTimer),
                 SharedStateRegistry.DEFAULT_FACTORY,
-                failureManager);
+                failureManager,
+                new CheckpointBriefCalculator(
+                        getJobID(),
+                        sourceAndAllVertices.f0,
+                        sourceAndAllVertices.f1,
+                        sourceAndAllVertices.f1),
+                new ExecutionAttemptMappingProvider(sourceAndAllVertices.f1));

Review comment:
   I think `ExecutionGraph` needs `getSourceAndAllVertices()` only to construct these two objects. Why not move this method to `CheckpointBriefCalculator`? I guess that would also make future steps easier (calculating sources dynamically). `ExecutionAttemptMappingProvider` could then depend on `CheckpointBriefCalculator`. WDYT?
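A rough sketch of the restructuring proposed in the two comments above, under stated assumptions: all class names are hypothetical, `VertexSketch` stands in for `ExecutionVertex` with a plain `String` attempt id, and the calculator (rather than `ExecutionGraph`) derives the source and all-vertex lists. The provider depends on the calculator and owns the bounded `LinkedHashMap` cache, so the capacity and the eviction rule live in one class instead of being split between it and `CheckpointCoordinator`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for ExecutionVertex; the attempt id changes when the task restarts. */
final class VertexSketch {
    final String name;
    final boolean isSource;
    String currentAttemptId;

    VertexSketch(String name, boolean isSource, String currentAttemptId) {
        this.name = name;
        this.isSource = isSource;
        this.currentAttemptId = currentAttemptId;
    }
}

/** Hypothetical calculator that derives the vertex lists in place of ExecutionGraph. */
final class TasksCalculatorSketch {
    private final List<VertexSketch> allVertices;

    TasksCalculatorSketch(List<VertexSketch> allVertices) {
        this.allVertices = Collections.unmodifiableList(new ArrayList<>(allVertices));
    }

    List<VertexSketch> getSourceVertices() {
        List<VertexSketch> sources = new ArrayList<>();
        for (VertexSketch vertex : allVertices) {
            if (vertex.isSource) {
                sources.add(vertex);
            }
        }
        return sources;
    }

    List<VertexSketch> getAllVertices() {
        return allVertices;
    }
}

/** Hypothetical provider that owns both the attempt lookup and its bounded cache. */
final class AttemptMappingProviderSketch {
    private final TasksCalculatorSketch calculator;
    private final Map<String, VertexSketch> cachedTasksById;

    AttemptMappingProviderSketch(TasksCalculatorSketch calculator) {
        this.calculator = calculator;
        final int maxEntries = calculator.getAllVertices().size();
        // Insertion-ordered map that drops its eldest entry once it holds more entries than tasks.
        this.cachedTasksById =
                new LinkedHashMap<String, VertexSketch>(maxEntries) {
                    @Override
                    protected boolean removeEldestEntry(Map.Entry<String, VertexSketch> eldest) {
                        return size() > maxEntries;
                    }
                };
    }

    /** Checks the cache first, then scans current attempts; returns null if neither matches. */
    VertexSketch getVertex(String attemptId) {
        VertexSketch cached = cachedTasksById.get(attemptId);
        if (cached != null) {
            return cached;
        }
        for (VertexSketch vertex : calculator.getAllVertices()) {
            if (attemptId.equals(vertex.currentAttemptId)) {
                cachedTasksById.put(attemptId, vertex);
                return vertex;
            }
        }
        return null;
    }

    int cacheSize() {
        return cachedTasksById.size();
    }
}
```

Because the bound is computed next to the map it limits, `CheckpointCoordinator` would no longer need to ask the provider how many tasks it tracks; it would only call a lookup method.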
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
##########
@@ -142,6 +145,7 @@ public PendingCheckpoint(
         this.checkpointId = checkpointId;
         this.checkpointTimestamp = checkpointTimestamp;
         this.notYetAcknowledgedTasks = checkNotNull(verticesToConfirm);
+        this.tasksToCommitTo = checkNotNull(tasksToCommitTo);

Review comment:
   nit: going one step further, we could store the `CheckpointBrief` in `PendingCheckpoint` instead of the individual collections. But that's probably out of scope for this PR.



----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
