[FLINK-4606] Integrate the new ResourceManager with the existing FlinkResourceManager


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/415af17f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/415af17f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/415af17f

Branch: refs/heads/flip-6
Commit: 415af17fdf45fe157a6ee5f7187ee63e8845f168
Parents: bb781ae
Author: beyond1920 <beyond1...@126.com>
Authored: Fri Sep 9 09:11:24 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 14 15:14:42 2016 +0200

----------------------------------------------------------------------
 .../InfoMessageListenerRpcGateway.java          |  35 +++
 .../resourcemanager/ResourceManager.java        | 214 ++++++++++++++++---
 .../resourcemanager/ResourceManagerGateway.java |  23 ++
 .../StandaloneResourceManager.java              |  64 ++++++
 .../resourcemanager/ResourceManagerHATest.java  |   2 +-
 .../ResourceManagerJobMasterTest.java           |   2 +-
 .../ResourceManagerTaskExecutorTest.java        |   2 +-
 .../slotmanager/SlotProtocolTest.java           |   5 +-
 8 files changed, 318 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/415af17f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java
new file mode 100644
index 0000000..c1eeefa
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
+import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.rpc.RpcGateway;
+
+/**
+ * RPC gateway through which a listener receives info messages from the {@link ResourceManager}.
+ */
+public interface InfoMessageListenerRpcGateway extends RpcGateway {
+
+       /**
+        * Delivers an info message from the resource manager to this listener.
+        * @param infoMessage the info message being delivered
+        */
+       void notifyInfoMessage(InfoMessage infoMessage);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/415af17f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 88b8a11..83dc4db 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -20,19 +20,22 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
-import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
@@ -42,8 +45,6 @@ import 
org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.runtime.util.LeaderConnectionInfo;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.HashMap;
@@ -66,15 +67,16 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  *     <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource 
manager</li>
  * </ul>
  */
-public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> 
implements LeaderContender {
+public abstract class ResourceManager<ResourceManagerGateway, WorkerType 
extends TaskExecutorRegistration> extends RpcEndpoint implements 
LeaderContender {
 
-       private final Logger LOG = LoggerFactory.getLogger(getClass());
+       /** The exit code with which the process is stopped in case of a fatal 
error */
+       protected static final int EXIT_CODE_FATAL_ERROR = -13;
 
        private final Map<JobID, JobMasterGateway> jobMasterGateways;
 
        private final Set<LeaderRetrievalListener> 
jobMasterLeaderRetrievalListeners;
 
-       private final Map<ResourceID, TaskExecutorRegistration> 
taskExecutorGateways;
+       private final Map<ResourceID, WorkerType> taskExecutorGateways;
 
        private final HighAvailabilityServices highAvailabilityServices;
 
@@ -84,16 +86,16 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> impleme
 
        private UUID leaderSessionID;
 
-       public ResourceManager(
-                       RpcService rpcService,
-                       HighAvailabilityServices highAvailabilityServices,
-                       SlotManager slotManager) {
+       private Map<String, InfoMessageListenerRpcGateway> infoMessageListeners;
+
+       public ResourceManager(RpcService rpcService, HighAvailabilityServices 
highAvailabilityServices, SlotManager slotManager) {
                super(rpcService);
                this.highAvailabilityServices = 
checkNotNull(highAvailabilityServices);
                this.jobMasterGateways = new HashMap<>();
-               this.slotManager = slotManager;
+               this.slotManager = checkNotNull(slotManager);
                this.jobMasterLeaderRetrievalListeners = new HashSet<>();
                this.taskExecutorGateways = new HashMap<>();
+               infoMessageListeners = new HashMap<>();
        }
 
        @Override
@@ -103,6 +105,8 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> impleme
                        super.start();
                        leaderElectionService = 
highAvailabilityServices.getResourceManagerLeaderElectionService();
                        leaderElectionService.start(this);
+                       // framework specific initialization
+                       initialize();
                } catch (Throwable e) {
                        log.error("A fatal error happened when starting the 
ResourceManager", e);
                        throw new RuntimeException("A fatal error happened when 
starting the ResourceManager", e);
@@ -166,12 +170,12 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> impleme
                                                jobMasterLeaderInfo = 
LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
                                                        
highAvailabilityServices.getJobMasterLeaderRetriever(jobID), new 
FiniteDuration(5, TimeUnit.SECONDS));
                                        } catch (Exception e) {
-                                               LOG.warn("Failed to start 
JobMasterLeaderRetriever for JobID {}", jobID);
+                                               log.warn("Failed to start 
JobMasterLeaderRetriever for JobID {}", jobID);
                                                throw new Exception("Failed to 
retrieve JobMasterLeaderRetriever");
                                        }
 
                                        if 
(!jobMasterLeaderId.equals(jobMasterLeaderInfo.getLeaderSessionID())) {
-                                               LOG.info("Declining 
registration request from non-leading JobManager {}", jobMasterAddress);
+                                               log.info("Declining 
registration request from non-leading JobManager {}", jobMasterAddress);
                                                throw new Exception("JobManager 
is not leading");
                                        }
 
@@ -190,7 +194,7 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> impleme
                                                        LeaderRetrievalService 
jobMasterLeaderRetriever = 
highAvailabilityServices.getJobMasterLeaderRetriever(jobID);
                                                        
jobMasterLeaderRetriever.start(jobMasterLeaderListener);
                                                } catch (Exception e) {
-                                                       LOG.warn("Failed to 
start JobMasterLeaderRetriever for JobID {}", jobID);
+                                                       log.warn("Failed to 
start JobMasterLeaderRetriever for JobID {}", jobID);
                                                        return new 
RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever");
                                                }
                                                
jobMasterLeaderRetrievalListeners.add(jobMasterLeaderListener);
@@ -237,13 +241,24 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> impleme
                                if (throwable != null) {
                                        return new 
RegistrationResponse.Decline(throwable.getMessage());
                                } else {
-                                       InstanceID id = new InstanceID();
-                                       TaskExecutorRegistration 
oldTaskExecutor =
-                                               
taskExecutorGateways.put(resourceID, new 
TaskExecutorRegistration(taskExecutorGateway, id));
-                                       if (oldTaskExecutor != null) {
-                                               log.warn("Receive a duplicate 
registration from TaskExecutor {} at ({})", resourceID, taskExecutorAddress);
+                                       WorkerType startedWorker = 
taskExecutorGateways.get(resourceID);
+                                       if(startedWorker != null) {
+                                               String oldWorkerAddress = 
startedWorker.getTaskExecutorGateway().getAddress();
+                                               if 
(taskExecutorAddress.equals(oldWorkerAddress)) {
+                                                       log.warn("Receive a 
duplicate registration from TaskExecutor {} at ({})", resourceID, 
taskExecutorAddress);
+                                               } else {
+                                                       log.warn("Receive a 
duplicate registration from TaskExecutor {} at different address, previous 
({}), new ({})",
+                                                               resourceID, 
oldWorkerAddress, taskExecutorAddress);
+                                                       // TODO :: suggest old 
taskExecutor to stop itself
+                                                       
slotManager.notifyTaskManagerFailure(resourceID);
+                                                       startedWorker = 
workerStarted(resourceID, taskExecutorGateway);
+                                                       
taskExecutorGateways.put(resourceID, startedWorker);
+                                               }
+                                       } else {
+                                               startedWorker = 
workerStarted(resourceID, taskExecutorGateway);
+                                               
taskExecutorGateways.put(resourceID, startedWorker);
                                        }
-                                       return new 
TaskExecutorRegistrationSuccess(id, 5000);
+                                       return new 
TaskExecutorRegistrationSuccess(startedWorker.getInstanceID(), 5000);
                                }
                        }
                }, getMainThreadExecutor());
@@ -263,14 +278,12 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> impleme
                if (jobMasterGateway != null) {
                        return slotManager.requestSlot(slotRequest);
                } else {
-                       LOG.info("Ignoring slot request for unknown JobMaster 
with JobID {}", jobId);
+                       log.info("Ignoring slot request for unknown JobMaster 
with JobID {}", jobId);
                        return new 
SlotRequestRejected(slotRequest.getAllocationId());
                }
        }
 
 
-
-
        // 
------------------------------------------------------------------------
        //  Leader Contender
        // 
------------------------------------------------------------------------
@@ -324,6 +337,158 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> impleme
                shutDown();
        }
 
+       /**
+        * Registers an info message listener: connects to the given address and stores the resulting gateway.
+        * An address that is already registered is not connected again; only a warning is logged.
+        * @param infoMessageListenerAddress address of the info message listener to register with this resource manager
+        */
+       @RpcMethod
+       public void registerInfoMessageListener(final String 
infoMessageListenerAddress) {
+               
if(infoMessageListeners.containsKey(infoMessageListenerAddress)) {
+                       log.warn("Receive a duplicate registration from info 
message listener on ({})", infoMessageListenerAddress);
+               } else { // unknown address: connect first, store the gateway once the connection future completes
+                       Future<InfoMessageListenerRpcGateway> 
infoMessageListenerRpcGatewayFuture = 
getRpcService().connect(infoMessageListenerAddress, 
InfoMessageListenerRpcGateway.class);
+
+                       infoMessageListenerRpcGatewayFuture.thenAcceptAsync(new 
AcceptFunction<InfoMessageListenerRpcGateway>() {
+                               @Override
+                               public void 
accept(InfoMessageListenerRpcGateway gateway) {
+                                       log.info("Receive a registration from 
info message listener on ({})", infoMessageListenerAddress);
+                                       
infoMessageListeners.put(infoMessageListenerAddress, gateway);
+                               }
+                       }, getMainThreadExecutor()); // the listener map is only updated from a callback run on this executor
+
+                       
infoMessageListenerRpcGatewayFuture.exceptionallyAsync(new 
ApplyFunction<Throwable, Void>() {
+                               @Override
+                               public Void apply(Throwable failure) {
+                                       log.warn("Receive a registration from 
unreachable info message listener on ({})", infoMessageListenerAddress);
+                                       return null; // NOTE(review): the failure is never handed to the logger, so its cause is lost
+                               }
+                       }, getMainThreadExecutor());
+               }
+       }
+
+       /**
+        * Unregisters an info message listener by removing the gateway stored for its address.
+        * Nothing happens if no listener is registered under that address.
+        *
+        * @param infoMessageListenerAddress address of the info message listener to unregister from this resource manager
+        */
+       @RpcMethod
+       public void unRegisterInfoMessageListener(final String 
infoMessageListenerAddress) {
+               infoMessageListeners.remove(infoMessageListenerAddress);
+       }
+
+       /**
+        * Shuts down the cluster: logs the request and delegates to {@code shutDownApplication}.
+        *
+        * @param finalStatus the final application status, passed on unchanged to {@code shutDownApplication}
+        * @param optionalDiagnostics diagnostics message, passed on unchanged to {@code shutDownApplication}
+        */
+       @RpcMethod
+       public void shutDownCluster(final ApplicationStatus finalStatus, final 
String optionalDiagnostics) {
+               log.info("shut down cluster because application is in {}, 
diagnostics {}", finalStatus, optionalDiagnostics);
+               shutDownApplication(finalStatus, optionalDiagnostics);
+       }
+
+       /**
+        * Called by the framework once it detects that a currently registered task executor has failed.
+        * Via runAsync, removes the worker and, only if it was registered, reports the failure to the slot manager.
+        * @param resourceID Id of the worker that has failed.
+        * @param message An informational message that explains why the worker failed (currently not used by this method).
+        */
+       public void notifyWorkerFailed(final ResourceID resourceID, String 
message) {
+               runAsync(new Runnable() {
+                       @Override
+                       public void run() {
+                               WorkerType worker = 
taskExecutorGateways.remove(resourceID);
+                               if (worker != null) {
+                                       // TODO :: suggest failed task executor 
to stop itself
+                                       
slotManager.notifyTaskManagerFailure(resourceID);
+                               }
+                       }
+               });
+       }
+
+       /**
+        * Gets the number of task executors currently held in the registration map.
+        * NOTE(review): reads the map on the calling thread while notifyWorkerFailed mutates it via runAsync; confirm callers.
+        * @return the size of {@code taskExecutorGateways}, i.e. the number of registered task executors
+        */
+       public int getNumberOfStartedTaskManagers() {
+               return taskExecutorGateways.size();
+       }
+
+       /**
+        * Notifies the resource manager of a fatal error; {@code fatalError} is then invoked via {@code runAsync}.
+        * <p><b>IMPORTANT:</b> This should not cleanly shut down this master, but exit it in such a way
+        * that a high-availability setting would restart this or fail over to another master.
+        * @param message message handed on unchanged to {@code fatalError}
+        * @param error throwable handed on unchanged to {@code fatalError}
+        */
+       public void onFatalError(final String message, final Throwable error) {
+               runAsync(new Runnable() {
+                       @Override
+                       public void run() {
+                               fatalError(message, error);
+                       }
+               });
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Framework specific behavior
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Initializes the framework specific components; invoked from start() after the leader election service is started.
+        *
+        * @throws Exception if initialization fails; start() logs it and rethrows it wrapped in a RuntimeException
+        */
+       protected abstract void initialize() throws Exception;
+
+       /**
+        * Callback invoked when a task executor registers; the returned worker is stored under its resource id.
+        * @param resourceID The worker resource id
+        * @param taskExecutorGateway the task executor gateway
+        * @return the worker record to keep for the registered task executor
+        */
+       protected abstract WorkerType workerStarted(ResourceID resourceID, 
TaskExecutorGateway taskExecutorGateway);
+
+       /**
+        * Callback invoked when the resource manager has encountered a fatal error (see {@code onFatalError}).
+        * @param message the message that was passed to {@code onFatalError}
+        * @param error the throwable that was passed to {@code onFatalError}
+        */
+       protected abstract void fatalError(String message, Throwable error);
+
+       /**
+        * The framework specific code for shutting down the application. This should report the
+        * final application status and shut down the resource manager cleanly.
+        *
+        * <p>This method also needs to make sure all pending containers that are not registered
+        * yet are returned. It is invoked by {@code shutDownCluster} with the arguments it received.
+        *
+        * @param finalStatus The application status to report.
+        * @param optionalDiagnostics An optional diagnostics message.
+        */
+       protected abstract void shutDownApplication(ApplicationStatus 
finalStatus, String optionalDiagnostics);
+
+       // 
------------------------------------------------------------------------
+       //  Info messaging
+       // 
------------------------------------------------------------------------
+       /** Wraps the message in an InfoMessage and, via runAsync, notifies every registered info message listener. */
+       public void sendInfoMessage(final String message) {
+               runAsync(new Runnable() {
+                       @Override
+                       public void run() {
+                               InfoMessage infoMessage = new 
InfoMessage(message);
+                               for (InfoMessageListenerRpcGateway 
listenerRpcGateway : infoMessageListeners.values()) {
+                                       listenerRpcGateway
+                                               .notifyInfoMessage(infoMessage);
+                               }
+                       }
+               });
+       }
+
        private static class JobMasterLeaderListener implements 
LeaderRetrievalListener {
 
                private final JobID jobID;
@@ -343,5 +508,6 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> impleme
                        // TODO
                }
        }
+
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/415af17f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 484cea7..7c44006 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.rpc.RpcGateway;
@@ -75,4 +76,26 @@ public interface ResourceManagerGateway extends RpcGateway {
                String taskExecutorAddress,
                ResourceID resourceID,
                @RpcTimeout Time timeout);
+
+       /**
+        * Registers an info message listener with this resource manager.
+        *
+        * @param infoMessageListenerAddress address of the info message listener to register
+        */
+       void registerInfoMessageListener(String infoMessageListenerAddress);
+
+       /**
+        * Unregisters an info message listener from this resource manager.
+        *
+        * @param infoMessageListenerAddress address of the info message listener to unregister
+        *
+        */
+       void unRegisterInfoMessageListener(String infoMessageListenerAddress);
+
+       /**
+        * Shuts down the cluster.
+        * @param finalStatus the final application status to report
+        * @param optionalDiagnostics an optional diagnostics message
+        */
+       void shutDownCluster(final ApplicationStatus finalStatus, final String 
optionalDiagnostics);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/415af17f/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
new file mode 100644
index 0000000..84db1ee
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+
+/**
+ * A standalone implementation of the resource manager. Used when the system is started in standalone
+ * mode (via scripts), rather than via a resource framework like YARN or Mesos.
+ */
+public class StandaloneResourceManager extends 
ResourceManager<ResourceManagerGateway, TaskExecutorRegistration> {
+
+       public StandaloneResourceManager(RpcService rpcService,
+               HighAvailabilityServices highAvailabilityServices,
+               SlotManager slotManager) {
+               super(rpcService, highAvailabilityServices, slotManager); // this class adds no fields of its own
+       }
+
+       @Override
+       protected void initialize() throws Exception {
+               // nothing to initialize: this implementation has no framework specific components
+       }
+
+       @Override
+       protected void fatalError(final String message, final Throwable error) {
+               log.error("FATAL ERROR IN RESOURCE MANAGER: " + message, error);
+               // kill this process, using EXIT_CODE_FATAL_ERROR as the exit status
+               System.exit(EXIT_CODE_FATAL_ERROR);
+       }
+
+       @Override
+       protected TaskExecutorRegistration workerStarted(ResourceID resourceID, 
TaskExecutorGateway taskExecutorGateway) {
+               InstanceID instanceID = new InstanceID(); // a fresh instance id is created for every call
+               TaskExecutorRegistration taskExecutorRegistration = new 
TaskExecutorRegistration(taskExecutorGateway, instanceID);
+               return taskExecutorRegistration; // resourceID is not used when building the registration
+       }
+
+       @Override
+       protected void shutDownApplication(ApplicationStatus finalStatus, 
String optionalDiagnostics) {
+               // NOTE(review): empty body, so shutDownCluster only logs for this implementation; confirm this is intended
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/415af17f/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index 64a1191..fdb83f5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -55,7 +55,7 @@ public class ResourceManagerHATest {
                
highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
 
                SlotManager slotManager = mock(SlotManager.class);
-               final ResourceManager resourceManager = new 
ResourceManager(rpcService, highAvailabilityServices, slotManager);
+               final ResourceManager resourceManager = new 
StandaloneResourceManager(rpcService, highAvailabilityServices, slotManager);
                resourceManager.start();
                // before grant leadership, resourceManager's leaderId is null
                Assert.assertNull(resourceManager.getLeaderSessionID());

http://git-wip-us.apache.org/repos/asf/flink/blob/415af17f/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 332c093..8f09152 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -160,7 +160,7 @@ public class ResourceManagerJobMasterTest {
                TestingHighAvailabilityServices highAvailabilityServices = new 
TestingHighAvailabilityServices();
                
highAvailabilityServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService);
                highAvailabilityServices.setJobMasterLeaderRetriever(jobID, 
jobMasterLeaderRetrievalService);
-               ResourceManager resourceManager = new 
ResourceManager(rpcService, highAvailabilityServices, new SimpleSlotManager());
+               ResourceManager resourceManager = new 
StandaloneResourceManager(rpcService, highAvailabilityServices, new 
SimpleSlotManager());
                resourceManager.start();
                return resourceManager;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/415af17f/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index ed7c7d7..e6d1ed5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -121,7 +121,7 @@ public class ResourceManagerTaskExecutorTest {
        private ResourceManager 
createAndStartResourceManager(TestingLeaderElectionService 
rmLeaderElectionService) {
                TestingHighAvailabilityServices highAvailabilityServices = new 
TestingHighAvailabilityServices();
                
highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
-               ResourceManager resourceManager = new 
ResourceManager(rpcService, highAvailabilityServices, new SimpleSlotManager());
+               ResourceManager resourceManager = new 
StandaloneResourceManager(rpcService, highAvailabilityServices, new 
SimpleSlotManager());
                resourceManager.start();
                return resourceManager;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/415af17f/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 0232fab..ff25897 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -35,6 +35,7 @@ import 
org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
@@ -100,7 +101,7 @@ public class SlotProtocolTest extends TestLogger {
 
                TestingSlotManager slotManager = Mockito.spy(new 
TestingSlotManager());
                ResourceManager resourceManager =
-                       new ResourceManager(testRpcService, testingHaServices, 
slotManager);
+                       new StandaloneResourceManager(testRpcService, 
testingHaServices, slotManager);
                resourceManager.start();
                rmLeaderElectionService.isLeader(rmLeaderID);
 
@@ -179,7 +180,7 @@ public class SlotProtocolTest extends TestLogger {
 
                TestingSlotManager slotManager = Mockito.spy(new 
TestingSlotManager());
                ResourceManager resourceManager =
-                       new ResourceManager(testRpcService, testingHaServices, 
slotManager);
+                       new StandaloneResourceManager(testRpcService, 
testingHaServices, slotManager);
                resourceManager.start();
                rmLeaderElectionService.isLeader(rmLeaderID);
 

Reply via email to