AHeise commented on a change in pull request #12478:
URL: https://github.com/apache/flink/pull/12478#discussion_r436501245



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
##########
@@ -102,11 +102,11 @@ public void start(long checkpointId, CheckpointOptions checkpointOptions) {
                LOG.debug("{} starting checkpoint {} ({})", taskName, checkpointId, checkpointOptions);
                ChannelStateWriteResult result = new ChannelStateWriteResult();
                ChannelStateWriteResult put = results.computeIfAbsent(checkpointId, id -> {
-                       Preconditions.checkState(results.size() < maxCheckpoints, "results.size() > maxCheckpoints", results.size(), maxCheckpoints);
+                       Preconditions.checkState(results.size() < maxCheckpoints, String.format("can't start %d, results.size() > maxCheckpoints: %d, %d, %s", checkpointId, results.size(), maxCheckpoints, taskName));

Review comment:
       nit: we usually put the task name at the front. It would also be nice to
   use `%d > %d` for the comparison values.
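
   A minimal sketch of the message shape suggested here, assuming a
   hypothetical helper `CheckpointLimitMessage` (not part of Flink). Since the
   check is `results.size() < maxCheckpoints`, it fails when the size is
   greater than or equal to the limit, so the sketch prints `>=` rather than
   `>`. That choice is an assumption of this example, not something the PR
   specifies.

```java
import java.util.Locale;

// Hypothetical helper for illustration only; not part of Flink.
final class CheckpointLimitMessage {

    // Puts the task name first and renders the two compared values as "%d >= %d".
    static String format(String taskName, long checkpointId, int current, int max) {
        return String.format(
            Locale.ROOT,
            "%s can't start checkpoint %d, results.size() >= maxCheckpoints: %d >= %d",
            taskName, checkpointId, current, max);
    }
}
```

   The resulting string could then be passed as the message argument of
   `Preconditions.checkState`, so the task name and both compared values show
   up together in the failure.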

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
##########
@@ -56,7 +56,7 @@
 public class ChannelStateWriterImpl implements ChannelStateWriter {
 
        private static final Logger LOG = LoggerFactory.getLogger(ChannelStateWriterImpl.class);
-       private static final int DEFAULT_MAX_CHECKPOINTS = 5; // currently, only single in-flight checkpoint is supported
+       private static final int DEFAULT_MAX_CHECKPOINTS = 100; // includes max-concurrent-checkpoints + checkpoints to be aborted (scheduled via mailbox)

Review comment:
       I also thought about that in FLINK-17218, but ultimately couldn't find a 
reason why an arbitrarily large value offers an advantage over the current 
value. It makes the `checkState` less likely to fail, but doesn't eliminate the failure.
   
   I also thought about removing the `checkState`, but it helped me to find 
some issues previously.
   
   TL;DR I'm not convinced that this is a proper fix.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
##########
@@ -281,6 +281,8 @@ public void notifyCheckpointAborted(long checkpointId, OperatorChain<?, ?> opera
                                }
                        }
 
+                       channelStateWriter.abort(checkpointId, new CancellationException("checkpoint aborted via notification"), false);

Review comment:
       I like this optimization.
   What happens if some other thread still enqueues data to this checkpoint? 
Are the writes dismissed?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
##########
@@ -196,9 +207,14 @@ public void checkpointState(
                // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
                // checkpoint alignments
 
+               if (lastCheckpointId >= metadata.getCheckpointId()) {
+                       LOG.warn("Out of order checkpoint barrier (aborted previously?): {} >= {}", lastCheckpointId, metadata.getCheckpointId());

Review comment:
       Should that be a warning? Conceptually, I think yes.
   However, in practice, with the current UC, this case probably happens quite 
frequently (the barrier overtakes, but the cancellation marker doesn't). I 
wouldn't like to see the log polluted with WARN.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
##########
@@ -137,8 +137,9 @@ boolean isDone() {
 
        /**
         * Aborts the checkpoint and fails pending result for this checkpoint.
+        * @param cleanup true if {@link #getWriteResult(long)} is not supposed to be called afterwards.
         */
-       void abort(long checkpointId, Throwable cause);
+       void abort(long checkpointId, Throwable cause, boolean cleanup);

Review comment:
       I guess the intent of cleanup is to avoid throwing an exception if 
`getWriteResult(long)` is called nonetheless, right?
   So in which cases can that happen? Do we ensure cleanup in these cases 
(e.g., is abort then called twice?)?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
##########
@@ -141,15 +141,10 @@ boolean isDone() {
        void abort(long checkpointId, Throwable cause);
 
        /**
-        * Must be called after {@link #start(long, CheckpointOptions)}.
+        * Must be called after {@link #start(long, CheckpointOptions)} once.
         */
        ChannelStateWriteResult getWriteResult(long checkpointId);

Review comment:
       Please rename `getWriteResult`. I think most developers would expect a 
getter to be idempotent.
   
   How about `removeWriteResult`? That would be similar to how a Java map 
works. Or `getAndRemoveWriteResult` to be completely explicit.
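
   To illustrate the naming concern, here is a minimal sketch assuming a
   hypothetical `WriteResultRegistry` with a placeholder `String` result type
   (neither is Flink's actual class or type). Like `Map.remove`, the method
   returns the entry and removes it, so a second call for the same checkpoint
   finds nothing. Throwing on that second call is an assumption of this
   sketch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the writer; the String result is a placeholder type.
final class WriteResultRegistry {

    private final Map<Long, String> results = new ConcurrentHashMap<>();

    // Registers a result for the checkpoint if none exists yet.
    void start(long checkpointId) {
        results.putIfAbsent(checkpointId, "result-" + checkpointId);
    }

    // Returns the result and removes it, like Map.remove. Not idempotent:
    // a second call for the same checkpoint throws (an assumption of this sketch).
    String getAndRemoveWriteResult(long checkpointId) {
        String result = results.remove(checkpointId);
        if (result == null) {
            throw new IllegalStateException("No write result for checkpoint " + checkpointId);
        }
        return result;
    }
}
```

   With this name, a caller can see at the call site that the result is
   consumed, which a plain getter would not suggest.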



