yunfengzhou-hub commented on code in PR #20275:
URL: https://github.com/apache/flink/pull/20275#discussion_r928498354
##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java:
##########
@@ -18,43 +18,82 @@
package org.apache.flink.runtime.operators.coordination;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.messages.Acknowledge;
import
org.apache.flink.runtime.operators.coordination.util.IncompleteFuturesTracker;
import org.apache.flink.runtime.util.Runnables;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.concurrent.FutureUtils;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
/**
* Implementation of the {@link OperatorCoordinator.SubtaskGateway} interface
that provides access to
* subtasks for status and event sending via {@link SubtaskAccess}.
+ *
+ * <p>Instances of this class can be temporarily closed, blocking events from
going through,
+ * buffering them, and releasing them later. It is used for "alignment" of
operator event streams
+ * with checkpoint barrier injection, similar to how the input channels are
aligned during a common
+ * checkpoint.
+ *
+ * <p>The methods on the critical communication path, including
closing/reopening the gateway and
+ * sending the operator events, are required to be used in a single-threaded
context specified by
+ * the {@link #mainThreadExecutor} in {@link #SubtaskGatewayImpl(SubtaskAccess,
+ * ComponentMainThreadExecutor, IncompleteFuturesTracker)} in order to avoid
concurrency issues.
*/
class SubtaskGatewayImpl implements OperatorCoordinator.SubtaskGateway {
private static final String EVENT_LOSS_ERROR_MESSAGE =
"An OperatorEvent from an OperatorCoordinator to a task was lost. "
+ "Triggering task failover to ensure consistency. Event:
'%s', targetTask: %s";
+ private static final long NO_CHECKPOINT = Long.MIN_VALUE;
+
private final SubtaskAccess subtaskAccess;
- private final EventSender sender;
- private final Executor sendingExecutor;
+
+ private final ComponentMainThreadExecutor mainThreadExecutor;
+
private final IncompleteFuturesTracker incompleteFuturesTracker;
+ private final List<BlockedEvent> blockedEvents = new ArrayList<>();
+
+ private long currentCheckpointId;
+
+ private long lastCheckpointId;
+
+ private boolean isClosed;
+
SubtaskGatewayImpl(
SubtaskAccess subtaskAccess,
- EventSender sender,
- Executor sendingExecutor,
+ ComponentMainThreadExecutor mainThreadExecutor,
IncompleteFuturesTracker incompleteFuturesTracker) {
this.subtaskAccess = subtaskAccess;
- this.sender = sender;
- this.sendingExecutor = sendingExecutor;
+ this.mainThreadExecutor = mainThreadExecutor;
this.incompleteFuturesTracker = incompleteFuturesTracker;
+ this.currentCheckpointId = NO_CHECKPOINT;
+ this.lastCheckpointId = Long.MIN_VALUE;
+ }
+
+ @Override
+ public ExecutionAttemptID getExecution() {
Review Comment:
There is no important reason to move these methods. I'll move them back.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]