lindong28 commented on code in PR #20275:
URL: https://github.com/apache/flink/pull/20275#discussion_r927315690
##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -284,22 +311,24 @@ private void checkpointCoordinatorInternal(
(success, failure) -> {
if (failure != null) {
result.completeExceptionally(failure);
- } else if (eventValve.tryShutValve(checkpointId)) {
+ } else if (closeGateways(checkpointId, subtasksToCheckpoint)) {
completeCheckpointOnceEventsAreDone(checkpointId, result, success);
} else {
- // if we cannot shut the valve, this means the checkpoint
+ // if we cannot close the gateway, this means the checkpoint
// has been aborted before, so the future is already
// completed exceptionally. but we try to complete it here
// again, just in case, as a safety net.
result.completeExceptionally(
- new FlinkException("Cannot shut event valve"));
+ new FlinkException("Cannot close gateway"));
}
return null;
},
mainThreadExecutor));
try {
- eventValve.markForCheckpoint(checkpointId);
+ for (int subtask : subtasksToCheckpoint) {
+ subtaskGatewayMap.get(subtask).markForCheckpoint(checkpointId);
Review Comment:
Would it be simpler to call `markForCheckpoint()` for every subtask instead
of additionally passing `subtasksToCheckpoint` as a function input parameter?
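For illustration, a minimal standalone sketch of the simpler variant, which iterates over every registered gateway instead of a passed-in subset. The `Gateway` class and the `markAllForCheckpoint` helper are hypothetical stand-ins and not part of the PR; only the loop shape is the point.

```java
import java.util.Map;

class MarkAllGatewaysSketch {

    /** Hypothetical stand-in for SubtaskGatewayImpl; it only remembers the marked checkpoint ID. */
    static final class Gateway {
        long markedCheckpointId = Long.MIN_VALUE;

        void markForCheckpoint(long checkpointId) {
            markedCheckpointId = checkpointId;
        }
    }

    /** Marks every gateway in the map, so no subset of subtask indices has to be passed in. */
    static void markAllForCheckpoint(Map<Integer, Gateway> subtaskGatewayMap, long checkpointId) {
        for (Gateway gateway : subtaskGatewayMap.values()) {
            gateway.markForCheckpoint(checkpointId);
        }
    }
}
```

Whether marking gateways of subtasks that are not part of the checkpoint is actually safe is exactly the open question here.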
##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -120,7 +118,7 @@
private final OperatorID operatorId;
private final LazyInitializedCoordinatorContext context;
private final SubtaskAccess.SubtaskAccessFactory taskAccesses;
- private final OperatorEventValve eventValve;
+ private final Map<Integer, SubtaskGatewayImpl> subtaskGatewayMap = new HashMap<>();
Review Comment:
Should we instantiate this variable in the constructor for consistency with
`unconfirmedEvents`?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorCoordinatorCheckpoints.java:
##########
@@ -59,15 +66,25 @@ public static CompletableFuture<CoordinatorSnapshot> triggerCoordinatorCheckpoin
public static CompletableFuture<AllCoordinatorSnapshots> triggerAllCoordinatorCheckpoints(
final Collection<OperatorCoordinatorCheckpointContext> coordinators,
- final long checkpointId)
+ final PendingCheckpoint checkpoint)
throws Exception {
final Collection<CompletableFuture<CoordinatorSnapshot>> individualSnapshots =
new ArrayList<>(coordinators.size());
for (final OperatorCoordinatorCheckpointContext coordinator : coordinators) {
+ Set<Integer> subtasksToCheckpoint = new HashSet<>();
Review Comment:
Is there any conceptual difference between `tasksToWaitFor` and
`subtasksToCheckpoint`? If not, it is probably more readable to use a name that
is consistent with `CheckpointPlan::getTasksToWaitFor()`.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java:
##########
@@ -18,43 +18,82 @@
package org.apache.flink.runtime.operators.coordination;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.coordination.util.IncompleteFuturesTracker;
import org.apache.flink.runtime.util.Runnables;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.concurrent.FutureUtils;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
/**
* Implementation of the {@link OperatorCoordinator.SubtaskGateway} interface that access to
* subtasks for status and event sending via {@link SubtaskAccess}.
+ *
+ * <p>Instances of this class can be temporarily closed, blocking events from going through,
+ * buffering them, and releasing them later. It is used for "alignment" of operator event streams
+ * with checkpoint barrier injection, similar to how the input channels are aligned during a common
+ * checkpoint.
+ *
+ * <p>The methods on the critical communication path, including closing/reopening the gateway and
+ * sending the operator events, are required to be used in a single-threaded context specified by
+ * the {@link #mainThreadExecutor} in {@link #SubtaskGatewayImpl(SubtaskAccess,
+ * ComponentMainThreadExecutor, IncompleteFuturesTracker)} in order to avoid concurrent issues.
Review Comment:
`concurrent issues` -> `race condition`?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java:
##########
@@ -308,6 +337,24 @@ private void checkpointCoordinatorInternal(
}
}
+ private boolean closeGateways(
+ final long checkpointId, final Set<Integer> subtasksToCheckpoint) {
+ boolean hasCloseableGateway = false;
+ for (int subtask : subtasksToCheckpoint) {
+ SubtaskGatewayImpl gateway = subtaskGatewayMap.get(subtask);
+ if (!gateway.tryCloseGateway(checkpointId)) {
Review Comment:
Would it be simpler to call `tryCloseGateway()` for every subtask instead of
only for `subtasksToCheckpoint`?
The logic here seems to assume that `tryCloseGateway(..)` can fail for all
subtasks consistently, but can never fail for only a subset of them. Can you
explain why the first case is possible and the second one is not?
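To make the question concrete, here is a minimal standalone sketch that closes every gateway and treats a partial failure as an illegal state. The `Gateway` class is a hypothetical stand-in for `SubtaskGatewayImpl`, and throwing on a mixed result is an assumption about the intended invariant, not something this PR does.

```java
import java.util.Map;

class CloseAllGatewaysSketch {

    /** Hypothetical stand-in for SubtaskGatewayImpl; it can only be closed for the marked ID. */
    static final class Gateway {
        private final long markedCheckpointId;
        private boolean closed;

        Gateway(long markedCheckpointId) {
            this.markedCheckpointId = markedCheckpointId;
        }

        boolean tryCloseGateway(long checkpointId) {
            if (checkpointId == markedCheckpointId) {
                closed = true;
                return true;
            }
            return false;
        }

        boolean isClosed() {
            return closed;
        }
    }

    /**
     * Tries to close every gateway. Returns true if all were closed and false if none were
     * (an empty map also yields false). A mixed result is treated as a bug (assumption).
     */
    static boolean closeGateways(long checkpointId, Map<Integer, Gateway> subtaskGatewayMap) {
        int closedCount = 0;
        for (Gateway gateway : subtaskGatewayMap.values()) {
            if (gateway.tryCloseGateway(checkpointId)) {
                closedCount++;
            }
        }
        if (closedCount != 0 && closedCount != subtaskGatewayMap.size()) {
            throw new IllegalStateException(
                    String.format(
                            "Only %d of %d gateways could be closed for checkpoint %d",
                            closedCount, subtaskGatewayMap.size(), checkpointId));
        }
        return closedCount > 0;
    }
}
```

If a partial failure can in fact happen legitimately, the exception above would be wrong, and the comment in the PR should say why.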
##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorEventDispatcher.java:
##########
@@ -29,6 +29,9 @@ public interface OperatorEventDispatcher {
/**
* Register a listener that is notified every time an OperatorEvent is sent from the
* OperatorCoordinator (of the operator with the given OperatorID) to this subtask.
+ *
+ * <p>The stream operator with the given OperatorID must implement {@link OperatorEventHandler}
Review Comment:
The existing comment (prior to this PR) already seems to describe the behavior
of calling this method. Could you explain what extra benefit this added
comment provides?
And why does the operator with the given OperatorID have to implement
`OperatorEventHandler`?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java:
##########
@@ -18,43 +18,82 @@
package org.apache.flink.runtime.operators.coordination;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.coordination.util.IncompleteFuturesTracker;
import org.apache.flink.runtime.util.Runnables;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.concurrent.FutureUtils;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
/**
* Implementation of the {@link OperatorCoordinator.SubtaskGateway} interface that access to
* subtasks for status and event sending via {@link SubtaskAccess}.
+ *
+ * <p>Instances of this class can be temporarily closed, blocking events from going through,
+ * buffering them, and releasing them later. It is used for "alignment" of operator event streams
+ * with checkpoint barrier injection, similar to how the input channels are aligned during a common
+ * checkpoint.
+ *
+ * <p>The methods on the critical communication path, including closing/reopening the gateway and
+ * sending the operator events, are required to be used in a single-threaded context specified by
+ * the {@link #mainThreadExecutor} in {@link #SubtaskGatewayImpl(SubtaskAccess,
+ * ComponentMainThreadExecutor, IncompleteFuturesTracker)} in order to avoid concurrent issues.
*/
class SubtaskGatewayImpl implements OperatorCoordinator.SubtaskGateway {
private static final String EVENT_LOSS_ERROR_MESSAGE =
"An OperatorEvent from an OperatorCoordinator to a task was lost. "
+ "Triggering task failover to ensure consistency. Event: '%s', targetTask: %s";
+ private static final long NO_CHECKPOINT = Long.MIN_VALUE;
+
private final SubtaskAccess subtaskAccess;
- private final EventSender sender;
- private final Executor sendingExecutor;
+
+ private final ComponentMainThreadExecutor mainThreadExecutor;
+
private final IncompleteFuturesTracker incompleteFuturesTracker;
+ private final List<BlockedEvent> blockedEvents = new ArrayList<>();
Review Comment:
Should we initialize this variable in the constructor for consistency with
e.g. `lastCheckpointId`?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java:
##########
@@ -18,43 +18,82 @@
package org.apache.flink.runtime.operators.coordination;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.coordination.util.IncompleteFuturesTracker;
import org.apache.flink.runtime.util.Runnables;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.concurrent.FutureUtils;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
/**
* Implementation of the {@link OperatorCoordinator.SubtaskGateway} interface that access to
* subtasks for status and event sending via {@link SubtaskAccess}.
+ *
+ * <p>Instances of this class can be temporarily closed, blocking events from going through,
+ * buffering them, and releasing them later. It is used for "alignment" of operator event streams
+ * with checkpoint barrier injection, similar to how the input channels are aligned during a common
+ * checkpoint.
+ *
+ * <p>The methods on the critical communication path, including closing/reopening the gateway and
+ * sending the operator events, are required to be used in a single-threaded context specified by
+ * the {@link #mainThreadExecutor} in {@link #SubtaskGatewayImpl(SubtaskAccess,
+ * ComponentMainThreadExecutor, IncompleteFuturesTracker)} in order to avoid concurrent issues.
*/
class SubtaskGatewayImpl implements OperatorCoordinator.SubtaskGateway {
private static final String EVENT_LOSS_ERROR_MESSAGE =
"An OperatorEvent from an OperatorCoordinator to a task was lost. "
+ "Triggering task failover to ensure consistency. Event: '%s', targetTask: %s";
+ private static final long NO_CHECKPOINT = Long.MIN_VALUE;
+
private final SubtaskAccess subtaskAccess;
- private final EventSender sender;
- private final Executor sendingExecutor;
+
+ private final ComponentMainThreadExecutor mainThreadExecutor;
+
private final IncompleteFuturesTracker incompleteFuturesTracker;
+ private final List<BlockedEvent> blockedEvents = new ArrayList<>();
+
+ private long currentCheckpointId;
+
+ private long lastCheckpointId;
+
+ private boolean isClosed;
Review Comment:
Should we explicitly initialize this variable?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java:
##########
@@ -91,27 +130,133 @@ public CompletableFuture<Acknowledge> sendEvent(OperatorEvent evt) {
new FlinkException(msg, failure)));
}
},
- sendingExecutor);
+ mainThreadExecutor);
- sendingExecutor.execute(
+ mainThreadExecutor.execute(
() -> {
- sender.sendEvent(sendAction, sendResult);
+ sendEventInternal(sendAction, sendResult);
incompleteFuturesTracker.trackFutureWhileIncomplete(result);
});
+
return result;
}
- @Override
- public ExecutionAttemptID getExecution() {
- return subtaskAccess.currentAttempt();
+ private void sendEventInternal(
+ Callable<CompletableFuture<Acknowledge>> sendAction,
+ CompletableFuture<Acknowledge> result) {
+ checkRunsInMainThread();
+
+ if (isClosed) {
+ blockedEvents.add(new BlockedEvent(sendAction, result));
+ } else {
+ callSendAction(sendAction, result);
+ }
}
- @Override
- public int getSubtask() {
- return subtaskAccess.getSubtaskIndex();
+ private void callSendAction(
+ Callable<CompletableFuture<Acknowledge>> sendAction,
+ CompletableFuture<Acknowledge> result) {
+ try {
+ final CompletableFuture<Acknowledge> sendResult = sendAction.call();
+ FutureUtils.forward(sendResult, result);
+ } catch (Throwable t) {
+ ExceptionUtils.rethrowIfFatalError(t);
+ result.completeExceptionally(t);
+ }
}
- private boolean isReady() {
- return subtaskAccess.hasSwitchedToRunning().isDone();
+ /**
+ * Marks the gateway for the next checkpoint. This remembers the checkpoint ID and will only
+ * allow closing the gateway for this specific checkpoint.
+ *
+ * <p>This is the gateway's mechanism to detect situations where multiple coordinator
+ * checkpoints would be attempted overlapping, which is currently not supported (the gateway
+ * doesn't keep a list of events blocked per checkpoint). It also helps to identify situations
+ * where the checkpoint was aborted even before the gateway was closed (by finding out that the
+ * {@code currentCheckpointId} was already reset to {@code NO_CHECKPOINT}.
+ */
+ void markForCheckpoint(long checkpointId) {
+ checkRunsInMainThread();
+
+ if (currentCheckpointId != NO_CHECKPOINT && currentCheckpointId != checkpointId) {
+ throw new IllegalStateException(
+ String.format(
+ "Cannot mark for checkpoint %d, already marked for checkpoint %d",
+ checkpointId, currentCheckpointId));
+ }
+
+ if (checkpointId > lastCheckpointId) {
+ currentCheckpointId = checkpointId;
+ lastCheckpointId = checkpointId;
+ } else {
+ throw new IllegalStateException(
+ String.format(
+ "Regressing checkpoint IDs. Previous checkpointId = %d, new checkpointId = %d",
+ lastCheckpointId, checkpointId));
+ }
+ }
+
+ /**
+ * Closes the gateway. All events sent through this gateway are blocked until the gateway is
+ * re-opened. If the gateway is already closed, this does nothing.
+ *
+ * @return True if the gateway is closed, false if the checkpointId is incorrect.
+ */
+ boolean tryCloseGateway(long checkpointId) {
+ checkRunsInMainThread();
+
+ if (checkpointId == currentCheckpointId) {
+ isClosed = true;
+ return true;
+ }
+ return false;
+ }
+
+ boolean isClosed() {
+ checkRunsInMainThread();
+ return isClosed;
+ }
+
+ void openGatewayAndUnmarkCheckpoint(long expectedCheckpointId) {
+ checkRunsInMainThread();
+
+ if (expectedCheckpointId != currentCheckpointId) {
Review Comment:
Prior to this PR, `OperatorEventValve::openValveAndUnmarkCheckpoint(long
expectedCheckpointId)` threw an IllegalStateException in this case.
Could it be useful to still throw an IllegalStateException if
`currentCheckpointId != expectedCheckpointId && currentCheckpointId !=
NO_CHECKPOINT`?
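For illustration, a minimal standalone sketch of the suggested check. `GatewayStateSketch` is a hypothetical, simplified stand-in for `SubtaskGatewayImpl` that models only the checkpoint-ID bookkeeping. Silently returning when no checkpoint is marked is an assumption about the desired behavior for an already-aborted checkpoint.

```java
class GatewayStateSketch {

    static final long NO_CHECKPOINT = Long.MIN_VALUE;

    private long currentCheckpointId = NO_CHECKPOINT;
    private boolean isClosed;

    void markForCheckpoint(long checkpointId) {
        currentCheckpointId = checkpointId;
    }

    boolean tryCloseGateway(long checkpointId) {
        if (checkpointId == currentCheckpointId) {
            isClosed = true;
            return true;
        }
        return false;
    }

    void openGatewayAndUnmarkCheckpoint(long expectedCheckpointId) {
        if (currentCheckpointId == NO_CHECKPOINT) {
            // Assumption: the checkpoint was already aborted and unmarked, so there is nothing to do.
            return;
        }
        if (currentCheckpointId != expectedCheckpointId) {
            // A different checkpoint is marked: fail loudly, as the old valve did.
            throw new IllegalStateException(
                    String.format(
                            "Gateway is marked for checkpoint %d, but was asked to open for checkpoint %d",
                            currentCheckpointId, expectedCheckpointId));
        }
        isClosed = false;
        currentCheckpointId = NO_CHECKPOINT;
    }

    boolean isClosed() {
        return isClosed;
    }
}
```

With this shape, a mismatch against a still-marked checkpoint surfaces as an error instead of being ignored, while the already-unmarked case stays a no-op.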
##########
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java:
##########
@@ -18,43 +18,82 @@
package org.apache.flink.runtime.operators.coordination;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.coordination.util.IncompleteFuturesTracker;
import org.apache.flink.runtime.util.Runnables;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.concurrent.FutureUtils;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
/**
* Implementation of the {@link OperatorCoordinator.SubtaskGateway} interface that access to
* subtasks for status and event sending via {@link SubtaskAccess}.
+ *
+ * <p>Instances of this class can be temporarily closed, blocking events from going through,
+ * buffering them, and releasing them later. It is used for "alignment" of operator event streams
+ * with checkpoint barrier injection, similar to how the input channels are aligned during a common
+ * checkpoint.
+ *
+ * <p>The methods on the critical communication path, including closing/reopening the gateway and
+ * sending the operator events, are required to be used in a single-threaded context specified by
+ * the {@link #mainThreadExecutor} in {@link #SubtaskGatewayImpl(SubtaskAccess,
+ * ComponentMainThreadExecutor, IncompleteFuturesTracker)} in order to avoid concurrent issues.
*/
class SubtaskGatewayImpl implements OperatorCoordinator.SubtaskGateway {
private static final String EVENT_LOSS_ERROR_MESSAGE =
"An OperatorEvent from an OperatorCoordinator to a task was lost. "
+ "Triggering task failover to ensure consistency. Event: '%s', targetTask: %s";
+ private static final long NO_CHECKPOINT = Long.MIN_VALUE;
+
private final SubtaskAccess subtaskAccess;
- private final EventSender sender;
- private final Executor sendingExecutor;
+
+ private final ComponentMainThreadExecutor mainThreadExecutor;
+
private final IncompleteFuturesTracker incompleteFuturesTracker;
+ private final List<BlockedEvent> blockedEvents = new ArrayList<>();
+
+ private long currentCheckpointId;
+
+ private long lastCheckpointId;
+
+ private boolean isClosed;
+
SubtaskGatewayImpl(
SubtaskAccess subtaskAccess,
- EventSender sender,
- Executor sendingExecutor,
+ ComponentMainThreadExecutor mainThreadExecutor,
IncompleteFuturesTracker incompleteFuturesTracker) {
this.subtaskAccess = subtaskAccess;
- this.sender = sender;
- this.sendingExecutor = sendingExecutor;
+ this.mainThreadExecutor = mainThreadExecutor;
this.incompleteFuturesTracker = incompleteFuturesTracker;
+ this.currentCheckpointId = NO_CHECKPOINT;
+ this.lastCheckpointId = Long.MIN_VALUE;
+ }
+
+ @Override
+ public ExecutionAttemptID getExecution() {
Review Comment:
nit: in general, it is preferred not to move code around unless there are
clear benefits (e.g. a code style pattern that we can consistently enforce in
the future).
Do we need to move these 3 methods here?