lindong28 commented on code in PR #20752:
URL: https://github.com/apache/flink/pull/20752#discussion_r962646192
##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java:
##########
@@ -78,10 +80,9 @@ class SubtaskGatewayImpl implements
OperatorCoordinator.SubtaskGateway {
this.subtaskAccess = subtaskAccess;
this.mainThreadExecutor = mainThreadExecutor;
this.incompleteFuturesTracker = incompleteFuturesTracker;
- this.blockedEvents = new ArrayList<>();
- this.currentCheckpointId = NO_CHECKPOINT;
- this.lastCheckpointId = Long.MIN_VALUE;
- this.isClosed = false;
+ this.blockedEventsMap = new TreeMap<>();
+ this.latestActiveCheckpointId = NO_CHECKPOINT;
+ this.latestAttemptedCheckpointId = Long.MIN_VALUE;
Review Comment:
Would it be better to replace `Long.MIN_VALUE` with `NO_CHECKPOINT`?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java:
##########
@@ -63,13 +67,11 @@ class SubtaskGatewayImpl implements
OperatorCoordinator.SubtaskGateway {
private final IncompleteFuturesTracker incompleteFuturesTracker;
- private final List<BlockedEvent> blockedEvents;
+ private final TreeMap<Long, List<BlockedEvent>> blockedEventsMap;
- private long currentCheckpointId;
+ private long latestActiveCheckpointId;
Review Comment:
Would it be simpler to remove this variable and derive its value from
`blockedEventsMap` when needed?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java:
##########
@@ -221,38 +215,57 @@ void openGatewayAndUnmarkCheckpoint(long checkpointId) {
// Gateways should always be marked and closed for a specific
checkpoint before it can be
// reopened for that checkpoint. If a gateway is to be opened for an
unforeseen checkpoint,
// exceptions should be thrown.
- if (lastCheckpointId < checkpointId) {
+ if (latestAttemptedCheckpointId < checkpointId) {
throw new IllegalStateException(
String.format(
"Gateway closed for different checkpoint: closed
for = %d, expected = %d",
- currentCheckpointId, checkpointId));
+ latestActiveCheckpointId, checkpointId));
}
// The message to open gateway with a specific checkpoint id might
arrive after the
// checkpoint has been aborted, or even after a new checkpoint has
started. In these cases
// this message should be ignored.
- if (currentCheckpointId == NO_CHECKPOINT || checkpointId <
lastCheckpointId) {
+ if (latestActiveCheckpointId == NO_CHECKPOINT
Review Comment:
Would it be simpler to remove `latestActiveCheckpointId == NO_CHECKPOINT`
from this boolean expression?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]