[FLINK-4689] [cluster management] Implement a simple slot provider for the new job manager
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e91b82d3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e91b82d3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e91b82d3 Branch: refs/heads/flip-6 Commit: e91b82d3c868e18611064f905b345906f1414f84 Parents: 655722a Author: Kurt Young <ykt...@gmail.com> Authored: Sun Oct 16 22:20:38 2016 +0800 Committer: Stephan Ewen <se...@apache.org> Committed: Sun Oct 16 22:14:41 2016 +0200 ---------------------------------------------------------------------- .../apache/flink/runtime/instance/SlotPool.java | 1 - .../jobmanager/slots/PooledSlotProvider.java | 73 ++++++++++++++++++++ .../flink/runtime/jobmaster/JobMaster.java | 24 ++++--- 3 files changed, 89 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e91b82d3/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java index e7857c1..de952c3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java @@ -135,7 +135,6 @@ public class SlotPool implements SlotOwner { internalAllocateSlot(jobID, allocationID, resourceProfile, future); - final SlotOwner owner = this; return future.thenApplyAsync( new ApplyFunction<SlotDescriptor, SimpleSlot>() { @Override http://git-wip-us.apache.org/repos/asf/flink/blob/e91b82d3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java new file mode 100644 index 0000000..5655fc2 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmanager.slots; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.runtime.instance.SimpleSlot; +import org.apache.flink.runtime.instance.SlotPool; +import org.apache.flink.runtime.instance.SlotProvider; +import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; +import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A simple pool based slot provider with {@link SlotPool} as the underlying storage. + */ +public class PooledSlotProvider implements SlotProvider { + + /** The pool which holds all the slots. */ + private final SlotPool slotPool; + + /** The timeout for allocation. */ + private final Time timeout; + + public PooledSlotProvider(final SlotPool slotPool, final Time timeout) { + this.slotPool = slotPool; + this.timeout = timeout; + } + + @Override + public Future<SimpleSlot> allocateSlot(ScheduledUnit task, + boolean allowQueued) throws NoResourceAvailableException + { + checkNotNull(task); + + final JobID jobID = task.getTaskToExecute().getVertex().getJobId(); + final Future<SimpleSlot> future = slotPool.allocateSimpleSlot(jobID, ResourceProfile.UNKNOWN); + try { + final SimpleSlot slot = future.get(timeout.getSize(), timeout.getUnit()); + return FlinkCompletableFuture.completed(slot); + } catch (InterruptedException e) { + throw new NoResourceAvailableException("Could not allocate a slot because it's interrupted."); + } catch (ExecutionException e) { + throw new NoResourceAvailableException("Could not allocate a slot because some error occurred " + + "during allocation, " + e.getMessage()); + } catch (TimeoutException e) { + throw new NoResourceAvailableException("Could not allocate a slot within time limit: " + timeout); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e91b82d3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index a7be476..05c20d3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -49,6 +49,7 @@ import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.instance.Slot; +import org.apache.flink.runtime.instance.SlotPool; import org.apache.flink.runtime.io.network.PartitionState; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; @@ -56,7 +57,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.OnCompletionActions; -import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; +import org.apache.flink.runtime.jobmanager.slots.PooledSlotProvider; import org.apache.flink.runtime.jobmaster.message.ClassloadingProps; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; @@ -84,7 +85,6 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.util.SerializedThrowable; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.SerializedValue; - import org.slf4j.Logger; import javax.annotation.Nullable; @@ -93,6 +93,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -145,6 +146,9 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { /** The execution graph of this job */ private final ExecutionGraph executionGraph; + private final SlotPool slotPool; + + private final Time allocationTimeout; private volatile UUID leaderSessionID; @@ -156,8 +160,6 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { /** Connection with ResourceManager, null if not located address yet or we close it initiative */ private ResourceManagerConnection resourceManagerConnection; - // TODO - we need to replace this with the slot pool - private final Scheduler scheduler; // ------------------------------------------------------------------------ @@ -239,8 +241,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { -1, log); - // TODO - temp fix - this.scheduler = new Scheduler(executorService); + this.slotPool = new SlotPool(executorService); + this.allocationTimeout = Time.of(5, TimeUnit.SECONDS); } //---------------------------------------------------------------------------------------------- @@ -262,6 +264,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { if (LEADER_ID_UPDATER.compareAndSet(this, null, leaderSessionID)) { super.start(); + slotPool.setJobManagerLeaderId(leaderSessionID); log.info("Starting JobManager for job {} ({})", jobGraph.getName(), jobGraph.getJobID()); getSelf().startJobExecution(); } else { @@ -337,7 +340,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { @Override public void run() { try { - executionGraph.scheduleForExecution(scheduler); + executionGraph.scheduleForExecution(new PooledSlotProvider(slotPool, allocationTimeout)); } catch (Throwable t) { executionGraph.fail(t); } @@ -365,6 +368,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { ((StartStoppable) getSelf()).stop(); leaderSessionID = null; + slotPool.setJobManagerLeaderId(null); executionGraph.suspend(cause); // disconnect from resource manager: @@ -783,9 +787,12 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { // TODO - add tests for comment in https://github.com/apache/flink/pull/2565 // verify the response with current connection if (resourceManagerConnection != null - && resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId())) { + && resourceManagerConnection.getTargetLeaderId().equals(success.getResourceManagerLeaderId())) + { log.info("JobManager successfully registered at ResourceManager, leader id: {}.", success.getResourceManagerLeaderId()); + slotPool.setResourceManager(success.getResourceManagerLeaderId(), + resourceManagerConnection.getTargetGateway()); } } }); @@ -796,6 +803,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { resourceManagerConnection.close(); resourceManagerConnection = null; } + slotPool.disconnectResourceManager(); } //----------------------------------------------------------------------------------------------