lindong28 commented on code in PR #20275:
URL: https://github.com/apache/flink/pull/20275#discussion_r923952394
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorEventDispatcherImpl.java:
##########
@@ -82,6 +83,10 @@ public void registerEventHandler(OperatorID operator, OperatorEventHandler handl
}
}
+ Set<OperatorID> getRegisteredOperators() {
Review Comment:
Could we explicitly mark methods as `private/protected/public` for
readability and consistency?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -245,7 +262,13 @@ public void notifyCheckpointAborted(long checkpointId) {
// checkpoint coordinator time thread.
// we can remove the delegation once the checkpoint coordinator runs fully in the
// scheduler's main thread executor
- mainThreadExecutor.execute(() -> coordinator.notifyCheckpointAborted(checkpointId));
+ mainThreadExecutor.execute(
+ () -> {
+ subtaskGatewayMap
+ .values()
+ .forEach(x -> x.openGatewayAndUnmarkCheckpoint(checkpointId));
+ coordinator.notifyCheckpointAborted(checkpointId);
Review Comment:
Prior to this PR, this method did not call
`eventValve.openValveAndUnmarkCheckpoint()`. Could you explain why it is now
necessary to call `openGatewayAndUnmarkCheckpoint()`?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -277,29 +300,33 @@ private void checkpointCoordinatorInternal(
final long checkpointId, final CompletableFuture<byte[]> result) {
mainThreadExecutor.assertRunningInMainThread();
- final CompletableFuture<byte[]> coordinatorCheckpoint = new CompletableFuture<>();
-
- FutureUtils.assertNoException(
- coordinatorCheckpoint.handleAsync(
- (success, failure) -> {
- if (failure != null) {
- result.completeExceptionally(failure);
- } else if (eventValve.tryShutValve(checkpointId)) {
- completeCheckpointOnceEventsAreDone(checkpointId, result, success);
- } else {
- // if we cannot shut the valve, this means the checkpoint
- // has been aborted before, so the future is already
- // completed exceptionally. but we try to complete it here
- // again, just in case, as a safety net.
- result.completeExceptionally(
- new FlinkException("Cannot shut event valve"));
- }
- return null;
- },
- mainThreadExecutor));
-
try {
- eventValve.markForCheckpoint(checkpointId);
+ subtaskGatewayMap.values().forEach(x -> x.markForCheckpoint(checkpointId));
+
+ final CompletableFuture<byte[]> coordinatorCheckpoint = new CompletableFuture<>();
+
+ coordinatorCheckpoint.whenComplete(
+ (success, failure) -> {
+ if (failure != null) {
+ result.completeExceptionally(failure);
+ } else {
+ closeGateways(checkpointId, result);
+ }
+ });
+
+ FutureUtils.assertNoException(
+ coordinatorCheckpoint.handleAsync(
+ (success, failure) -> {
+ if (failure != null) {
+ result.completeExceptionally(failure);
Review Comment:
This line seems to duplicate the logic at line 311 above.
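One way to avoid the duplication is to attach a single completion callback that handles both outcomes on the main-thread executor. The sketch below uses only the JDK's `CompletableFuture.whenCompleteAsync`; `SingleCallbackSketch`, `wireCheckpoint`, and `closeGatewaysAndComplete` are hypothetical names, and the stub simply completes `result` with the coordinator's state instead of doing the real gateway work. It does not reproduce the `FutureUtils.assertNoException` wrapping used in the diff.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical sketch: one completion callback handles both failure and success. */
public class SingleCallbackSketch {

    /** Hypothetical stub for the success path; here it only forwards the state. */
    static void closeGatewaysAndComplete(
            long checkpointId, byte[] state, CompletableFuture<byte[]> result) {
        result.complete(state);
    }

    /** Registers exactly one callback, so the failure branch exists in one place. */
    static void wireCheckpoint(
            long checkpointId,
            CompletableFuture<byte[]> coordinatorCheckpoint,
            CompletableFuture<byte[]> result,
            Executor mainThreadExecutor) {
        coordinatorCheckpoint.whenCompleteAsync(
                (success, failure) -> {
                    if (failure != null) {
                        result.completeExceptionally(failure);
                    } else {
                        closeGatewaysAndComplete(checkpointId, success, result);
                    }
                },
                mainThreadExecutor);
    }

    public static void main(String[] args) {
        // Direct executor: runs the callback on the calling thread for this demo.
        Executor direct = Runnable::run;

        CompletableFuture<byte[]> failed = new CompletableFuture<>();
        CompletableFuture<byte[]> failedResult = new CompletableFuture<>();
        wireCheckpoint(1L, failed, failedResult, direct);
        failed.completeExceptionally(new RuntimeException("coordinator failed"));
        System.out.println(failedResult.isCompletedExceptionally()); // prints true

        CompletableFuture<byte[]> ok = new CompletableFuture<>();
        CompletableFuture<byte[]> okResult = new CompletableFuture<>();
        wireCheckpoint(2L, ok, okResult, direct);
        ok.complete(new byte[] {1, 2, 3});
        System.out.println(okResult.join().length); // prints 3
    }
}
```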
##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -308,6 +335,26 @@ private void checkpointCoordinatorInternal(
}
}
+ private void closeGateways(final long checkpointId, final CompletableFuture<byte[]> result) {
+ Set<CloseableSubtaskGateway> closedGateways = new HashSet<>();
+ for (CloseableSubtaskGateway gateway : subtaskGatewayMap.values()) {
+ if (!gateway.tryCloseGateway(checkpointId)) {
+ closedGateways.forEach(CloseableSubtaskGateway::openGatewayAndUnmarkCheckpoint);
Review Comment:
Is there any case where `closedGateways` is not empty at this point? If not,
would it be better to throw an `IllegalStateException` when `closedGateways` is
not empty?
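A minimal sketch of the suggestion, assuming all gateways are marked for the same checkpoint beforehand so that a partial close would indicate a bug: on the first gateway that refuses to close, it throws `IllegalStateException` if any gateway was already closed, and otherwise completes `result` exceptionally, as the removed valve code did. `CloseGatewaysSketch` and its `Gateway` interface are hypothetical stand-ins for `CloseableSubtaskGateway`, and a plain `Exception` replaces `FlinkException` to keep the example free of Flink dependencies.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch: fail fast on a partial close instead of rolling back. */
public class CloseGatewaysSketch {

    /** Minimal stand-in for a gateway that can be closed for one checkpoint. */
    interface Gateway {
        boolean tryCloseGateway(long checkpointId);
    }

    /**
     * Returns true if every gateway closed. If the first gateway refuses, completes
     * {@code result} exceptionally and returns false. A partial close throws.
     */
    static boolean closeGateways(
            List<Gateway> gateways, long checkpointId, CompletableFuture<byte[]> result) {
        Set<Gateway> closedGateways = new HashSet<>();
        for (Gateway gateway : gateways) {
            if (!gateway.tryCloseGateway(checkpointId)) {
                // Assumed invariant: all gateways were marked for the same checkpoint,
                // so either all of them close or none of them do.
                if (!closedGateways.isEmpty()) {
                    throw new IllegalStateException(
                            "Only some gateways closed for checkpoint " + checkpointId);
                }
                // Plain Exception stands in for FlinkException.
                result.completeExceptionally(
                        new Exception("Cannot close gateways for checkpoint " + checkpointId));
                return false;
            }
            closedGateways.add(gateway);
        }
        return true;
    }

    public static void main(String[] args) {
        Gateway closes = checkpointId -> true;
        Gateway refuses = checkpointId -> false;

        CompletableFuture<byte[]> result = new CompletableFuture<>();
        System.out.println(closeGateways(List.of(refuses), 3L, result)); // prints false

        try {
            closeGateways(List.of(closes, refuses), 3L, new CompletableFuture<>());
        } catch (IllegalStateException e) {
            // prints Only some gateways closed for checkpoint 3
            System.out.println(e.getMessage());
        }
    }
}
```

Inside Flink itself, `Preconditions.checkState` from `org.apache.flink.util` would express the same check more compactly.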
--
This is an automated message from the Apache Git Service.