[FLINK-4689] [cluster management] Implement a simple slot provider for the new job manager


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e91b82d3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e91b82d3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e91b82d3

Branch: refs/heads/flip-6
Commit: e91b82d3c868e18611064f905b345906f1414f84
Parents: 655722a
Author: Kurt Young <ykt...@gmail.com>
Authored: Sun Oct 16 22:20:38 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Oct 16 22:14:41 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/instance/SlotPool.java |  1 -
 .../jobmanager/slots/PooledSlotProvider.java    | 73 ++++++++++++++++++++
 .../flink/runtime/jobmaster/JobMaster.java      | 24 ++++---
 3 files changed, 89 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e91b82d3/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index e7857c1..de952c3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -135,7 +135,6 @@ public class SlotPool implements SlotOwner {
 
                internalAllocateSlot(jobID, allocationID, resourceProfile, 
future);
 
-               final SlotOwner owner = this;
                return future.thenApplyAsync(
                        new ApplyFunction<SlotDescriptor, SimpleSlot>() {
                                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/e91b82d3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java
new file mode 100644
index 0000000..5655fc2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.slots;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.SlotPool;
+import org.apache.flink.runtime.instance.SlotProvider;
+import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A simple pool based slot provider with {@link SlotPool} as the underlying 
storage.
+ */
+public class PooledSlotProvider implements SlotProvider {
+
+       /** The pool which holds all the slots. */
+       private final SlotPool slotPool;
+
+       /** The timeout for allocation. */
+       private final Time timeout;
+
+       public PooledSlotProvider(final SlotPool slotPool, final Time timeout) {
+               this.slotPool = slotPool;
+               this.timeout = timeout;
+       }
+
+       @Override
+       public Future<SimpleSlot> allocateSlot(ScheduledUnit task,
+                       boolean allowQueued) throws NoResourceAvailableException
+       {
+               checkNotNull(task);
+
+               final JobID jobID = 
task.getTaskToExecute().getVertex().getJobId();
+               final Future<SimpleSlot> future = 
slotPool.allocateSimpleSlot(jobID, ResourceProfile.UNKNOWN);
+               try {
+                       final SimpleSlot slot = future.get(timeout.getSize(), 
timeout.getUnit());
+                       return FlinkCompletableFuture.completed(slot);
+               } catch (InterruptedException e) {
+                       throw new NoResourceAvailableException("Could not 
allocate a slot because it's interrupted.");
+               } catch (ExecutionException e) {
+                       throw new NoResourceAvailableException("Could not 
allocate a slot because some error occurred " +
+                                       "during allocation, " + e.getMessage());
+               } catch (TimeoutException e) {
+                       throw new NoResourceAvailableException("Could not 
allocate a slot within time limit: " + timeout);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e91b82d3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index a7be476..05c20d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -49,6 +49,7 @@ import 
org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.instance.SlotPool;
 import org.apache.flink.runtime.io.network.PartitionState;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -56,7 +57,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
-import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
+import org.apache.flink.runtime.jobmanager.slots.PooledSlotProvider;
 import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -84,7 +85,6 @@ import 
org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.SerializedValue;
-
 import org.slf4j.Logger;
 
 import javax.annotation.Nullable;
@@ -93,6 +93,7 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -145,6 +146,9 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
        /** The execution graph of this job */
        private final ExecutionGraph executionGraph;
 
+       private final SlotPool slotPool;
+
+       private final Time allocationTimeout;
 
        private volatile UUID leaderSessionID;
 
@@ -156,8 +160,6 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
        /** Connection with ResourceManager, null if not located address yet or 
we close it initiative */
        private ResourceManagerConnection resourceManagerConnection;
 
-       // TODO - we need to replace this with the slot pool
-       private final Scheduler scheduler;
 
        // 
------------------------------------------------------------------------
 
@@ -239,8 +241,8 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                                -1,
                                log);
 
-               // TODO - temp fix
-               this.scheduler = new Scheduler(executorService);
+               this.slotPool = new SlotPool(executorService);
+               this.allocationTimeout = Time.of(5, TimeUnit.SECONDS);
        }
 
        
//----------------------------------------------------------------------------------------------
@@ -262,6 +264,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                if (LEADER_ID_UPDATER.compareAndSet(this, null, 
leaderSessionID)) {
                        super.start();
 
+                       slotPool.setJobManagerLeaderId(leaderSessionID);
                        log.info("Starting JobManager for job {} ({})", 
jobGraph.getName(), jobGraph.getJobID());
                        getSelf().startJobExecution();
                } else {
@@ -337,7 +340,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                        @Override
                        public void run() {
                                try {
-                                       
executionGraph.scheduleForExecution(scheduler);
+                                       executionGraph.scheduleForExecution(new 
PooledSlotProvider(slotPool, allocationTimeout));
                                } catch (Throwable t) {
                                        executionGraph.fail(t);
                                }
@@ -365,6 +368,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                ((StartStoppable) getSelf()).stop();
 
                leaderSessionID = null;
+               slotPool.setJobManagerLeaderId(null);
                executionGraph.suspend(cause);
 
                // disconnect from resource manager:
@@ -783,9 +787,12 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                                // TODO - add tests for comment in 
https://github.com/apache/flink/pull/2565
                                // verify the response with current connection
                                if (resourceManagerConnection != null
-                                               && 
resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId()))
 {
+                                               && 
resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId()))
+                               {
                                        log.info("JobManager successfully 
registered at ResourceManager, leader id: {}.",
                                                        
success.getResourceManagerLeaderId());
+                                       
slotPool.setResourceManager(success.getResourceManagerLeaderId(),
+                                                       
resourceManagerConnection.getTargetGateway());
                                }
                        }
                });
@@ -796,6 +803,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                        resourceManagerConnection.close();
                        resourceManagerConnection = null;
                }
+               slotPool.disconnectResourceManager();
        }
 
        
//----------------------------------------------------------------------------------------------

Reply via email to