pnowojski commented on code in PR #20233:
URL: https://github.com/apache/flink/pull/20233#discussion_r924252841
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java:
##########
@@ -60,89 +63,78 @@
public class ChannelStateWriterImpl implements ChannelStateWriter {
private static final Logger LOG =
LoggerFactory.getLogger(ChannelStateWriterImpl.class);
- private static final int DEFAULT_MAX_CHECKPOINTS =
- 1000; // includes max-concurrent-checkpoints + checkpoints to be
aborted (scheduled via
- // mailbox)
private final String taskName;
private final ChannelStateWriteRequestExecutor executor;
- private final ConcurrentMap<Long, ChannelStateWriteResult> results;
- private final int maxCheckpoints;
- /**
- * Creates a {@link ChannelStateWriterImpl} with {@link
#DEFAULT_MAX_CHECKPOINTS} as {@link
- * #maxCheckpoints}.
- */
- public ChannelStateWriterImpl(
- String taskName, int subtaskIndex, CheckpointStorageWorkerView
streamFactoryResolver) {
- this(taskName, subtaskIndex, streamFactoryResolver,
DEFAULT_MAX_CHECKPOINTS);
- }
+ private final Object lock = new Object();
+
+ @GuardedBy("lock")
+ private long ongoingCheckpointId;
+
+ @GuardedBy("lock")
+ private final NavigableSet<Long> abortedCheckpointIds;
+
+ // The result of ongoingCheckpointId, the checkpoint that CheckpointId is
less than
+ // ongoingCheckpointId should be aborted due to concurrent unaligned
checkpoint is currently not
+ // supported.
+ @GuardedBy("lock")
+ private ChannelStateWriteResult result;
Review Comment:
Ops, sorry. I've mislead you. Now I remember it was supposed to be task
owned class, but as a result of some discussions it ended up being used from
the Netty thread (`addInputData` called from
`ChannelStatePersister#maybePersist`). That's also why we needed
`ConcurrentMap` here before.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java:
##########
@@ -60,89 +63,78 @@
public class ChannelStateWriterImpl implements ChannelStateWriter {
private static final Logger LOG =
LoggerFactory.getLogger(ChannelStateWriterImpl.class);
- private static final int DEFAULT_MAX_CHECKPOINTS =
- 1000; // includes max-concurrent-checkpoints + checkpoints to be
aborted (scheduled via
- // mailbox)
Review Comment:
Thanks for the explanation. But in that case, why do we need
`abortedCheckpointIds` set? Couldn't we have just `long ongoingCheckpointId`
and `long nextExpectedCheckpointId`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]