Repository: flink
Updated Branches:
  refs/heads/release-1.2 90173b724 -> c22efce09


[FLINK-5938] Replace ExecutionContext by Executor in Scheduler

In order to remove the Scheduler's dependency on Scala's ExecutionContext and
Akka's futures, this PR replaces the ExecutionContext by an Executor which is
used to execute the concurrent handleNewSlot call.

This closes #3439.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/53b45428
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/53b45428
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/53b45428

Branch: refs/heads/release-1.2
Commit: 53b454285c810b83232a1c97d945f16ab3634c68
Parents: 90173b7
Author: Till Rohrmann <[email protected]>
Authored: Tue Feb 28 17:27:03 2017 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Thu Mar 2 12:27:15 2017 +0100

----------------------------------------------------------------------
 .../runtime/jobmanager/scheduler/Scheduler.java | 23 +++++++++-----------
 1 file changed, 10 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/53b45428/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index b839e0e..700cc8f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -30,11 +30,9 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import akka.dispatch.Futures;
-
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 
@@ -54,9 +52,9 @@ import org.apache.flink.runtime.instance.InstanceListener;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
 
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.concurrent.ExecutionContext;
 
 /**
  * The scheduler is responsible for distributing the ready-to-run tasks among 
instances and slots.
@@ -102,16 +100,16 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener, Sl
        /** The number of slot allocations where locality could not be 
respected */
        private int nonLocalizedAssignments;
 
-       /** The ExecutionContext which is used to execute newSlotAvailable 
futures. */
-       private final ExecutionContext executionContext;
+       /** The Executor which is used to execute newSlotAvailable futures. */
+       private final Executor executor;
 
        // 
------------------------------------------------------------------------
 
        /**
         * Creates a new scheduler.
         */
-       public Scheduler(ExecutionContext executionContext) {
-               this.executionContext = executionContext;
+       public Scheduler(Executor executor) {
+               this.executor = Preconditions.checkNotNull(executor);
        }
        
        /**
@@ -527,15 +525,14 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener, Sl
                // 
                // that leads with a high probability to deadlocks, when 
scheduling fast
 
-               this.newlyAvailableInstances.add(instance);
+               newlyAvailableInstances.add(instance);
 
-               Futures.future(new Callable<Object>() {
+               executor.execute(new Runnable() {
                        @Override
-                       public Object call() throws Exception {
+                       public void run() {
                                handleNewSlot();
-                               return null;
                        }
-               }, executionContext);
+               });
        }
        
        private void handleNewSlot() {

Reply via email to