dawidwys commented on a change in pull request #16633:
URL: https://github.com/apache/flink/pull/16633#discussion_r681241135



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
##########
@@ -368,6 +400,85 @@ public CompletedCheckpoint finalizeCheckpoint(
         }
     }
 
+    /**
+     * If a job vertex using {@code UnionListState} has some tasks FINISHED while others are
+     * still RUNNING, the checkpoint is aborted, since it might otherwise produce an incomplete
+     * {@code UnionListState}.
+     */
+    private void checkNoPartlyFinishedVertexUsedUnionListState(
+            Map<JobVertexID, ExecutionJobVertex> partlyFinishedVertex) {
+        for (ExecutionJobVertex vertex : partlyFinishedVertex.values()) {
+            if (hasUsedUnionListState(vertex)) {
+                throw new IllegalStateException(
+                        String.format(
+                                "The vertex %s (id = %s) has used"
+                                        + " UnionListState, but some of its tasks are FINISHED.",
+                                vertex.getName(), vertex.getJobVertexId()));
+            }
+        }
+    }
+
+    /**
+     * If a job vertex using {@code UnionListState} has all of its tasks RUNNING, but only some
+     * of those tasks have reported that their operators are finished, the checkpoint is aborted.
+     * This forces the fast tasks to wait for the slow ones so that they all share the same final
+     * checkpoint. Otherwise, once the fast tasks finished, the slow tasks would be blocked
+     * forever, since every subsequent checkpoint would be aborted.
+     */
+    private void checkNoPartlyOperatorsFinishedVertexUsedUnionListState(
+            Map<JobVertexID, ExecutionJobVertex> partlyFinishedVertex) {
+        for (Map.Entry<ExecutionJobVertex, Integer> entry :
+                vertexOperatorsFinishedTasksCount.entrySet()) {
+            ExecutionJobVertex vertex = entry.getKey();
+
+            // If the vertex is partly finished, it cannot use UnionListState,
+            // since it already passed the previous check.
+            if (partlyFinishedVertex.containsKey(vertex.getJobVertexId())) {
+                continue;
+            }
+
+            if (entry.getValue() != vertex.getParallelism() && hasUsedUnionListState(vertex)) {
+                throw new IllegalStateException(

Review comment:
       nit: I think `IllegalStateException` is not the best choice here. After all, it's not really an illegal state.
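For illustration, here is a minimal, self-contained sketch of the two checks described in the Javadoc above. It assumes a simplified vertex model. `VertexState`, `CheckpointAbortedException`, and `mayProceed` are hypothetical names, not Flink API, and the real code works on `ExecutionJobVertex` and `vertexOperatorsFinishedTasksCount`. Following the nit, the sketch aborts through a dedicated checked exception rather than `IllegalStateException`, so callers treat a rejected checkpoint as an expected outcome rather than a bug. Flink's existing `CheckpointException` might fill that role, but which type the PR should use is left open here.

```java
import java.util.List;

/**
 * Sketch of the two UnionListState checks. VertexState and CheckpointAbortedException
 * are hypothetical simplifications, not Flink's ExecutionJobVertex or a real Flink type.
 */
public class UnionListStateChecks {

    /** Hypothetical per-vertex summary: task counts plus whether UnionListState is used. */
    public record VertexState(
            String name,
            int parallelism,
            int finishedTasks,          // tasks in FINISHED state
            int operatorsFinishedTasks, // RUNNING tasks whose operators reported finished
            boolean usesUnionListState) {}

    /** Hypothetical checked exception: aborting a checkpoint is expected, not a bug. */
    public static class CheckpointAbortedException extends Exception {
        public CheckpointAbortedException(String message) {
            super(message);
        }
    }

    /** Check 1: some tasks are FINISHED while others are still RUNNING. */
    static void checkNoPartlyFinishedVertex(List<VertexState> vertices)
            throws CheckpointAbortedException {
        for (VertexState v : vertices) {
            boolean partlyFinished =
                    v.finishedTasks() > 0 && v.finishedTasks() < v.parallelism();
            if (partlyFinished && v.usesUnionListState()) {
                throw new CheckpointAbortedException(String.format(
                        "The vertex %s uses UnionListState, but some of its tasks are FINISHED.",
                        v.name()));
            }
        }
    }

    /** Check 2: all tasks are RUNNING, but only some report their operators as finished. */
    static void checkNoPartlyOperatorsFinishedVertex(List<VertexState> vertices)
            throws CheckpointAbortedException {
        for (VertexState v : vertices) {
            boolean allRunning = v.finishedTasks() == 0;
            boolean partlyOperatorsFinished =
                    v.operatorsFinishedTasks() > 0
                            && v.operatorsFinishedTasks() < v.parallelism();
            if (allRunning && partlyOperatorsFinished && v.usesUnionListState()) {
                throw new CheckpointAbortedException(String.format(
                        "The vertex %s uses UnionListState, but only some of its tasks"
                                + " have finished their operators.",
                        v.name()));
            }
        }
    }

    /** Runs both checks in order; returns true if the checkpoint may proceed. */
    public static boolean mayProceed(List<VertexState> vertices) {
        try {
            checkNoPartlyFinishedVertex(vertices);
            checkNoPartlyOperatorsFinishedVertex(vertices);
            return true;
        } catch (CheckpointAbortedException e) {
            return false;
        }
    }
}
```

With a checked exception, every caller of the checks must decide explicitly how to react to an aborted checkpoint. An unchecked `IllegalStateException` would let that case escape unnoticed, the same way a genuine programming error does.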



