dawidwys commented on a change in pull request #16633:
URL: https://github.com/apache/flink/pull/16633#discussion_r681241135
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
##########
@@ -368,6 +400,85 @@ public CompletedCheckpoint finalizeCheckpoint(
}
}
+ /**
+ * If a job vertex using {@code UnionListState} has part of tasks FINISHED where others are
+ * still in RUNNING state, the checkpoint would be aborted since it might cause incomplete
+ * {@code UnionListState}.
+ */
+ private void checkNoPartlyFinishedVertexUsedUnionListState(
+ Map<JobVertexID, ExecutionJobVertex> partlyFinishedVertex) {
+ for (ExecutionJobVertex vertex : partlyFinishedVertex.values()) {
+ if (hasUsedUnionListState(vertex)) {
+ throw new IllegalStateException(
+ String.format(
+ "The vertex %s (id = %s) has used"
+ + " UnionListState, but part of its tasks are FINISHED.",
+ vertex.getName(), vertex.getJobVertexId()));
+ }
+ }
+ }
+
+ /**
+ * If a job vertex using {@code UnionListState} has all the tasks in RUNNING state, but part of
+ * the tasks have reported that the operators are finished, the checkpoint would be aborted.
+ * This is to force the fast tasks wait for the slow tasks so that their final checkpoints would
+ * be the same one, otherwise if the fast tasks finished, the slow tasks would be blocked
+ * forever since all the following checkpoints would be aborted.
+ */
+ private void checkNoPartlyOperatorsFinishedVertexUsedUnionListState(
+ Map<JobVertexID, ExecutionJobVertex> partlyFinishedVertex) {
+ for (Map.Entry<ExecutionJobVertex, Integer> entry :
+ vertexOperatorsFinishedTasksCount.entrySet()) {
+ ExecutionJobVertex vertex = entry.getKey();
+
+ // If the vertex is partly finished, then it must not used UnionListState
+ // due to it passed the previous check.
+ if (partlyFinishedVertex.containsKey(vertex.getJobVertexId())) {
+ continue;
+ }
+
+ if (entry.getValue() != vertex.getParallelism() && hasUsedUnionListState(vertex)) {
+ throw new IllegalStateException(
Review comment:
nit: I don't think `IllegalStateException` is the best choice here. After
all, this is not really an illegal state.
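
To illustrate the nit, here is a minimal sketch of what a dedicated exception type could look like. The comment does not say which exception should be used instead, so everything here is an assumption: `PartlyFinishedVertexException`, the `Vertex` stand-in for `ExecutionJobVertex`, and the class name `UnionStateCheckSketch` are hypothetical and are not taken from the PR or from Flink.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class UnionStateCheckSketch {

    /** Hypothetical exception type; the name is an assumption, not part of the PR. */
    public static class PartlyFinishedVertexException extends RuntimeException {
        public PartlyFinishedVertexException(String message) {
            super(message);
        }
    }

    /** Simplified, hypothetical stand-in for ExecutionJobVertex. */
    public static final class Vertex {
        final String name;
        final int parallelism;
        final boolean usesUnionListState;

        public Vertex(String name, int parallelism, boolean usesUnionListState) {
            this.name = name;
            this.parallelism = parallelism;
            this.usesUnionListState = usesUnionListState;
        }
    }

    /**
     * Mirrors the shape of the check in the diff: reject the checkpoint when only
     * some tasks of a vertex using union list state have finished operators.
     */
    public static void checkNoPartlyOperatorsFinished(Map<Vertex, Integer> finishedTasksCount) {
        for (Map.Entry<Vertex, Integer> entry : finishedTasksCount.entrySet()) {
            Vertex vertex = entry.getKey();
            if (entry.getValue() != vertex.parallelism && vertex.usesUnionListState) {
                // A dedicated type describes an expected, recoverable condition
                // rather than a broken invariant.
                throw new PartlyFinishedVertexException(
                        String.format(
                                "The vertex %s has used UnionListState, but only %d of %d"
                                        + " tasks have finished operators.",
                                vertex.name, entry.getValue(), vertex.parallelism));
            }
        }
    }

    public static void main(String[] args) {
        Map<Vertex, Integer> counts = new LinkedHashMap<>();
        counts.put(new Vertex("source", 4, true), 2);
        try {
            checkNoPartlyOperatorsFinished(counts);
        } catch (PartlyFinishedVertexException e) {
            // The caller can abort just this checkpoint and keep the job running.
            System.out.println("Checkpoint aborted: " + e.getMessage());
        }
    }
}
```

With a dedicated type, the caller can catch this condition specifically and abort only the affected checkpoint, without confusing it with an `IllegalStateException` that signals a genuine bug.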