kezhuw commented on a change in pull request #15601:
URL: https://github.com/apache/flink/pull/15601#discussion_r612942890
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
##########
@@ -282,34 +307,55 @@ private void checkpointCoordinatorInternal(
@Override
public void afterSourceBarrierInjection(long checkpointId) {
- // this method is commonly called by the CheckpointCoordinator's executor thread (timer
- // thread).
-
- // we ideally want the scheduler main-thread to be the one that sends the blocked events
- // however, we need to react synchronously here, to maintain consistency and not allow
- // another checkpoint injection in-between (unlikely, but possible).
- // fortunately, the event-sending goes pretty much directly to the RPC gateways, which are
- // thread safe.
-
- // this will automatically be fixed once the checkpoint coordinator runs in the
+ // unfortunately, this method does not run in the scheduler executor, but in the
+ // checkpoint coordinator time thread.
+ // we can remove the delegation once the checkpoint coordinator runs fully in the
// scheduler's main thread executor
- eventValve.openValveAndUnmarkCheckpoint();
+ mainThreadExecutor.execute(() -> eventValve.openValveAndUnmarkCheckpoint(checkpointId));
}
@Override
public void abortCurrentTriggering() {
- // this method is commonly called by the CheckpointCoordinator's executor thread (timer
- // thread).
+ // unfortunately, this method does not run in the scheduler executor, but in the
+ // checkpoint coordinator time thread.
+ // we can remove the delegation once the checkpoint coordinator runs fully in the
+ // scheduler's main thread executor
+ mainThreadExecutor.execute(eventValve::openValveAndUnmarkCheckpoint);
+ }
- // we ideally want the scheduler main-thread to be the one that sends the blocked events
- // however, we need to react synchronously here, to maintain consistency and not allow
- // another checkpoint injection in-between (unlikely, but possible).
- // fortunately, the event-sending goes pretty much directly to the RPC gateways, which are
- // thread safe.
+ // ------------------------------------------------------------------------
+ // miscellaneous helpers
+ // ------------------------------------------------------------------------
- // this will automatically be fixed once the checkpoint coordinator runs in the
- // scheduler's main thread executor
- eventValve.openValveAndUnmarkCheckpoint();
+ private void setupAllSubtaskGateways() {
+ for (int i = 0; i < operatorParallelism; i++) {
+ setupSubtaskGateway(i);
+ }
+ }
+
+ private void setupSubtaskGateway(int subtask) {
+ // this gets an access to the latest task execution attempt.
+ final SubtaskAccess sta = taskAccesses.getAccessForSubtask(subtask);
+
+ final OperatorCoordinator.SubtaskGateway gateway =
+ new SubtaskGatewayImpl(sta, eventValve, mainThreadExecutor);
+
+ // We need to do this synchronously here, otherwise we violate the contract that
+ // 'subtaskFailed()' will never overtake 'subtaskReady()'.
+ // An alternative, if we ever figure out that this cannot work synchronously here,
+ // is that we re-enqueue all actions (like 'subtaskFailed()' and 'subtaskRestored()')
+ // back into the main thread executor, rather than directly calling the OperatorCoordinator
+ sta.hasSwitchedToRunning()
+ .thenAccept(
+ (ignored) -> {
+ mainThreadExecutor.assertRunningInMainThread();
+
+ // this is a guard in case someone accidentally makes the
+ // notification asynchronous
+ assert sta.isStillRunning();
+
+ coordinator.subtaskReady(subtask, gateway);
Review comment:
I suspect this is broken by FLINK-17012. I think we should guarantee the following order for coordinator authors:
* `subtaskReady`.
* `handleEventFromOperator`.

But after FLINK-17012, `StreamOperator.open` is called in `RECOVERING` (`INITIALIZING` in master now). At least, I saw `ReaderRegistrationEvent` being sent there. This means that `handleEventFromOperator` could be called before `subtaskReady`. I don't think this is what we want. I will verify this later if there is any doubt.
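A minimal sketch of how a test coordinator could detect the ordering violation described above, assuming the guarantee we want is "`subtaskReady` happens-before `handleEventFromOperator` for the same subtask". `OrderCheckingCoordinator` is a hypothetical, Flink-free stand-in: the gateway argument is omitted and `OperatorEvent` is reduced to a `String`, so this is not the real `OperatorCoordinator` interface.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical test helper (not a Flink class): records every operator event that
 * arrives for a subtask before 'subtaskReady' was called for that subtask.
 */
class OrderCheckingCoordinator {
    private final Set<Integer> readySubtasks = new HashSet<>();
    private final List<String> violations = new ArrayList<>();

    // Plays the role of OperatorCoordinator#subtaskReady (gateway parameter omitted).
    synchronized void subtaskReady(int subtask) {
        readySubtasks.add(subtask);
    }

    // Plays the role of OperatorCoordinator#subtaskFailed: the subtask is no longer ready.
    synchronized void subtaskFailed(int subtask) {
        readySubtasks.remove(subtask);
    }

    // Plays the role of handleEventFromOperator; the event is reduced to a String.
    synchronized void handleEventFromOperator(int subtask, String event) {
        if (!readySubtasks.contains(subtask)) {
            violations.add(
                    "event '" + event + "' from subtask " + subtask + " before subtaskReady");
        }
    }

    // Returns a snapshot of all ordering violations seen so far.
    synchronized List<String> violations() {
        return new ArrayList<>(violations);
    }
}
```

Recording violations instead of throwing lets an ITCase collect every ordering problem and assert on them once at the end.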
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
##########
@@ -285,41 +348,130 @@ public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exc
String.format("Don't recognize event '%s' from task %d.", event, subtask));
}
- if (periodicTask != null) {
- throw new Exception("periodic already running");
+ synchronized (recoveredTaskRunning) {
+ // signal the previous task that its recovered task is now running
+ final CompletableFuture<?> prevTaskFuture = recoveredTaskRunning.peekLast();
+ if (prevTaskFuture != null) {
+ prevTaskFuture.complete(null);
+ }
+ // add a future for this task
+ recoveredTaskRunning.addLast(new CompletableFuture<>());
}
- periodicTask =
- executor.scheduleWithFixedDelay(this, delay, delay, TimeUnit.MILLISECONDS);
+
+ // first, we hand this over to the mailbox thread, so we preserve order on operations,
+ // even if the action is only to do a thread safe scheduling into the scheduledExecutor
+ runInMailbox(
+ () -> {
+ checkState(!workLoopRunning);
+ checkState(subtaskGateway != null);
+
+ workLoopRunning = true;
+ scheduleSingleAction();
+ });
}
@Override
public void subtaskFailed(int subtask, @Nullable Throwable reason) {
Review comment:
Actually, there is no true subtask failure in this test case. This `subtaskFailed` call is caused by a global failure. I think we should inject some unrecognized events to fail a subtask.
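A rough sketch of the suggestion, assuming that an exception thrown while a task handles an operator event fails only that subtask, so the coordinator would observe a per-subtask `subtaskFailed` rather than one triggered by a global failover. `UnrecognizedEventTask`, `IntegerEvent` and `PoisonEvent` are hypothetical names, not classes from `CoordinatorEventsExactlyOnceITCase`.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical task-side event handler: any unrecognized event fails this subtask. */
class UnrecognizedEventTask {

    /** An event the task understands (hypothetical). */
    static final class IntegerEvent {
        final int value;

        IntegerEvent(int value) {
            this.value = value;
        }
    }

    /** An event the task deliberately does not recognize, sent only to provoke a failure. */
    static final class PoisonEvent {}

    private final AtomicInteger received = new AtomicInteger();
    private volatile boolean failed;

    /** Counts known events; marks the subtask failed and throws on anything else. */
    void handleOperatorEvent(Object event) {
        if (event instanceof IntegerEvent) {
            received.incrementAndGet();
            return;
        }
        failed = true;
        throw new IllegalStateException(String.format("Don't recognize event '%s'.", event));
    }

    boolean isFailed() {
        return failed;
    }

    int receivedCount() {
        return received.get();
    }
}
```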
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
##########
@@ -237,38 +244,56 @@ public void notifyCheckpointAborted(long checkpointId) {
@Override
public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData)
throws Exception {
- // ideally we would like to check this here, however this method is called early during
- // execution graph construction, before the main thread executor is set
+ // the first time this method is called is early during execution graph construction,
+ // before the main thread executor is set. hence this conditional check.
+ if (mainThreadExecutor != null) {
+ mainThreadExecutor.assertRunningInMainThread();
+ }
+
+ eventValve.openValveAndUnmarkCheckpoint();
- eventValve.reset();
if (context != null) {
Review comment:
I see no way that `context` could be `null` here.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinator.java
##########
@@ -226,6 +219,26 @@ default void notifyCheckpointAborted(long checkpointId) {}
// ------------------------------------------------------------------------
+ interface SubtaskGateway {
+
+ /**
+ * Sends an event to the parallel subtask with the given subtask index.
+ *
+ * <p>The returned future is completed successfully once the event has been received by the
+ * target TaskManager. The future is completed exceptionally if the event cannot be sent.
+ * That includes situations where the target task is not running.
+ */
+ CompletableFuture<Acknowledge> sendEvent(OperatorEvent evt);
+
+ ExecutionAttemptID getExecution();
+
+ int getSubtask();
+
+ boolean isReady();
Review comment:
I don't think we need `isReady` now that there is `subtaskReady`. The current implementation of `isReady` should always return `true` after `subtaskReady` has been called.
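If readiness is implied by `subtaskReady`, a coordinator can track it by holding on to the gateway it receives there and dropping it in `subtaskFailed`, which makes a separate `isReady()` on the gateway redundant. A minimal sketch under that assumption; `GatewayTracker` is a hypothetical helper, and the gateway type is left generic instead of being bound to `OperatorCoordinator.SubtaskGateway`.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

/** Hypothetical coordinator-side bookkeeping: a subtask is "ready" iff we hold its gateway. */
class GatewayTracker<G> {
    private final AtomicReferenceArray<G> gateways;

    GatewayTracker(int parallelism) {
        this.gateways = new AtomicReferenceArray<>(parallelism);
    }

    /** Call from subtaskReady(subtask, gateway): the gateway is usable from now on. */
    void onSubtaskReady(int subtask, G gateway) {
        gateways.set(subtask, gateway);
    }

    /** Call from subtaskFailed(subtask, reason): forget the failed attempt's gateway. */
    void onSubtaskFailed(int subtask) {
        gateways.set(subtask, null);
    }

    /** Replaces a separate isReady() flag: readiness is simply "gateway present". */
    boolean isReady(int subtask) {
        return gateways.get(subtask) != null;
    }

    /** Returns the subtask's gateway, or throws if the subtask is not ready. */
    G gatewayFor(int subtask) {
        G gateway = gateways.get(subtask);
        if (gateway == null) {
            throw new IllegalStateException("Subtask " + subtask + " is not ready");
        }
        return gateway;
    }
}
```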
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
##########
@@ -282,34 +307,55 @@ private void checkpointCoordinatorInternal(
+ coordinator.subtaskReady(subtask, gateway);
Review comment:
I confirmed this by moving the `StartEvent` sending to `EventCollectingTask.restore` in `CoordinatorEventsExactlyOnceITCase`. That should behave the same as sending an event from `StreamOperator.open`.
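The ordering problem can be reproduced without Flink in a small simulation, assuming (as in `setupSubtaskGateway` in the first hunk) that `subtaskReady` is only issued once the task has switched to RUNNING, while an event sent during restore/open reaches the coordinator through the same main-thread executor. `ReadyOrderingRace` and its single-thread executor are hypothetical stand-ins for the scheduler's main thread, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical simulation: an event sent while INITIALIZING overtakes subtaskReady. */
class ReadyOrderingRace {

    /** Returns the coordinator calls in the order the "main thread" executed them. */
    static List<String> run() throws Exception {
        // Stand-in for the scheduler's main thread executor.
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        // Only mutated on mainThread; read after Future.get(), which gives happens-before.
        List<String> calls = new ArrayList<>();
        CompletableFuture<Void> switchedToRunning = new CompletableFuture<>();
        try {
            // Holder side: defer subtaskReady until the task reports RUNNING,
            // similar to sta.hasSwitchedToRunning().thenAccept(...).
            CompletableFuture<Void> ready =
                    switchedToRunning.thenRunAsync(() -> calls.add("subtaskReady"), mainThread);

            // Task side, still INITIALIZING: restore()/open() already sends an event,
            // which is handed to the coordinator on the main thread.
            mainThread.submit(() -> calls.add("handleEventFromOperator")).get();

            // Only now does the task switch to RUNNING.
            switchedToRunning.complete(null);
            ready.get(10, TimeUnit.SECONDS);
            return new ArrayList<>(calls);
        } finally {
            mainThread.shutdown();
        }
    }
}
```

Because the event is handled before `switchedToRunning` completes, the recorded order is `handleEventFromOperator` followed by `subtaskReady`, which is exactly the ordering the first comment on this hunk objects to.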
--
This is an automated message from the Apache Git Service.