[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206695#comment-15206695
 ] 

ASF GitHub Bot commented on FLINK-3544:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r57017514
  
    --- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java ---
    @@ -0,0 +1,647 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.yarn;
    +
    +import akka.actor.ActorRef;
    +import akka.actor.Props;
    +
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.akka.AkkaUtils;
    +import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
    +import org.apache.flink.runtime.clusterframework.ApplicationStatus;
    +import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
    +import org.apache.flink.runtime.clusterframework.messages.StopCluster;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.instance.ActorGateway;
    +import org.apache.flink.runtime.instance.AkkaActorGateway;
    +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
    +import org.apache.flink.yarn.messages.ContainersAllocated;
    +import org.apache.flink.yarn.messages.ContainersComplete;
    +
    +import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
    +import org.apache.hadoop.yarn.api.records.Container;
    +import org.apache.hadoop.yarn.api.records.ContainerId;
    +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
    +import org.apache.hadoop.yarn.api.records.ContainerStatus;
    +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
    +import org.apache.hadoop.yarn.api.records.Priority;
    +import org.apache.hadoop.yarn.api.records.Resource;
    +import org.apache.hadoop.yarn.client.api.AMRMClient;
    +import org.apache.hadoop.yarn.client.api.NMClient;
    +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
    +import org.apache.hadoop.yarn.conf.YarnConfiguration;
    +
    +import org.slf4j.Logger;
    +
    +import java.lang.reflect.Method;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +/**
    + * Specialized Flink Resource Manager implementation for YARN clusters. It 
is started as the
    + * YARN ApplicationMaster and implements the YARN-specific logic for 
container requests and failure
    + * monitoring.
    + */
    +public class YarnFlinkResourceManager extends 
