This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 07fc4477909b6ec2374cc38f8aa445ebfe6b3b4d
Author: Stephan Ewen <[email protected]>
AuthorDate: Sun Apr 11 18:55:14 2021 +0200

    [FLINK-18071][coordination] (part 2) OperatorCoordinatorHolder does not 
implement OperatorCoordinator interface any more
    
    Originally it was designed that the OperatorCoordinatorHolder has the same 
interface as the OperatorCoordinator and simply
    adds some hooks around the checkpoint triggering procedure.
    
    However, the OperatorCoordinatorHolder is becoming the glue between the 
scheduler threads and the scheduler's view on
    tasks and their status, and the OperatorCoordinator threads and their 
simplified view on the execution state. This
    means they do require different interfaces.
---
 .../runtime/operators/coordination/OperatorCoordinatorHolder.java    | 5 +----
 .../java/org/apache/flink/runtime/scheduler/DefaultScheduler.java    | 5 +++--
 .../main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java  | 2 +-
 3 files changed, 5 insertions(+), 7 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index 21974c3..4901f6e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -114,7 +114,7 @@ import static 
org.apache.flink.util.Preconditions.checkState;
  * an event) are also enqueued back into the scheduler main-thread executor, 
strictly in order.
  */
 public class OperatorCoordinatorHolder
-        implements OperatorCoordinator, OperatorCoordinatorCheckpointContext {
+        implements OperatorCoordinatorCheckpointContext, AutoCloseable {
 
     private final OperatorCoordinator coordinator;
     private final OperatorID operatorId;
@@ -179,7 +179,6 @@ public class OperatorCoordinatorHolder
     //  OperatorCoordinator Interface
     // ------------------------------------------------------------------------
 
-    @Override
     public void start() throws Exception {
         mainThreadExecutor.assertRunningInMainThread();
         checkState(context.isInitialized(), "Coordinator Context is not yet 
initialized");
@@ -192,13 +191,11 @@ public class OperatorCoordinatorHolder
         context.unInitialize();
     }
 
-    @Override
     public void handleEventFromOperator(int subtask, OperatorEvent event) 
throws Exception {
         mainThreadExecutor.assertRunningInMainThread();
         coordinator.handleEventFromOperator(subtask, event);
     }
 
-    @Override
     public void subtaskFailed(int subtask, @Nullable Throwable reason) {
         mainThreadExecutor.assertRunningInMainThread();
         coordinator.subtaskFailed(subtask, reason);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index 2618d27..33e8636 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -43,7 +43,7 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableExceptio
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
-import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy;
 import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
@@ -550,7 +550,8 @@ public class DefaultScheduler extends SchedulerBase 
implements SchedulerOperatio
             return;
         }
 
-        for (OperatorCoordinator coordinator : 
vertex.getJobVertex().getOperatorCoordinators()) {
+        for (OperatorCoordinatorHolder coordinator :
+                vertex.getJobVertex().getOperatorCoordinators()) {
             coordinator.subtaskFailed(vertex.getParallelSubtaskIndex(), null);
         }
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 50d3350..0ab5672 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -451,7 +451,7 @@ public abstract class SchedulerBase implements SchedulerNG, 
CheckpointScheduling
 
     private void notifyCoordinatorsOfEmptyGlobalRestore() throws Exception {
         for (final ExecutionJobVertex ejv : 
getExecutionGraph().getAllVertices().values()) {
-            for (final OperatorCoordinator coordinator : 
ejv.getOperatorCoordinators()) {
+            for (final OperatorCoordinatorHolder coordinator : 
ejv.getOperatorCoordinators()) {
                 
coordinator.resetToCheckpoint(OperatorCoordinator.NO_CHECKPOINT, null);
             }
         }

Reply via email to