This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 07fc4477909b6ec2374cc38f8aa445ebfe6b3b4d Author: Stephan Ewen <[email protected]> AuthorDate: Sun Apr 11 18:55:14 2021 +0200 [FLINK-18071][coordination] (part 2) OperatorCoordinatorHolder does not implement OperatorCoordinator interface any more Originally, the OperatorCoordinatorHolder was designed to have the same interface as the OperatorCoordinator and simply add some hooks around the checkpoint triggering procedure. However, the OperatorCoordinatorHolder is becoming the glue between the scheduler threads, with the scheduler's view of tasks and their status, and the OperatorCoordinator threads, with their simplified view of the execution state. This means the two require different interfaces. --- .../runtime/operators/coordination/OperatorCoordinatorHolder.java | 5 +---- .../java/org/apache/flink/runtime/scheduler/DefaultScheduler.java | 5 +++-- .../main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java | 2 +- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java index 21974c3..4901f6e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java @@ -114,7 +114,7 @@ import static org.apache.flink.util.Preconditions.checkState; * an event) are also enqueued back into the scheduler main-thread executor, strictly in order. 
*/ public class OperatorCoordinatorHolder - implements OperatorCoordinator, OperatorCoordinatorCheckpointContext { + implements OperatorCoordinatorCheckpointContext, AutoCloseable { private final OperatorCoordinator coordinator; private final OperatorID operatorId; @@ -179,7 +179,6 @@ public class OperatorCoordinatorHolder // OperatorCoordinator Interface // ------------------------------------------------------------------------ - @Override public void start() throws Exception { mainThreadExecutor.assertRunningInMainThread(); checkState(context.isInitialized(), "Coordinator Context is not yet initialized"); @@ -192,13 +191,11 @@ public class OperatorCoordinatorHolder context.unInitialize(); } - @Override public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exception { mainThreadExecutor.assertRunningInMainThread(); coordinator.handleEventFromOperator(subtask, event); } - @Override public void subtaskFailed(int subtask, @Nullable Throwable reason) { mainThreadExecutor.assertRunningInMainThread(); coordinator.subtaskFailed(subtask, reason); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java index 2618d27..33e8636 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java @@ -43,7 +43,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableExceptio import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; -import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder; import 
org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy; import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory; @@ -550,7 +550,8 @@ public class DefaultScheduler extends SchedulerBase implements SchedulerOperatio return; } - for (OperatorCoordinator coordinator : vertex.getJobVertex().getOperatorCoordinators()) { + for (OperatorCoordinatorHolder coordinator : + vertex.getJobVertex().getOperatorCoordinators()) { coordinator.subtaskFailed(vertex.getParallelSubtaskIndex(), null); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index 50d3350..0ab5672 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -451,7 +451,7 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling private void notifyCoordinatorsOfEmptyGlobalRestore() throws Exception { for (final ExecutionJobVertex ejv : getExecutionGraph().getAllVertices().values()) { - for (final OperatorCoordinator coordinator : ejv.getOperatorCoordinators()) { + for (final OperatorCoordinatorHolder coordinator : ejv.getOperatorCoordinators()) { coordinator.resetToCheckpoint(OperatorCoordinator.NO_CHECKPOINT, null); } }
