[FLINK-4606] integrate features of old ResourceManager

This closes #2540


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1f198d8c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1f198d8c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1f198d8c

Branch: refs/heads/flip-6
Commit: 1f198d8ca56ec6719d112cdc7180aeef6d18477a
Parents: 6e58ebf
Author: Maximilian Michels <m...@apache.org>
Authored: Tue Sep 27 10:38:02 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 14 15:14:42 2016 +0200

----------------------------------------------------------------------
 .../InfoMessageListenerRpcGateway.java          |   1 -
 .../resourcemanager/ResourceManager.java        | 146 ++++++++++++-------
 .../resourcemanager/ResourceManagerGateway.java |   6 +-
 .../ResourceManagerServices.java                |  44 ++++++
 .../StandaloneResourceManager.java              |  19 ++-
 .../TaskExecutorRegistration.java               |  51 -------
 .../registration/TaskExecutorRegistration.java  |  51 +++++++
 .../slotmanager/SimpleSlotManager.java          |   6 -
 .../slotmanager/SlotManager.java                |  63 ++++++--
 .../slotmanager/SlotManagerTest.java            |  25 +++-
 .../slotmanager/SlotProtocolTest.java           |  42 +++---
 11 files changed, 295 insertions(+), 159 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1f198d8c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java
index c1eeefa..d1373ec 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
-import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.rpc.RpcGateway;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/1f198d8c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 83dc4db..190a4de 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -20,14 +20,18 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
@@ -48,11 +52,10 @@ import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -64,36 +67,43 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * It offers the following methods as part of its rpc interface to interact 
with the him remotely:
  * <ul>
  *     <li>{@link #registerJobMaster(UUID, UUID, String, JobID)} registers a 
{@link JobMaster} at the resource manager</li>
- *     <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource 
manager</li>
+ *     <li>{@link #requestSlot(UUID, UUID, SlotRequest)} requests a slot from 
the resource manager</li>
  * </ul>
  */
-public abstract class ResourceManager<ResourceManagerGateway, WorkerType 
extends TaskExecutorRegistration> extends RpcEndpoint implements 
LeaderContender {
+public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
+               extends RpcEndpoint<ResourceManagerGateway>
+               implements LeaderContender {
 
        /** The exit code with which the process is stopped in case of a fatal 
error */
        protected static final int EXIT_CODE_FATAL_ERROR = -13;
 
        private final Map<JobID, JobMasterGateway> jobMasterGateways;
 
-       private final Set<LeaderRetrievalListener> 
jobMasterLeaderRetrievalListeners;
+       private final Map<JobID, JobMasterLeaderListener> 
jobMasterLeaderRetrievalListeners;
 
        private final Map<ResourceID, WorkerType> taskExecutorGateways;
 
        private final HighAvailabilityServices highAvailabilityServices;
 
-       private LeaderElectionService leaderElectionService;
-
        private final SlotManager slotManager;
 
+       private LeaderElectionService leaderElectionService;
+
        private UUID leaderSessionID;
 
        private Map<String, InfoMessageListenerRpcGateway> infoMessageListeners;
 
-       public ResourceManager(RpcService rpcService, HighAvailabilityServices 
highAvailabilityServices, SlotManager slotManager) {
+       private final Time timeout = Time.seconds(5);
+
+       public ResourceManager(
+                       RpcService rpcService,
+                       HighAvailabilityServices highAvailabilityServices,
+                       SlotManager slotManager) {
                super(rpcService);
                this.highAvailabilityServices = 
checkNotNull(highAvailabilityServices);
                this.jobMasterGateways = new HashMap<>();
                this.slotManager = checkNotNull(slotManager);
-               this.jobMasterLeaderRetrievalListeners = new HashSet<>();
+               this.jobMasterLeaderRetrievalListeners = new HashMap<>();
                this.taskExecutorGateways = new HashMap<>();
                infoMessageListeners = new HashMap<>();
        }
@@ -105,6 +115,7 @@ public abstract class 
ResourceManager<ResourceManagerGateway, WorkerType extends
                        super.start();
                        leaderElectionService = 
highAvailabilityServices.getResourceManagerLeaderElectionService();
                        leaderElectionService.start(this);
+                       slotManager.setupResourceManagerServices(new 
DefaultResourceManagerServices());
                        // framework specific initialization
                        initialize();
                } catch (Throwable e) {
@@ -117,7 +128,7 @@ public abstract class 
ResourceManager<ResourceManagerGateway, WorkerType extends
        public void shutDown() {
                try {
                        leaderElectionService.stop();
-                       for(JobID jobID : jobMasterGateways.keySet()) {
+                       for (JobID jobID : jobMasterGateways.keySet()) {
                                
highAvailabilityServices.getJobMasterLeaderRetriever(jobID).stop();
                        }
                        super.shutDown();
@@ -189,15 +200,17 @@ public abstract class 
ResourceManager<ResourceManagerGateway, WorkerType extends
                                        if (throwable != null) {
                                                return new 
RegistrationResponse.Decline(throwable.getMessage());
                                        } else {
-                                               JobMasterLeaderListener 
jobMasterLeaderListener = new JobMasterLeaderListener(jobID);
-                                               try {
-                                                       LeaderRetrievalService 
jobMasterLeaderRetriever = 
highAvailabilityServices.getJobMasterLeaderRetriever(jobID);
-                                                       
jobMasterLeaderRetriever.start(jobMasterLeaderListener);
-                                               } catch (Exception e) {
-                                                       log.warn("Failed to 
start JobMasterLeaderRetriever for JobID {}", jobID);
-                                                       return new 
RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever");
+                                               if 
(!jobMasterLeaderRetrievalListeners.containsKey(jobID)) {
+                                                       JobMasterLeaderListener 
jobMasterLeaderListener = new JobMasterLeaderListener(jobID);
+                                                       try {
+                                                               
LeaderRetrievalService jobMasterLeaderRetriever = 
highAvailabilityServices.getJobMasterLeaderRetriever(jobID);
+                                                               
jobMasterLeaderRetriever.start(jobMasterLeaderListener);
+                                                       } catch (Exception e) {
+                                                               
log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
+                                                               return new 
RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever");
+                                                       }
+                                                       
jobMasterLeaderRetrievalListeners.put(jobID, jobMasterLeaderListener);
                                                }
-                                               
jobMasterLeaderRetrievalListeners.add(jobMasterLeaderListener);
                                                final JobMasterGateway 
existingGateway = jobMasterGateways.put(jobID, jobMasterGateway);
                                                if (existingGateway != null) {
                                                        log.info("Replacing 
gateway for registered JobID {}.", jobID);
@@ -232,7 +245,6 @@ public abstract class 
ResourceManager<ResourceManagerGateway, WorkerType extends
                                                resourceID, 
taskExecutorAddress, leaderSessionID, resourceManagerLeaderId);
                                        throw new Exception("Invalid leader 
session id");
                                }
-
                                return 
getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class).get(5, 
TimeUnit.SECONDS);
                        }
                }).handleAsync(new BiFunction<TaskExecutorGateway, Throwable, 
RegistrationResponse>() {
@@ -241,24 +253,14 @@ public abstract class 
ResourceManager<ResourceManagerGateway, WorkerType extends
                                if (throwable != null) {
                                        return new 
RegistrationResponse.Decline(throwable.getMessage());
                                } else {
-                                       WorkerType startedWorker = 
taskExecutorGateways.get(resourceID);
-                                       if(startedWorker != null) {
-                                               String oldWorkerAddress = 
startedWorker.getTaskExecutorGateway().getAddress();
-                                               if 
(taskExecutorAddress.equals(oldWorkerAddress)) {
-                                                       log.warn("Receive a 
duplicate registration from TaskExecutor {} at ({})", resourceID, 
taskExecutorAddress);
-                                               } else {
-                                                       log.warn("Receive a 
duplicate registration from TaskExecutor {} at different address, previous 
({}), new ({})",
-                                                               resourceID, 
oldWorkerAddress, taskExecutorAddress);
-                                                       // TODO :: suggest old 
taskExecutor to stop itself
-                                                       
slotManager.notifyTaskManagerFailure(resourceID);
-                                                       startedWorker = 
workerStarted(resourceID, taskExecutorGateway);
-                                                       
taskExecutorGateways.put(resourceID, startedWorker);
-                                               }
-                                       } else {
-                                               startedWorker = 
workerStarted(resourceID, taskExecutorGateway);
-                                               
taskExecutorGateways.put(resourceID, startedWorker);
+                                       WorkerType oldWorker = 
taskExecutorGateways.remove(resourceID);
+                                       if (oldWorker != null) {
+                                               // TODO :: suggest old 
taskExecutor to stop itself
+                                               
slotManager.notifyTaskManagerFailure(resourceID);
                                        }
-                                       return new 
TaskExecutorRegistrationSuccess(startedWorker.getInstanceID(), 5000);
+                                       WorkerType newWorker = 
workerStarted(resourceID);
+                                       taskExecutorGateways.put(resourceID, 
newWorker);
+                                       return new 
TaskExecutorRegistrationSuccess(new InstanceID(), 5000);
                                }
                        }
                }, getMainThreadExecutor());
@@ -271,11 +273,20 @@ public abstract class 
ResourceManager<ResourceManagerGateway, WorkerType extends
         * @return Slot assignment
         */
        @RpcMethod
-       public SlotRequestReply requestSlot(SlotRequest slotRequest) {
-               final JobID jobId = slotRequest.getJobId();
-               final JobMasterGateway jobMasterGateway = 
jobMasterGateways.get(jobId);
+       public SlotRequestReply requestSlot(
+                       UUID jobMasterLeaderID,
+                       UUID resourceManagerLeaderID,
+                       SlotRequest slotRequest) {
+
+               JobID jobId = slotRequest.getJobId();
+               JobMasterGateway jobMasterGateway = 
jobMasterGateways.get(jobId);
+               JobMasterLeaderListener jobMasterLeaderListener = 
jobMasterLeaderRetrievalListeners.get(jobId);
+
+               UUID leaderID = jobMasterLeaderListener.getLeaderID();
 
-               if (jobMasterGateway != null) {
+               if (jobMasterGateway != null
+                               && jobMasterLeaderID.equals(leaderID)
+                               && 
resourceManagerLeaderID.equals(leaderSessionID)) {
                        return slotManager.requestSlot(slotRequest);
                } else {
                        log.info("Ignoring slot request for unknown JobMaster 
with JobID {}", jobId);
@@ -379,7 +390,7 @@ public abstract class 
ResourceManager<ResourceManagerGateway, WorkerType extends
        }
 
        /**
-        * Shutdowns cluster
+        * Cleanup application and shut down cluster
         *
         * @param finalStatus
         * @param optionalDiagnostics
@@ -446,17 +457,11 @@ public abstract class 
ResourceManager<ResourceManagerGateway, WorkerType extends
        protected abstract void initialize() throws Exception;
 
        /**
-        * Callback when a task executor register.
+        * Notifies the resource master of a fatal error.
         *
-        * @param resourceID The worker resource id
-        * @param taskExecutorGateway the task executor gateway
-        */
-       protected abstract WorkerType workerStarted(ResourceID resourceID, 
TaskExecutorGateway taskExecutorGateway);
-
-       /**
-        * Callback when a resource manager faced a fatal error
-        * @param message
-        * @param error
+        * <p><b>IMPORTANT:</b> This should not cleanly shut down this master, 
but exit it in
+        * such a way that a high-availability setting would restart this or 
fail over
+        * to another master.
         */
        protected abstract void fatalError(String message, Throwable error);
 
@@ -472,6 +477,19 @@ public abstract class 
ResourceManager<ResourceManagerGateway, WorkerType extends
         */
        protected abstract void shutDownApplication(ApplicationStatus 
finalStatus, String optionalDiagnostics);
 
+       /**
+        * Allocates a resource using the resource profile.
+        * @param resourceProfile The resource description
+        */
+       @VisibleForTesting
+       public abstract void startNewWorker(ResourceProfile resourceProfile);
+
+       /**
+        * Callback when a worker was started.
+        * @param resourceID The worker resource id
+        */
+       protected abstract WorkerType workerStarted(ResourceID resourceID);
+
        // 
------------------------------------------------------------------------
        //  Info messaging
        // 
------------------------------------------------------------------------
@@ -489,6 +507,24 @@ public abstract class 
ResourceManager<ResourceManagerGateway, WorkerType extends
                });
        }
 
+       private class DefaultResourceManagerServices implements 
ResourceManagerServices {
+
+               @Override
+               public void allocateResource(ResourceProfile resourceProfile) {
+                       ResourceManager.this.startNewWorker(resourceProfile);
+               }
+
+               @Override
+               public Executor getAsyncExecutor() {
+                       return 
ResourceManager.this.getRpcService().getExecutor();
+               }
+
+               @Override
+               public Executor getExecutor() {
+                       return ResourceManager.this.getMainThreadExecutor();
+               }
+       }
+
        private static class JobMasterLeaderListener implements 
LeaderRetrievalListener {
 
                private final JobID jobID;
@@ -498,6 +534,14 @@ public abstract class 
ResourceManager<ResourceManagerGateway, WorkerType extends
                        this.jobID = jobID;
                }
 
+               public JobID getJobID() {
+                       return jobID;
+               }
+
+               public UUID getLeaderID() {
+                       return leaderID;
+               }
+
                @Override
                public void notifyLeaderAddress(final String leaderAddress, 
final UUID leaderSessionID) {
                        this.leaderID = leaderSessionID;

http://git-wip-us.apache.org/repos/asf/flink/blob/1f198d8c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 7c44006..87303a1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -59,7 +59,11 @@ public interface ResourceManagerGateway extends RpcGateway {
         * @param slotRequest Slot request
         * @return Future slot assignment
         */
-       Future<SlotRequestReply> requestSlot(SlotRequest slotRequest);
+       Future<SlotRequestReply> requestSlot(
+               UUID jobMasterLeaderID,
+               UUID resourceManagerLeaderID,
+               SlotRequest slotRequest,
+               @RpcTimeout Time timeout);
 
        /**
         * Register a {@link 
org.apache.flink.runtime.taskexecutor.TaskExecutor} at the resource manager.

http://git-wip-us.apache.org/repos/asf/flink/blob/1f198d8c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
new file mode 100644
index 0000000..30994dc
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Interface which provides access to services of the ResourceManager.
+ */
+public interface ResourceManagerServices {
+
+       /**
+        * Allocates a resource according to the resource profile.
+        */
+       void allocateResource(ResourceProfile resourceProfile);
+
+       /**
+        * Gets the async executor which executes outside of the main thread of 
the ResourceManager
+        */
+       Executor getAsyncExecutor();
+
+       /**
+        * Gets the executor which executes in the main thread of the 
ResourceManager
+        */
+       Executor getExecutor();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1f198d8c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index 84db1ee..deca8d3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -20,17 +20,18 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 
 /**
  * A standalone implementation of the resource manager. Used when the system 
is started in
  * standalone mode (via scripts), rather than via a resource framework like 
YARN or Mesos.
+ *
+ * This ResourceManager doesn't acquire new resources.
  */
-public class StandaloneResourceManager extends 
ResourceManager<ResourceManagerGateway, TaskExecutorRegistration> {
+public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 
        public StandaloneResourceManager(RpcService rpcService,
                HighAvailabilityServices highAvailabilityServices,
@@ -51,14 +52,16 @@ public class StandaloneResourceManager extends 
ResourceManager<ResourceManagerGa
        }
 
        @Override
-       protected TaskExecutorRegistration workerStarted(ResourceID resourceID, 
TaskExecutorGateway taskExecutorGateway) {
-               InstanceID instanceID = new InstanceID();
-               TaskExecutorRegistration taskExecutorRegistration = new 
TaskExecutorRegistration(taskExecutorGateway, instanceID);
-               return taskExecutorRegistration;
+       protected void shutDownApplication(ApplicationStatus finalStatus, 
String optionalDiagnostics) {
        }
 
        @Override
-       protected void shutDownApplication(ApplicationStatus finalStatus, 
String optionalDiagnostics) {
+       public void startNewWorker(ResourceProfile resourceProfile) {
+       }
 
+       @Override
+       protected ResourceID workerStarted(ResourceID resourceID) {
+               return resourceID;
        }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1f198d8c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
deleted file mode 100644
index f8dfdc7..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/TaskExecutorRegistration.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
-
-import java.io.Serializable;
-
-/**
- * This class is responsible for group the TaskExecutorGateway and the 
InstanceID of a registered task executor.
- */
-public class TaskExecutorRegistration implements Serializable {
-
-       private static final long serialVersionUID = -2062957799469434614L;
-
-       private TaskExecutorGateway taskExecutorGateway;
-
-       private InstanceID instanceID;
-
-       public TaskExecutorRegistration(TaskExecutorGateway taskExecutorGateway,
-                                                                       
InstanceID instanceID) {
-               this.taskExecutorGateway = taskExecutorGateway;
-               this.instanceID = instanceID;
-       }
-
-       public InstanceID getInstanceID() {
-               return instanceID;
-       }
-
-       public TaskExecutorGateway getTaskExecutorGateway() {
-               return taskExecutorGateway;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1f198d8c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorRegistration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorRegistration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorRegistration.java
new file mode 100644
index 0000000..6b21f5c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorRegistration.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.registration;
+
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+
+import java.io.Serializable;
+
+/**
+ * This class is responsible for grouping the TaskExecutorGateway and the 
InstanceID of a registered task executor.
+ */
+public class TaskExecutorRegistration implements Serializable {
+
+       private static final long serialVersionUID = -2062957799469434614L;
+
+       private TaskExecutorGateway taskExecutorGateway;
+
+       private InstanceID instanceID;
+
+       public TaskExecutorRegistration(TaskExecutorGateway taskExecutorGateway,
+                                                                       
InstanceID instanceID) {
+               this.taskExecutorGateway = taskExecutorGateway;
+               this.instanceID = instanceID;
+       }
+
+       public InstanceID getInstanceID() {
+               return instanceID;
+       }
+
+       public TaskExecutorGateway getTaskExecutorGateway() {
+               return taskExecutorGateway;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1f198d8c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java
index ef5ce31..ae1de5a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java
@@ -18,7 +18,6 @@
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
@@ -51,9 +50,4 @@ public class SimpleSlotManager extends SlotManager {
                }
        }
 
-       @Override
-       protected void allocateContainer(ResourceProfile resourceProfile) {
-               // TODO
-       }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1f198d8c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index a6d2196..a56b2f6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -22,16 +22,18 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
 import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,6 +84,9 @@ public abstract class SlotManager {
        /** The current leader id set by the ResourceManager */
        private UUID leaderID;
 
+       /** Services of the ResourceManager used to allocate new resources and to obtain the executor for slot request callbacks */
+       private ResourceManagerServices resourceManagerServices;
+
        public SlotManager() {
                this.registeredSlots = new HashMap<>(16);
                this.pendingSlotRequests = new LinkedHashMap<>(16);
@@ -91,6 +96,16 @@ public abstract class SlotManager {
                this.timeout = Time.seconds(10);
        }
 
+       /**
+        * Registers the {@link ResourceManagerServices} this slot manager uses to request new
+        * resources and to obtain the executor on which slot request replies are handled.
+        * May be called only once.
+        *
+        * @param resourceManagerServices the services to use; must not be null
+        * @throws IllegalStateException if the services have already been set
+        * @throws NullPointerException if {@code resourceManagerServices} is null
+        */
+       public void setupResourceManagerServices(ResourceManagerServices resourceManagerServices) {
+               Preconditions.checkState(this.resourceManagerServices == null,
+                       "ResourceManagerServices may only be set once.");
+               // reject null up front: a null here would leave the manager unconfigured and
+               // would not trip the set-once check on a later call
+               this.resourceManagerServices = Preconditions.checkNotNull(resourceManagerServices,
+                       "resourceManagerServices must not be null");
+       }
+
 
        // 
------------------------------------------------------------------------
        //  slot managements
@@ -120,17 +135,32 @@ public abstract class SlotManager {
 
                        // record this allocation in bookkeeping
                        allocationMap.addAllocation(slot.getSlotId(), 
allocationId);
-
                        // remove selected slot from free pool
-                       freeSlots.remove(slot.getSlotId());
+                       final ResourceSlot removedSlot = 
freeSlots.remove(slot.getSlotId());
 
                        final Future<SlotRequestReply> slotRequestReplyFuture =
                                
slot.getTaskExecutorGateway().requestSlot(allocationId, leaderID, timeout);
-                       // TODO handle timeouts and response
+
+                       slotRequestReplyFuture.handleAsync(new 
BiFunction<SlotRequestReply, Throwable, Object>() {
+                               @Override
+                               public Object apply(SlotRequestReply 
slotRequestReply, Throwable throwable) {
+                                       if (throwable != null) {
+                                               // we failed, put the slot and 
the request back again
+                                               if 
(allocationMap.isAllocated(slot.getSlotId())) {
+                                                       // only re-add if the 
slot hasn't been removed in the meantime
+                                                       
freeSlots.put(slot.getSlotId(), removedSlot);
+                                               }
+                                               
pendingSlotRequests.put(allocationId, request);
+                                       }
+                                       return null;
+                               }
+                       }, resourceManagerServices.getExecutor());
                } else {
                        LOG.info("Cannot fulfil slot request, try to allocate a 
new container for it, " +
                                "AllocationID:{}, JobID:{}", allocationId, 
request.getJobId());
-                       allocateContainer(request.getResourceProfile());
+                       Preconditions.checkState(resourceManagerServices != 
null,
+                               "Attempted to allocate resources but no 
ResourceManagerServices set.");
+                       
resourceManagerServices.allocateResource(request.getResourceProfile());
                        pendingSlotRequests.put(allocationId, request);
                }
 
@@ -343,7 +373,7 @@ public abstract class SlotManager {
 
                if (chosenRequest != null) {
                        final AllocationID allocationId = 
chosenRequest.getAllocationId();
-                       pendingSlotRequests.remove(allocationId);
+                       final SlotRequest removedSlotRequest = 
pendingSlotRequests.remove(allocationId);
 
                        LOG.info("Assigning SlotID({}) to AllocationID({}), 
JobID:{}", freeSlot.getSlotId(),
                                allocationId, chosenRequest.getJobId());
@@ -351,7 +381,19 @@ public abstract class SlotManager {
 
                        final Future<SlotRequestReply> slotRequestReplyFuture =
                                
freeSlot.getTaskExecutorGateway().requestSlot(allocationId, leaderID, timeout);
-                       // TODO handle timeouts and response
+
+                       slotRequestReplyFuture.handleAsync(new 
BiFunction<SlotRequestReply, Throwable, Object>() {
+                               @Override
+                               public Object apply(SlotRequestReply 
slotRequestReply, Throwable throwable) {
+                                       if (throwable != null) {
+                                               // we failed, add the request 
back again
+                                               if 
(allocationMap.isAllocated(freeSlot.getSlotId())) {
+                                                       
pendingSlotRequests.put(allocationId, removedSlotRequest);
+                                               }
+                                       }
+                                       return null;
+                               }
+                       }, resourceManagerServices.getExecutor());
                } else {
                        freeSlots.put(freeSlot.getSlotId(), freeSlot);
                }
@@ -417,13 +459,6 @@ public abstract class SlotManager {
        protected abstract SlotRequest chooseRequestToFulfill(final 
ResourceSlot offeredSlot,
                final Map<AllocationID, SlotRequest> pendingRequests);
 
-       /**
-        * The framework specific code for allocating a container for specified 
resource profile.
-        *
-        * @param resourceProfile The resource profile
-        */
-       protected abstract void allocateContainer(final ResourceProfile 
resourceProfile);
-
        // 
------------------------------------------------------------------------
        //  Helper classes
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/1f198d8c/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 9ee9690..0fed79e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -19,12 +19,16 @@
 package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.junit.BeforeClass;
@@ -34,10 +38,13 @@ import org.mockito.Mockito;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Executor;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 
 public class SlotManagerTest {
@@ -57,6 +64,8 @@ public class SlotManagerTest {
        @BeforeClass
        public static void setUp() {
                taskExecutorGateway = Mockito.mock(TaskExecutorGateway.class);
+               
Mockito.when(taskExecutorGateway.requestSlot(any(AllocationID.class), 
any(UUID.class), any(Time.class)))
+                       .thenReturn(new 
FlinkCompletableFuture<SlotRequestReply>());
        }
 
        /**
@@ -498,12 +507,13 @@ public class SlotManagerTest {
        //  testing classes
        // 
------------------------------------------------------------------------
 
-       private static class TestingSlotManager extends SlotManager {
+       private static class TestingSlotManager extends SlotManager implements 
ResourceManagerServices {
 
                private final List<ResourceProfile> allocatedContainers;
 
                TestingSlotManager() {
                        this.allocatedContainers = new LinkedList<>();
+                       setupResourceManagerServices(this);
                }
 
                /**
@@ -543,12 +553,23 @@ public class SlotManagerTest {
                }
 
                @Override
-               protected void allocateContainer(ResourceProfile 
resourceProfile) {
+               public void allocateResource(ResourceProfile resourceProfile) {
                        allocatedContainers.add(resourceProfile);
                }
 
+               /**
+                * Test implementation: creates and returns a new Mockito mock of
+                * {@link Executor} on every call.
+                *
+                * NOTE(review): an unstubbed mock presumably ignores execute(), so work handed
+                * to the returned executor would never run - confirm the tests do not rely on it.
+                */
+               @Override
+               public Executor getAsyncExecutor() {
+                       final Executor mockedExecutor = Mockito.mock(Executor.class);
+                       return mockedExecutor;
+               }
+
+               /**
+                * Test implementation: creates and returns a new Mockito mock of
+                * {@link Executor} on every call.
+                *
+                * NOTE(review): SlotManager passes this executor to handleAsync for slot request
+                * replies; with an unstubbed mock those callbacks would presumably not be
+                * executed - confirm this is intended for these tests.
+                */
+               @Override
+               public Executor getExecutor() {
+                       final Executor mockedExecutor = Mockito.mock(Executor.class);
+                       return mockedExecutor;
+               }
+
                List<ResourceProfile> getAllocatedContainers() {
                        return allocatedContainers;
                }
+
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1f198d8c/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index ff25897..e3018c9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -24,18 +24,14 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.NonHaServices;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
-import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.SlotRequest;
-import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
-import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
+import org.apache.flink.runtime.resourcemanager.*;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
@@ -47,9 +43,12 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import java.util.Collections;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import static org.mockito.Matchers.any;
@@ -99,9 +98,9 @@ public class SlotProtocolTest extends TestLogger {
                TestingLeaderElectionService rmLeaderElectionService =
                        configureHA(testingHaServices, jobID, rmAddress, 
rmLeaderID, jmAddress, jmLeaderID);
 
-               TestingSlotManager slotManager = Mockito.spy(new 
TestingSlotManager());
+               SlotManager slotManager = Mockito.spy(new SimpleSlotManager());
                ResourceManager resourceManager =
-                       new StandaloneResourceManager(testRpcService, 
testingHaServices, slotManager);
+                       Mockito.spy(new 
StandaloneResourceManager(testRpcService, testingHaServices, slotManager));
                resourceManager.start();
                rmLeaderElectionService.isLeader(rmLeaderID);
 
@@ -118,7 +117,7 @@ public class SlotProtocolTest extends TestLogger {
 
                SlotRequest slotRequest = new SlotRequest(jobID, allocationID, 
resourceProfile);
                SlotRequestReply slotRequestReply =
-                       resourceManager.requestSlot(slotRequest);
+                       resourceManager.requestSlot(jmLeaderID, rmLeaderID, 
slotRequest);
 
                // 1) SlotRequest is routed to the SlotManager
                verify(slotManager).requestSlot(slotRequest);
@@ -129,13 +128,15 @@ public class SlotProtocolTest extends TestLogger {
                        allocationID);
 
                // 3) SlotRequest leads to a container allocation
-               verify(slotManager, 
timeout(5000)).allocateContainer(resourceProfile);
+               verify(resourceManager, 
timeout(5000)).startNewWorker(resourceProfile);
 
                Assert.assertFalse(slotManager.isAllocated(allocationID));
 
                // slot becomes available
                final String tmAddress = "/tm1";
                TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
+               
Mockito.when(taskExecutorGateway.requestSlot(any(AllocationID.class), 
any(UUID.class), any(Time.class)))
+                       .thenReturn(new 
FlinkCompletableFuture<SlotRequestReply>());
                testRpcService.registerGateway(tmAddress, taskExecutorGateway);
 
                final ResourceID resourceID = ResourceID.generate();
@@ -176,11 +177,13 @@ public class SlotProtocolTest extends TestLogger {
                        configureHA(testingHaServices, jobID, rmAddress, 
rmLeaderID, jmAddress, jmLeaderID);
 
                TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
+               
Mockito.when(taskExecutorGateway.requestSlot(any(AllocationID.class), 
any(UUID.class), any(Time.class)))
+                       .thenReturn(new 
FlinkCompletableFuture<SlotRequestReply>());
                testRpcService.registerGateway(tmAddress, taskExecutorGateway);
 
-               TestingSlotManager slotManager = Mockito.spy(new 
TestingSlotManager());
+               SlotManager slotManager = Mockito.spy(new SimpleSlotManager());
                ResourceManager resourceManager =
-                       new StandaloneResourceManager(testRpcService, 
testingHaServices, slotManager);
+                       Mockito.spy(new 
StandaloneResourceManager(testRpcService, testingHaServices, slotManager));
                resourceManager.start();
                rmLeaderElectionService.isLeader(rmLeaderID);
 
@@ -207,7 +210,7 @@ public class SlotProtocolTest extends TestLogger {
 
                SlotRequest slotRequest = new SlotRequest(jobID, allocationID, 
resourceProfile);
                SlotRequestReply slotRequestReply =
-                       resourceManager.requestSlot(slotRequest);
+                       resourceManager.requestSlot(jmLeaderID, rmLeaderID, 
slotRequest);
 
                // 1) a SlotRequest is routed to the SlotManager
                verify(slotManager).requestSlot(slotRequest);
@@ -241,15 +244,4 @@ public class SlotProtocolTest extends TestLogger {
                return rmLeaderElectionService;
        }
 
-       private static class TestingSlotManager extends SimpleSlotManager {
-
-               // change visibility of function to public for testing
-               @Override
-               public void allocateContainer(ResourceProfile resourceProfile) {
-                       super.allocateContainer(resourceProfile);
-               }
-
-
-       }
-
 }

Reply via email to