AHeise commented on a change in pull request #12478:
URL: https://github.com/apache/flink/pull/12478#discussion_r436501245
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
##########
@@ -102,11 +102,11 @@ public void start(long checkpointId, CheckpointOptions checkpointOptions) {
LOG.debug("{} starting checkpoint {} ({})", taskName, checkpointId, checkpointOptions);
ChannelStateWriteResult result = new ChannelStateWriteResult();
ChannelStateWriteResult put = results.computeIfAbsent(checkpointId, id -> {
- Preconditions.checkState(results.size() < maxCheckpoints, "results.size() > maxCheckpoints", results.size(), maxCheckpoints);
+ Preconditions.checkState(results.size() < maxCheckpoints, String.format("can't start %d, results.size() > maxCheckpoints: %d, %d, %s", checkpointId, results.size(), maxCheckpoints, taskName));
Review comment:
nit: we usually put the task name first. It would also be nice to use `%d > %d` for the compared values.
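A minimal sketch of the message layout suggested above, with the task name first and the two compared values next to each other. The class and method names are hypothetical; only the format arguments come from the diff. The operator is written as `>=` here because `checkState(results.size() < maxCheckpoints, ...)` fails exactly when the size has reached the limit, not only when it exceeds it.

```java
// Hypothetical helper, not part of Flink: builds the failure message for the
// state check in ChannelStateWriterImpl#start.
final class StartFailureMessage {

    private StartFailureMessage() {}

    /**
     * Formats the message with the task name first, then the checkpoint id,
     * then the two compared values.
     */
    static String format(String taskName, long checkpointId, int resultsSize, int maxCheckpoints) {
        return String.format(
                "%s can't start checkpoint %d (results.size() >= maxCheckpoints: %d >= %d)",
                taskName, checkpointId, resultsSize, maxCheckpoints);
    }
}
```

For example, `StartFailureMessage.format("Map (1/4)", 42L, 5, 5)` yields `Map (1/4) can't start checkpoint 42 (results.size() >= maxCheckpoints: 5 >= 5)`. Note that, as in the diff, `String.format` runs on every call, even when the check passes.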
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
##########
@@ -56,7 +56,7 @@
public class ChannelStateWriterImpl implements ChannelStateWriter {
private static final Logger LOG = LoggerFactory.getLogger(ChannelStateWriterImpl.class);
- private static final int DEFAULT_MAX_CHECKPOINTS = 5; // currently, only single in-flight checkpoint is supported
+ private static final int DEFAULT_MAX_CHECKPOINTS = 100; // includes max-concurrent-checkpoints + checkpoints to be aborted (scheduled via mailbox)
Review comment:
I also thought about that in FLINK-17218, but ultimately couldn't find a
reason why an arbitrarily large value offers an advantage over the current
one. It makes the failing `checkState` less likely, but doesn't eliminate it.
I also thought about removing the `checkState`, but it helped me find
some issues previously.
TL;DR I'm not convinced that this is a proper fix.
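The argument can be illustrated with a small, hypothetical model of the bounded result map: like the `computeIfAbsent` call in `start`, it refuses to register another result once `maxCheckpoints` results are outstanding. It stores placeholder objects instead of `ChannelStateWriteResult`. Whatever the bound, having one more outstanding checkpoint than the bound (for example, aborts still waiting in the mailbox) trips the check, so going from 5 to 100 only makes the failure rarer.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the bounded result map in ChannelStateWriterImpl.
final class BoundedResults {
    private final int maxCheckpoints;
    private final Map<Long, Object> results = new HashMap<>();

    BoundedResults(int maxCheckpoints) {
        this.maxCheckpoints = maxCheckpoints;
    }

    /** Registers a result for the checkpoint; fails once the bound is reached. */
    Object start(long checkpointId) {
        return results.computeIfAbsent(checkpointId, id -> {
            // Same condition as checkState(results.size() < maxCheckpoints, ...).
            if (results.size() >= maxCheckpoints) {
                throw new IllegalStateException(
                        "can't start " + id + ": " + results.size() + " >= " + maxCheckpoints);
            }
            return new Object();
        });
    }

    /** Removes the result, e.g. once the checkpoint completes or is aborted. */
    void finish(long checkpointId) {
        results.remove(checkpointId);
    }

    /** Returns true if starting one checkpoint beyond the bound fails. */
    static boolean failsAfter(int maxCheckpoints) {
        BoundedResults r = new BoundedResults(maxCheckpoints);
        for (long id = 1; id <= maxCheckpoints; id++) {
            r.start(id); // fill the map up to the bound
        }
        try {
            r.start(maxCheckpoints + 1L);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }
}
```

In this model only the number of results outstanding at the same time matters: `failsAfter(5)` and `failsAfter(100)` both return `true`, while removing finished results keeps the map below the bound.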
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
##########
@@ -281,6 +281,8 @@ public void notifyCheckpointAborted(long checkpointId, OperatorChain<?, ?> opera
}
}
+ channelStateWriter.abort(checkpointId, new CancellationException("checkpoint aborted via notification"), false);
Review comment:
I like this optimization.
What happens if another thread still enqueues data for this checkpoint
afterwards? Are those writes dismissed?
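One possible answer, sketched under assumptions: a hypothetical writer (not Flink's `ChannelStateWriter`) that remembers aborted checkpoint ids and silently drops any data enqueued for them afterwards, including the case where the abort notification arrives before the checkpoint is started.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical writer, not Flink's implementation: it remembers aborted
// checkpoint ids and drops data enqueued for them afterwards.
final class DismissingWriter {
    private final Map<Long, List<String>> pending = new HashMap<>();
    private final Set<Long> aborted = new HashSet<>();

    /** Starts buffering for the checkpoint unless it was already aborted. */
    synchronized void start(long checkpointId) {
        if (!aborted.contains(checkpointId)) {
            pending.putIfAbsent(checkpointId, new ArrayList<>());
        }
    }

    /** Returns false if the data was dismissed (checkpoint aborted or never started). */
    synchronized boolean write(long checkpointId, String data) {
        List<String> buffer = pending.get(checkpointId);
        if (buffer == null) {
            return false;
        }
        buffer.add(data);
        return true;
    }

    /** Drops buffered data and rejects later writes for this checkpoint. */
    synchronized void abort(long checkpointId) {
        aborted.add(checkpointId);
        pending.remove(checkpointId);
    }

    synchronized int bufferedCount(long checkpointId) {
        List<String> buffer = pending.get(checkpointId);
        return buffer == null ? 0 : buffer.size();
    }
}
```

A real implementation would also have to bound the `aborted` set, for example by forgetting ids below the latest completed checkpoint; this sketch lets it grow.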
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
##########
@@ -196,9 +207,14 @@ public void checkpointState(
// We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
// checkpoint alignments
+ if (lastCheckpointId >= metadata.getCheckpointId()) {
+ LOG.warn("Out of order checkpoint barrier (aborted previously?): {} >= {}", lastCheckpointId, metadata.getCheckpointId());
Review comment:
Should that be a warning? Conceptually, I think yes.
However, in practice, with the current unaligned checkpoints (UC), this case
probably happens quite frequently (the barrier overtakes, but the cancellation
marker doesn't). I wouldn't like to see the log polluted with WARN messages.
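A minimal sketch of the same check logged below WARN. `java.util.logging` stands in for the SLF4J logger used in `SubtaskCheckpointCoordinatorImpl`, with `Level.FINE` playing the role of DEBUG; the class and method names are hypothetical. What the caller does with an out-of-order barrier is outside the diff excerpt, so the sketch only reports the condition.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch; java.util.logging stands in for SLF4J here.
final class BarrierOrderCheck {
    private static final Logger LOG = Logger.getLogger(BarrierOrderCheck.class.getName());

    private BarrierOrderCheck() {}

    /**
     * Returns true if a checkpoint with the same or a higher id was already seen
     * (e.g. aborted earlier). Logged at FINE (roughly SLF4J's DEBUG) so that a
     * frequent occurrence does not flood the log.
     */
    static boolean isOutOfOrder(long lastCheckpointId, long checkpointId) {
        if (lastCheckpointId >= checkpointId) {
            LOG.log(Level.FINE,
                    "Out of order checkpoint barrier (aborted previously?): {0} >= {1}",
                    new Object[] {lastCheckpointId, checkpointId});
            return true;
        }
        return false;
    }
}
```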
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
##########
@@ -137,8 +137,9 @@ boolean isDone() {
/**
* Aborts the checkpoint and fails pending result for this checkpoint.
+ * @param cleanup true if {@link #getWriteResult(long)} is not supposed to be called afterwards.
*/
- void abort(long checkpointId, Throwable cause);
+ void abort(long checkpointId, Throwable cause, boolean cleanup);
Review comment:
I guess the intent of `cleanup` is to avoid throwing an exception if
`getWriteResult(long)` is called nonetheless, right?
So in which cases can that happen? Do we ensure cleanup in these cases
(e.g., is `abort` then called twice)?
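To make the question concrete, here is a hypothetical model of `abort(checkpointId, cause, cleanup)`, with `CompletableFuture<Void>` standing in for `ChannelStateWriteResult` (an assumption, not Flink's code). Without cleanup, the failed result stays available for one later `getWriteResult`; with cleanup, it is removed, so a later `getWriteResult` throws, while a second `abort` is a harmless no-op.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of abort(checkpointId, cause, cleanup); not Flink's implementation.
final class CleanupAwareResults {
    private final Map<Long, CompletableFuture<Void>> results = new HashMap<>();

    void start(long checkpointId) {
        results.putIfAbsent(checkpointId, new CompletableFuture<>());
    }

    /**
     * Fails the pending result. With cleanup == true the result is also removed,
     * because the caller promises not to fetch it afterwards. Aborting an unknown
     * or already cleaned-up checkpoint does nothing.
     */
    void abort(long checkpointId, Throwable cause, boolean cleanup) {
        CompletableFuture<Void> result =
                cleanup ? results.remove(checkpointId) : results.get(checkpointId);
        if (result != null) {
            result.completeExceptionally(cause);
        }
    }

    /** One-shot retrieval, as documented in the diff; throws if nothing is registered. */
    CompletableFuture<Void> getWriteResult(long checkpointId) {
        CompletableFuture<Void> result = results.remove(checkpointId);
        if (result == null) {
            throw new IllegalArgumentException("no result for checkpoint " + checkpointId);
        }
        return result;
    }
}
```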
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
##########
@@ -141,15 +141,10 @@ boolean isDone() {
void abort(long checkpointId, Throwable cause);
/**
- * Must be called after {@link #start(long, CheckpointOptions)}.
+ * Must be called after {@link #start(long, CheckpointOptions)} once.
*/
ChannelStateWriteResult getWriteResult(long checkpointId);
Review comment:
Please rename `getWriteResult`. I think most developers would expect a
getter to be idempotent.
How about `removeWriteResult`? That would mirror `Map#remove` in Java.
Or `getAndRemoveWriteResult`, to be completely explicit.
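A short sketch of the semantics the suggested name conveys, assuming a map-backed registry (the class is hypothetical and `CompletableFuture<Void>` stands in for `ChannelStateWriteResult`). As with `Map#remove`, the first call hands out the result and forgets it, and a second call for the same checkpoint returns `null`.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the suggested rename; not Flink's implementation.
final class RemovableWriteResults {
    private final Map<Long, CompletableFuture<Void>> results = new ConcurrentHashMap<>();

    void start(long checkpointId) {
        results.putIfAbsent(checkpointId, new CompletableFuture<>());
    }

    /**
     * Like Map#remove: returns the result and forgets it, so a second call
     * for the same checkpoint returns null.
     */
    CompletableFuture<Void> removeWriteResult(long checkpointId) {
        return results.remove(checkpointId);
    }
}
```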
----------------------------------------------------------------
This is an automated message from the Apache Git Service.