FlinkResourceManager<RegisteredYarnWorkerNode> {
    +   
    +   /** The heartbeat interval while the resource master is waiting for 
containers */
    +   private static final int FAST_YARN_HEARTBEAT_INTERVAL_MS = 500;
    +
    +   /** The default heartbeat interval during regular operation */
    +   private static final int DEFAULT_YARN_HEARTBEAT_INTERVAL_MS = 5000;
    +
    +   /** The containers where a TaskManager is starting and we are waiting 
for it to register */
    +   private final Map<ResourceID, YarnContainerInLaunch> containersInLaunch;
    +
    +   /** Containers we have released, where we are waiting for an 
acknowledgement that
    +    * they are released */
    +   private final Map<ContainerId, Container> containersBeingReturned;
    +
    +   /** The YARN / Hadoop configuration object */
    +   private final YarnConfiguration yarnConfig;
    +
    +   /** The TaskManager container parameters (like container memory size) */
    +   private final ContaineredTaskManagerParameters taskManagerParameters;
    +
    +   /** Context information used to start a TaskManager Java process */
    +   private final ContainerLaunchContext taskManagerLaunchContext;
    +
    +   /** Host name for the container running this process */
    +   private final String applicationMasterHostName;
    +
    +   /** Web interface URL, may be null */
    +   private final String webInterfaceURL;
    +
    +   /** Default heartbeat interval between this actor and the YARN resource 
manager */
    +   private final int yarnHeartbeatIntervalMillis;
    +
    +   /** Number of failed TaskManager containers before stopping the 
application. -1 means infinite. */ 
    +   private final int maxFailedContainers;
    +
    +   /** Callback handler for the asynchronous resourceManagerClient */
    +   private YarnResourceManagerCallbackHandler 
resourceManagerCallbackHandler;
    +
    +   /** Client to communicate with the Resource Manager (YARN's master) */
    +   private AMRMClientAsync<AMRMClient.ContainerRequest> 
resourceManagerClient;
    +
    +   /** Client to communicate with the Node manager and launch TaskManager 
processes */
    +   private NMClient nodeManagerClient;
    +
    +   /** The number of containers requested, but not yet granted */
    +   private int numPendingContainerRequests;
    +
    +   /** The number of failed containers since the master became active */
    +   private int failedContainersSoFar;
    +
    +
    +   public YarnFlinkResourceManager(
    +                   Configuration flinkConfig,
    +                   YarnConfiguration yarnConfig,
    +                   LeaderRetrievalService leaderRetrievalService,
    +                   String applicationMasterHostName,
    +                   String webInterfaceURL,
    +                   ContaineredTaskManagerParameters taskManagerParameters,
    +                   ContainerLaunchContext taskManagerLaunchContext,
    +                   int yarnHeartbeatIntervalMillis,
    +                   int maxFailedContainers,
    +                   int numInitialTaskManagers) {
    +
    +           super(numInitialTaskManagers, flinkConfig, 
leaderRetrievalService);
    +
    +           this.yarnConfig = requireNonNull(yarnConfig);
    +           this.taskManagerParameters = 
requireNonNull(taskManagerParameters);
    +           this.taskManagerLaunchContext = 
requireNonNull(taskManagerLaunchContext);
    +           this.applicationMasterHostName = 
requireNonNull(applicationMasterHostName);
    +           this.webInterfaceURL = webInterfaceURL;
    +           this.yarnHeartbeatIntervalMillis = yarnHeartbeatIntervalMillis;
    +           this.maxFailedContainers = maxFailedContainers;
    +
    +           this.containersInLaunch = new HashMap<>();
    +           this.containersBeingReturned = new HashMap<>();
    +   }
    +
    +   // 
------------------------------------------------------------------------
    +   //  Actor messages
    +   // 
------------------------------------------------------------------------
    +
    +   @Override
    +   protected void handleMessage(Object message) {
    +
    +           // check for YARN specific actor messages first
    +
    +           if (message instanceof ContainersAllocated) {
    +                   containersAllocated(((ContainersAllocated) 
message).containers());
    +
    +           } else if (message instanceof ContainersComplete) {
    +                   containersComplete(((ContainersComplete) 
message).containers());
    +
    +           } else {
    +                   // message handled by the generic resource master code
    +                   super.handleMessage(message);
    +           }
    +   }
    +
    +   // 
------------------------------------------------------------------------
    +   //  YARN specific behavior
    +   // 
------------------------------------------------------------------------
    +
    +   @Override
    +   protected void initialize() throws Exception {
    +           LOG.info("Initializing YARN resource master");
    +
    +           // create the client to communicate with the resource manager
    +           ActorGateway selfGateway = new AkkaActorGateway(self(), 
getLeaderSessionID());
    +           resourceManagerCallbackHandler = new 
YarnResourceManagerCallbackHandler(selfGateway);
    +
    +           resourceManagerClient = AMRMClientAsync.createAMRMClientAsync(
    +                   yarnHeartbeatIntervalMillis, 
resourceManagerCallbackHandler);
    +           resourceManagerClient.init(yarnConfig);
    +           resourceManagerClient.start();
    +
    +           // create the client to communicate with the node managers
    +           nodeManagerClient = NMClient.createNMClient();
    +           nodeManagerClient.init(yarnConfig);
    +           nodeManagerClient.start();
    +           nodeManagerClient.cleanupRunningContainersOnStop(true);
    +
    +           // register with Resource Manager
    +           LOG.info("Registering Application Master with tracking url {}", 
webInterfaceURL);
    +
    +           scala.Option<Object> portOption = 
AkkaUtils.getAddress(getContext().system()).port();
    +           int actorSystemPort = portOption.isDefined() ? (int) 
portOption.get() : -1;
    +
    +           RegisterApplicationMasterResponse response = 
resourceManagerClient.registerApplicationMaster(
    +                   applicationMasterHostName, actorSystemPort, 
webInterfaceURL);
    +
    +           // if this application master starts as part of an 
ApplicationMaster/JobManager recovery,
    +           // then some worker containers are most likely still alive and 
we can re-obtain them
    +           List<Container> containersFromPreviousAttempts = 
getContainersFromPreviousAttempts(response);
    +
    +           if (!containersFromPreviousAttempts.isEmpty()) {
    +                   LOG.info("Retrieved {} TaskManagers from previous 
attempt", containersFromPreviousAttempts.size());
    +
    +                   final long now = System.currentTimeMillis();
    +                   for (Container c : containersFromPreviousAttempts) {
    +                           containersInLaunch.put(new 
ResourceID(c.getId().toString()), new YarnContainerInLaunch(c, now));
    +                   }
    +
    +                   // adjust the progress indicator
    +                   updateProgress();
    +           }
    +   }
    +
    +   @Override
    +   protected void leaderUpdated() {
    +           AkkaActorGateway newGateway = new AkkaActorGateway(self(), 
getLeaderSessionID());
    +           
resourceManagerCallbackHandler.setCurrentLeaderGateway(newGateway);
    +   }
    +
    +   @Override
    +   protected void shutdownApplication(ApplicationStatus finalStatus, 
String optionalDiagnostics) {
    +           // first, de-register from YARN
    +           FinalApplicationStatus yarnStatus = getYarnStatus(finalStatus);
    +           LOG.info("Unregistering application from the YARN Resource 
Manager");
    +           try {
    +                   
resourceManagerClient.unregisterApplicationMaster(yarnStatus, 
optionalDiagnostics, "");
    +           } catch (Throwable t) {
    +                   LOG.error("Could not unregister the application 
master.", t);
    +           }
    +
    +           // now shut down all our components
    +           try {
    +                   resourceManagerClient.stop();
    +           } catch (Throwable t) {
    +                   LOG.error("Could not cleanly shut down the Asynchronous 
Resource Manager Client", t);
    +           }
    +           try {
    +                   nodeManagerClient.stop();
    +           } catch (Throwable t) {
    +                   LOG.error("Could not cleanly shut down the Node Manager 
Client", t);
    +           }
    +   }
    +
    +   @Override
    +   protected void fatalError(String message, Throwable error) {
    +           // we do not unregister, but cause a hard fail of this process, 
to have it
    +           // restarted by YARN
    +           LOG.error("FATAL ERROR IN YARN APPLICATION MASTER: " + message, 
error);
    +           LOG.error("Shutting down process");
    +
    +           // kill this process, this will make YARN restart the process
    +           System.exit(EXIT_CODE_FATAL_ERROR);
    +   }
    +
    +   @Override
    +   protected void requestNewWorkers(int numWorkers) {
    +           final long mem = 
taskManagerParameters.taskManagerTotalMemoryMB();
    +           final int containerMemorySizeMB;
    +
    +           if (mem <= Integer.MAX_VALUE) {
    +                   containerMemorySizeMB = (int) mem;
    +           } else {
    +                   containerMemorySizeMB = Integer.MAX_VALUE;
    +                   LOG.error("Decreasing container size from {} MB to {} 
MB (integer value overflow)",
    +                           mem, containerMemorySizeMB);
    +           }
    +
    +           for (int i = 0; i < numWorkers; i++) {
    +                   numPendingContainerRequests++;
    +                   LOG.info("Requesting new TaskManager container with {} 
megabytes memory. Pending requests: {}",
    +                           containerMemorySizeMB, 
numPendingContainerRequests);
    +
    +                   // Priority for worker containers - priorities are 
intra-application
    +                   Priority priority = Priority.newInstance(0);
    +
    +                   // Resource requirements for worker containers
    +                   int taskManagerSlots = 
Integer.valueOf(System.getenv(FlinkYarnClientBase.ENV_SLOTS));
    +                   int vcores = 
config.getInteger(ConfigConstants.YARN_VCORES, Math.max(taskManagerSlots, 1));
    +                   Resource capability = 
Resource.newInstance(containerMemorySizeMB, vcores);
    +
    +                   resourceManagerClient.addContainerRequest(
    +                           new AMRMClient.ContainerRequest(capability, 
null, null, priority));
    +           }
    +
    +           // make sure we transmit the request fast and receive fast news 
of granted allocations
    +           
resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
    +   }
    +
    +   @Override
    +   protected void releasePendingWorker(ResourceID id) {
    +           YarnContainerInLaunch container = containersInLaunch.remove(id);
    +           if (container != null) {
    +                   releaseYarnContainer(container.container());
    +           } else {
    +                   LOG.error("Cannot find container {} to release. 
Ignoring request.", id);
    +           }
    +   }
    +
    +   @Override
    +   protected void releaseRegisteredWorker(RegisteredYarnWorkerNode worker) 
{
    +           releaseYarnContainer(worker.yarnContainer());
    +   }
    +   
    +   private void releaseYarnContainer(Container container) {
    +           LOG.info("Releasing YARN container {}", container.getId());
    +
    +           containersBeingReturned.put(container.getId(), container);
    +
    +           // release the container on the node manager
    +           try {
    +                   nodeManagerClient.stopContainer(container.getId(), 
container.getNodeId());
    +           } catch (Throwable t) {
    +                   // we only log this error. since the resource manager 
also gets the release
    +                   // notification, the container should be eventually 
cleaned up
    +                   LOG.error("Error while calling YARN Node Manager to 
release container", t);
    +           }
    +           
    +           // tell the master that the container is no longer needed
    +           
resourceManagerClient.releaseAssignedContainer(container.getId());
    +   }
    +
    +   @Override
    +   protected RegisteredYarnWorkerNode workerRegistered(ResourceID 
resourceID) throws Exception {
    +           YarnContainerInLaunch inLaunch = 
containersInLaunch.remove(resourceID);
    +           if (inLaunch == null) {
    +                   throw new Exception("Cannot register Worker - unknown 
resource id " + resourceID);
    +           } else {
    +                   return new RegisteredYarnWorkerNode(resourceID, 
inLaunch.container());
    +           }
    +   }
    +
    +   @Override
    +   protected Collection<RegisteredYarnWorkerNode> 
reacceptRegisteredWorkers(Collection<ResourceID> toConsolidate) {
    +           // we check for each task manager if we recognize its container
    +           List<RegisteredYarnWorkerNode> accepted = new ArrayList<>();
    +           for (ResourceID resourceID : toConsolidate) {
    +                   YarnContainerInLaunch yci = 
containersInLaunch.remove(resourceID);
    +
    +                   if (yci != null) {
    +                           LOG.info("YARN container consolidation 
recognizes Resource {} ", resourceID);
    +
    +                           accepted.add(new 
RegisteredYarnWorkerNode(resourceID, yci.container()));
    +                   }
    +                   else {
    +                           LOG.info("YARN container consolidation does not 
recognize TaskManager {}",
    +                                   resourceID);
    +                   }
    +           }
    +           return accepted;
    +   }
    +
    +   @Override
    +   protected int getNumWorkerRequestsPending() {
    +           return numPendingContainerRequests;
    +   }
    +
    +   @Override
    +   protected int getNumWorkersPendingRegistration() {
    +           return containersInLaunch.size();
    +   }
    +
    +   // 
------------------------------------------------------------------------
    +   //  Callbacks from the YARN Resource Manager 
    +   // 
------------------------------------------------------------------------
    +   
    +   private void containersAllocated(List<Container> containers) {
    +           final int numRequired = getDesignatedWorkerPoolSize();
    +           final int numRegistered = getNumberOfRegisteredTaskManagers();
    +           
    +           for (Container container : containers) {
    +                   numPendingContainerRequests = Math.max(0, 
numPendingContainerRequests - 1);
    +                   LOG.info("Received new container: {} - Remaining 
pending container requests: {}",
    +                           container.getId(), numPendingContainerRequests);
    +                   
    +                   // decide whether to return the container, or whether 
to start a TaskManager
    +                   if (numRegistered + containersInLaunch.size() < 
numRequired) {
    +                           // start a TaskManager
    +                           final ResourceID containerIdString = new 
ResourceID(container.getId().toString());
    +                           final long now = System.currentTimeMillis();
    +                           containersInLaunch.put(containerIdString, new 
YarnContainerInLaunch(container, now));
    +                           
    +                           String message = "Launching TaskManager in 
container " + containerIdString
    +                                   + " on host " + 
container.getNodeId().getHost();
    +                           LOG.info(message);
    +                           sendInfoMessage(message);
    +                           
    +                           try {
    +                                   
nodeManagerClient.startContainer(container, taskManagerLaunchContext);
    +                           }
    +                           catch (Throwable t) {
    +                                   // failed to launch the container
    +                                   
containersInLaunch.remove(containerIdString);
    +                                   
    +                                   // return container, a new one will be 
requested eventually
    +                                   LOG.error("Could not start TaskManager 
in container " + containerIdString, t);
    +                                   
containersBeingReturned.put(container.getId(), container);
    +                                   
resourceManagerClient.releaseAssignedContainer(container.getId());
    +                           }
    +                   } else {
    +                           // return excessive container
    +                           LOG.info("Returning excess container {}", 
container.getId());
    +                           containersBeingReturned.put(container.getId(), 
container);
    +                           
resourceManagerClient.releaseAssignedContainer(container.getId());
    +                   }
    +           }
    +
    +           updateProgress();
    +           
    +           // if we are waiting for no further containers, we can go to the
    +           // regular heartbeat interval
    +           if (numPendingContainerRequests <= 0) {
    +                   
resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
    +           }
    +           
    +           // make sure we re-check the status of workers / containers one 
more time at least,
    +           // in case some containers did not come up properly
    +           triggerCheckWorkers();
    +   }
    +
    +   /**
    +    * Invoked when the resource manager informs of completed containers.
    +    * Called via an actor message by the callback from the resource 
manager client.
    +    * 
    +    * @param containers The containers that have completed.
    +    */
    +   private void containersComplete(List<ContainerStatus> containers) {
    +           // the list contains both failed containers, as well as 
containers that
    +           // were gracefully returned by this application master
    +
    +           for (ContainerStatus status : containers) {
    +                   final ResourceID id = new 
ResourceID(status.getContainerId().toString());
    +
    +                   // check if this is a failed container or a completed 
container
    +                   if 
(containersBeingReturned.remove(status.getContainerId()) != null) {
    +                           // regular completed container that we released
    +                           LOG.info("Container {} completed successfully 
with diagnostics: {}",
    +                                   id, status.getDiagnostics());
    +                   } else {
    +                           // failed container, either at startup, or 
running
    +                           final String exitStatus;
    +                           switch (status.getExitStatus()) {
    +                                   case -103:
    +                                           exitStatus = "Vmem limit 
exceeded (-103)";
    +                                           break;
    +                                   case -104:
    +                                           exitStatus = "Pmem limit 
exceeded (-104)";
    +                                           break;
    +                                   default:
    +                                           exitStatus = 
String.valueOf(status.getExitStatus());
    +                           }
    +
    +                           final YarnContainerInLaunch launched = 
containersInLaunch.remove(id);
    +                           if (launched != null) {
    +                                   LOG.info("Container {} failed, with a 
TaskManager in launch or registration. " +
    +                                                   "Exit status: {}", id, 
exitStatus);
    +                                   // we will trigger re-acquiring new 
containers at the end
    +                           } else {
    +                                   // failed registered worker
    +                                   LOG.info("Container {} failed. Exit 
status: {}", id, exitStatus);
    +
    +                                   // notify the generic logic, which 
notifies the JobManager, etc.
    +                                   notifyWorkerFailed(id, "Container " + 
id + " failed. " + "Exit status: {}" + exitStatus);
    +                           }
    +
    +                           // general failure logging
    +                           failedContainersSoFar++;
    +
    +                           String diagMessage = String.format("Diagnostics 
for container %s in state %s : " +
    +                                   "exitStatus=%s diagnostics=%s",
    +                                   id, status.getState(), exitStatus, 
status.getDiagnostics());
    +                           sendInfoMessage(diagMessage);
    +
    +                           LOG.info(diagMessage);
    +                           LOG.info("Total number of failed containers so 
far: " + failedContainersSoFar);
    +
    +                           // maxFailedContainers == -1 is infinite number 
of retries.
    +                           if (maxFailedContainers >= 0 && 
failedContainersSoFar > maxFailedContainers) {
    +                                   String msg = "Stopping YARN session 
because the number of failed containers ("
    +                                           + failedContainersSoFar + ") 
exceeded the maximum failed containers ("
    +                                           + maxFailedContainers + "). 
This number is controlled by the '"
    +                                           + 
ConfigConstants.YARN_MAX_FAILED_CONTAINERS + "' configuration setting. "
    +                                           + "By default its the number of 
requested containers.";
    +
    +                                   LOG.error(msg);
    +                                   self().tell(decorateMessage(new 
StopCluster(ApplicationStatus.FAILED, msg)),
    +                                           ActorRef.noSender());
    +
    +                                   // no need to do anything else
    +                                   return;
    +                           }
    +                   }
    +           }
    +
    +           updateProgress();
    +
    +           // in case failed containers were among the finished 
containers, make
    +           // sure we re-examine and request new ones
    +           triggerCheckWorkers();
    +   }
    +
    +   // 
------------------------------------------------------------------------
    +   //  Utilities
    +   // 
------------------------------------------------------------------------
    +
    +   private void updateProgress() {
    +           final int required = getDesignatedWorkerPoolSize();
    +           final int available = getNumberOfRegisteredTaskManagers() + 
containersInLaunch.size();
    +           final float progress = (required <= 0) ? 1.0f : available / 
(float) required;
    +
    +           if (resourceManagerCallbackHandler != null) {
    +                   
resourceManagerCallbackHandler.setCurrentProgress(progress);
    +           }
    +   }
    +
    +   /**
    +    * Converts a Flink application status enum to a YARN application 
status enum.
    +    * @param status The Flink application status.
    +    * @return The corresponding YARN application status.
    +    */
    +   private FinalApplicationStatus getYarnStatus(ApplicationStatus status) {
    +           if (status == null) {
    +                   return FinalApplicationStatus.UNDEFINED;
    +           }
    +           else {
    +                   switch (status) {
    +                           case SUCCEEDED:
    +                                   return FinalApplicationStatus.SUCCEEDED;
    +                           case FAILED:
    +                                   return FinalApplicationStatus.FAILED;
    +                           case CANCELED:
    +                                   return FinalApplicationStatus.KILLED;
    +                           default:
    +                                   return FinalApplicationStatus.UNDEFINED;
    +                   }
    +           }
    +   }
    +
    +   /**
    +    * Checks if a YARN application still has registered containers. If the 
application master
    +    * registered at the resource manager for the first time, this list 
will be empty. If the
    +    * application master registered a repeated time (after a failure and 
recovery), this list
    +    * will contain the containers that were previously allocated.
    +    * 
    +    * @param response The response object from the registration at the 
ResourceManager.
    +    * @return A list with containers from previous application attempt.
    +    */
    +   private List<Container> 
getContainersFromPreviousAttempts(RegisterApplicationMasterResponse response) {
    +           try {
    +                   Method m = RegisterApplicationMasterResponse.class
    +                           .getMethod("getContainersFromPreviousAttempts");
    --- End diff --
    
    Why did you remove the `RegisterApplicationMasterResponseReflector`? With 
the current implementation you will use reflection every time you call 
`getContainersFromPreviousAttempts`. This is inefficient and not necessary 
since `RegisterApplicationMasterResponse.class` won't change.


> ResourceManager runtime components
> ----------------------------------
>
>                 Key: FLINK-3544
>                 URL: https://issues.apache.org/jira/browse/FLINK-3544
>             Project: Flink
>          Issue Type: Sub-task
>          Components: ResourceManager
>    Affects Versions: 1.1.0
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>             Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to