Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2315#discussion_r74971461
  
    --- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
 ---
    @@ -0,0 +1,755 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.mesos.runtime.clusterframework;
    +
    +import akka.actor.ActorRef;
    +import akka.actor.Props;
    +import com.netflix.fenzo.TaskRequest;
    +import com.netflix.fenzo.TaskScheduler;
    +import com.netflix.fenzo.VirtualMachineLease;
    +import com.netflix.fenzo.functions.Action1;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
    +import org.apache.flink.mesos.scheduler.ConnectionMonitor;
    +import org.apache.flink.mesos.scheduler.LaunchableTask;
    +import org.apache.flink.mesos.scheduler.LaunchCoordinator;
    +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
    +import org.apache.flink.mesos.scheduler.SchedulerProxy;
    +import org.apache.flink.mesos.scheduler.TaskMonitor;
    +import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
    +import org.apache.flink.mesos.scheduler.Tasks;
    +import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
    +import org.apache.flink.mesos.scheduler.messages.Disconnected;
    +import org.apache.flink.mesos.scheduler.messages.Error;
    +import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
    +import org.apache.flink.mesos.scheduler.messages.ReRegistered;
    +import org.apache.flink.mesos.scheduler.messages.Registered;
    +import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
    +import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
    +import org.apache.flink.mesos.util.MesosConfiguration;
    +import org.apache.flink.runtime.clusterframework.ApplicationStatus;
    +import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
    +import 
org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
    +import org.apache.flink.runtime.clusterframework.messages.StopCluster;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
    +import org.apache.mesos.Protos;
    +import org.apache.mesos.Protos.FrameworkInfo;
    +import org.apache.mesos.SchedulerDriver;
    +import org.slf4j.Logger;
    +import scala.Option;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +/**
    + * Flink Resource Manager for Apache Mesos.
    + */
    +public class MesosFlinkResourceManager extends 
FlinkResourceManager<RegisteredMesosWorkerNode> {
    +
    +   /** The Mesos configuration (master and framework info) */
    +   private final MesosConfiguration mesosConfig;
    +
    +   /** The TaskManager container parameters (like container memory size) */
    +   private final MesosTaskManagerParameters taskManagerParameters;
    +
    +   /** Context information used to start a TaskManager Java process */
    +   private final Protos.TaskInfo.Builder taskManagerLaunchContext;
    +
    +   /** Number of failed Mesos tasks before stopping the application. -1 
means infinite. */
    +   private final int maxFailedTasks;
    +
    +   /** Callback handler for the asynchronous Mesos scheduler */
    +   private SchedulerProxy schedulerCallbackHandler;
    +
    +   /** Mesos scheduler driver */
    +   private SchedulerDriver schedulerDriver;
    +
    +   private ActorRef connectionMonitor;
    +
    +   private ActorRef taskRouter;
    +
    +   private ActorRef launchCoordinator;
    +
    +   private ActorRef reconciliationCoordinator;
    +
    +   private MesosWorkerStore workerStore;
    +
    +   final Map<ResourceID, MesosWorkerStore.Worker> workersInNew;
    +   final Map<ResourceID, MesosWorkerStore.Worker> workersInLaunch;
    +   final Map<ResourceID, MesosWorkerStore.Worker> workersBeingReturned;
    +
    +   /** The number of failed tasks since the master became active */
    +   private int failedTasksSoFar;
    +
    +   public MesosFlinkResourceManager(
    +           Configuration flinkConfig,
    +           MesosConfiguration mesosConfig,
    +           MesosWorkerStore workerStore,
    +           LeaderRetrievalService leaderRetrievalService,
    +           MesosTaskManagerParameters taskManagerParameters,
    +           Protos.TaskInfo.Builder taskManagerLaunchContext,
    +           int maxFailedTasks,
    +           int numInitialTaskManagers) {
    +
    +           super(numInitialTaskManagers, flinkConfig, 
leaderRetrievalService);
    +
    +           this.mesosConfig = requireNonNull(mesosConfig);
    +
    +           this.workerStore = requireNonNull(workerStore);
    +
    +           this.taskManagerParameters = 
requireNonNull(taskManagerParameters);
    +           this.taskManagerLaunchContext = 
requireNonNull(taskManagerLaunchContext);
    +           this.maxFailedTasks = maxFailedTasks;
    +
    +           this.workersInNew = new HashMap<>();
    +           this.workersInLaunch = new HashMap<>();
    +           this.workersBeingReturned = new HashMap<>();
    +   }
    +
    +   // 
------------------------------------------------------------------------
    +   //  Mesos-specific behavior
    +   // 
------------------------------------------------------------------------
    +
    +   @Override
    +   protected void initialize() throws Exception {
    +           LOG.info("Initializing Mesos resource master");
    +
    +           workerStore.start();
    +
    +           // create the scheduler driver to communicate with Mesos
    +           schedulerCallbackHandler = new SchedulerProxy(self());
    +
    +           // register with Mesos
    +           FrameworkInfo.Builder frameworkInfo = 
mesosConfig.frameworkInfo()
    +                   .clone()
    +                   .setCheckpoint(true);
    +
    +           Option<Protos.FrameworkID> frameworkID = 
workerStore.getFrameworkID();
    +           if(frameworkID.isEmpty()) {
    +                   LOG.info("Registering as new framework.");
    +           }
    +           else {
    +                   LOG.info("Recovery scenario: re-registering using 
framework ID {}.", frameworkID.get().getValue());
    +                   frameworkInfo.setId(frameworkID.get());
    +           }
    +
    +           MesosConfiguration initializedMesosConfig = 
mesosConfig.withFrameworkInfo(frameworkInfo);
    +           MesosConfiguration.logMesosConfig(LOG, initializedMesosConfig);
    +           schedulerDriver = 
initializedMesosConfig.createDriver(schedulerCallbackHandler, false);
    +
    +           // create supporting actors
    +           connectionMonitor = createConnectionMonitor();
    +           launchCoordinator = createLaunchCoordinator();
    +           reconciliationCoordinator = createReconciliationCoordinator();
    +           taskRouter = createTaskRouter();
    +
    +           recoverWorkers();
    +
    +           connectionMonitor.tell(new ConnectionMonitor.Start(), self());
    +           schedulerDriver.start();
    +   }
    +
    +   protected ActorRef createConnectionMonitor() {
    +           return context().actorOf(
    +                   
ConnectionMonitor.createActorProps(ConnectionMonitor.class, config),
    +                   "connectionMonitor");
    +   }
    +
    +   protected ActorRef createTaskRouter() {
    +           return context().actorOf(
    +                   Tasks.createActorProps(Tasks.class, config, 
schedulerDriver, TaskMonitor.class),
    +                   "tasks");
    +   }
    +
    +   protected ActorRef createLaunchCoordinator() {
    +           return context().actorOf(
    +                   
LaunchCoordinator.createActorProps(LaunchCoordinator.class, self(), config, 
schedulerDriver, createOptimizer()),
    +                   "launchCoordinator");
    +   }
    +
    +   protected ActorRef createReconciliationCoordinator() {
    +           return context().actorOf(
    +                   
ReconciliationCoordinator.createActorProps(ReconciliationCoordinator.class, 
config, schedulerDriver),
    +                   "reconciliationCoordinator");
    +   }
    +
    +   @Override
    +   public void postStop() {
    +           LOG.info("Stopping Mesos resource master");
    +           super.postStop();
    +   }
    +
    +   // 
------------------------------------------------------------------------
    +   //  Actor messages
    +   // 
------------------------------------------------------------------------
    +
    +   @Override
    +   protected void handleMessage(Object message) {
    +
    +           // check for Mesos-specific actor messages first
    +
    +           // --- messages about Mesos connection
    +           if (message instanceof Registered) {
    +                   registered((Registered) message);
    +           } else if (message instanceof ReRegistered) {
    +                   reregistered((ReRegistered) message);
    +           } else if (message instanceof Disconnected) {
    +                   disconnected((Disconnected) message);
    +           } else if (message instanceof Error) {
    +                   error(((Error) message).message());
    +
    +           // --- messages about offers
    +           } else if (message instanceof ResourceOffers || message 
instanceof OfferRescinded) {
    +                   launchCoordinator.tell(message, self());
    +           } else if (message instanceof AcceptOffers) {
    +                   acceptOffers((AcceptOffers) message);
    +
    +           // --- messages about tasks
    +           } else if (message instanceof StatusUpdate) {
    +                   taskStatusUpdated((StatusUpdate) message);
    +           } else if (message instanceof 
ReconciliationCoordinator.Reconcile) {
    +                   // a reconciliation request from a task
    +                   reconciliationCoordinator.tell(message, self());
    +           } else if (message instanceof TaskMonitor.TaskTerminated) {
    +                   // a termination message from a task
    +                   TaskMonitor.TaskTerminated msg = 
(TaskMonitor.TaskTerminated) message;
    +                   taskTerminated(msg.taskID(), msg.status());
    +
    +           } else  {
    +                   // message handled by the generic resource master code
    +                   super.handleMessage(message);
    +           }
    +   }
    +
    +   /**
    +    * Called to shut down the cluster (not a failover situation).
    +    *
    +    * @param finalStatus The application status to report.
    +    * @param optionalDiagnostics An optional diagnostics message.
    +     */
    +   @Override
    +   protected void shutdownApplication(ApplicationStatus finalStatus, 
String optionalDiagnostics) {
    +
    +           LOG.info("Shutting down and unregistering as a Mesos 
framework.");
    +           try {
    +                   // unregister the framework, which implicitly removes 
all tasks.
    +                   schedulerDriver.stop(false);
    +           }
    +           catch(Exception ex) {
    +                   LOG.warn("unable to unregister the framework", ex);
    +           }
    +
    +           try {
    +                   workerStore.cleanup();
    +           }
    +           catch(Exception ex) {
    +                   LOG.warn("unable to cleanup the ZooKeeper state", ex);
    +           }
    +
    +           context().stop(self());
    +   }
    +
    +   @Override
    +   protected void fatalError(String message, Throwable error) {
    +           // we do not unregister, but cause a hard fail of this process, 
to have it
    +           // restarted by the dispatcher
    +           LOG.error("FATAL ERROR IN MESOS APPLICATION MASTER: " + 
message, error);
    +           LOG.error("Shutting down process");
    +
    +           // kill this process, this will make an external supervisor 
(the dispatcher) restart the process
    +           System.exit(EXIT_CODE_FATAL_ERROR);
    +   }
    +
    +   // 
------------------------------------------------------------------------
    +   //  Worker Management
    +   // 
------------------------------------------------------------------------
    +
    +   /**
    +    * Recover framework/worker information persisted by a prior 
incarnation of the RM.
    +    */
    +   private void recoverWorkers() throws Exception {
    +           // if this application master starts as part of an 
ApplicationMaster/JobManager recovery,
    +           // then some worker tasks are most likely still alive and we 
can re-obtain them
    +           final List<MesosWorkerStore.Worker> tasksFromPreviousAttempts = 
workerStore.recoverWorkers();
    +
    +           if (!tasksFromPreviousAttempts.isEmpty()) {
    +                   LOG.info("Retrieved {} TaskManagers from previous 
attempt", tasksFromPreviousAttempts.size());
    +
    +                   List<Tuple2<TaskRequest,String>> toAssign = new 
ArrayList<>(tasksFromPreviousAttempts.size());
    +                   List<LaunchableTask> toLaunch = new 
ArrayList<>(tasksFromPreviousAttempts.size());
    +
    +                   for (final MesosWorkerStore.Worker worker : 
tasksFromPreviousAttempts) {
    +                           LaunchableMesosWorker launchable = 
createLaunchableMesosWorker(worker.taskID());
    +
    +                           switch(worker.state()) {
    +                                   case New:
    +                                           
workersInNew.put(extractResourceID(worker.taskID()), worker);
    +                                           toLaunch.add(launchable);
    +                                           break;
    +                                   case Launched:
    +                                           
workersInLaunch.put(extractResourceID(worker.taskID()), worker);
    +                                           toAssign.add(new 
Tuple2<>(launchable.taskRequest(), worker.hostname().get()));
    +                                           break;
    +                                   case Released:
    +                                           
workersBeingReturned.put(extractResourceID(worker.taskID()), worker);
    +                                           break;
    +                           }
    +                           taskRouter.tell(new 
TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), self());
    +                   }
    +
    +                   // tell the launch coordinator about prior assignments
    +                   if(toAssign.size() >= 1) {
    +                           launchCoordinator.tell(new 
LaunchCoordinator.Assign(toAssign), self());
    +                   }
    +                   // tell the launch coordinator to launch any new tasks
    +                   if(toLaunch.size() >= 1) {
    +                           launchCoordinator.tell(new 
LaunchCoordinator.Launch(toLaunch), self());
    +                   }
    +           }
    +   }
    +
    +   /**
    +    * Plan for some additional workers to be launched.
    +    *
    +    * @param numWorkers The number of workers to allocate.
    +     */
    +   @Override
    +   protected void requestNewWorkers(int numWorkers) {
    +
    +           try {
    +                   List<TaskMonitor.TaskGoalStateUpdated> toMonitor = new 
ArrayList<>(numWorkers);
    +                   List<LaunchableTask> toLaunch = new 
ArrayList<>(numWorkers);
    +
    +                   // generate new workers into persistent state and 
launch associated actors
    +                   for (int i = 0; i < numWorkers; i++) {
    +                           MesosWorkerStore.Worker worker = 
MesosWorkerStore.Worker.newTask(workerStore.newTaskID());
    +                           workerStore.putWorker(worker);
    +                           
workersInNew.put(extractResourceID(worker.taskID()), worker);
    +
    +                           LaunchableMesosWorker launchable = 
createLaunchableMesosWorker(worker.taskID());
    +
    +                           LOG.info("Scheduling Mesos task {} with ({} MB, 
{} cpus).",
    +                                   launchable.taskID().getValue(), 
launchable.taskRequest().getMemory(), launchable.taskRequest().getCPUs());
    +
    +                           toMonitor.add(new 
TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)));
    +                           toLaunch.add(launchable);
    +                   }
    +
    +                   // tell the task router about the new plans
    +                   for (TaskMonitor.TaskGoalStateUpdated update : 
toMonitor) {
    +                           taskRouter.tell(update, self());
    +                   }
    +
    +                   // tell the launch coordinator to launch the new tasks
    +                   if(toLaunch.size() >= 1) {
    +                           launchCoordinator.tell(new 
LaunchCoordinator.Launch(toLaunch), self());
    +                   }
    +           }
    +           catch(Exception ex) {
    +                   fatalError("unable to request new workers", ex);
    +           }
    +   }
    +
    +   /**
    +    * Accept offers as advised by the launch coordinator.
    +    *
    +    * Acceptance is routed through the RM to update the persistent state 
before
    +    * forwarding the message to Mesos.
    +     */
    +   private void acceptOffers(AcceptOffers msg) {
    +
    +           try {
    +                   List<TaskMonitor.TaskGoalStateUpdated> toMonitor = new 
ArrayList<>(msg.operations().size());
    +
    +                   // transition the persistent state of some tasks to 
Launched
    +                   for (Protos.Offer.Operation op : msg.operations()) {
    +                           if (op.getType() != 
Protos.Offer.Operation.Type.LAUNCH) {
    +                                   continue;
    +                           }
    +                           for (Protos.TaskInfo info : 
op.getLaunch().getTaskInfosList()) {
    +                                   MesosWorkerStore.Worker worker = 
workersInNew.remove(extractResourceID(info.getTaskId()));
    +                                   assert (worker != null);
    +
    +                                   worker = 
worker.launchTask(info.getSlaveId(), msg.hostname());
    +                                   workerStore.putWorker(worker);
    +                                   
workersInLaunch.put(extractResourceID(worker.taskID()), worker);
    +
    +                                   LOG.info("Launching Mesos task {} on 
host {}.",
    +                                           worker.taskID().getValue(), 
worker.hostname().get());
    +
    +                                   toMonitor.add(new 
TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)));
    +                           }
    +                   }
    +
    +                   // tell the task router about the new plans
    +                   for (TaskMonitor.TaskGoalStateUpdated update : 
toMonitor) {
    +                           taskRouter.tell(update, self());
    +                   }
    +
    +                   // send the acceptance message to Mesos
    +                   schedulerDriver.acceptOffers(msg.offerIds(), 
msg.operations(), msg.filters());
    +           }
    +           catch(Exception ex) {
    +                   fatalError("unable to accept offers", ex);
    +           }
    +   }
    +
    +   /**
    +    * Handle a task status change.
    +     */
    +   private void taskStatusUpdated(StatusUpdate message) {
    +           taskRouter.tell(message, self());
    +           reconciliationCoordinator.tell(message, self());
    +           schedulerDriver.acknowledgeStatusUpdate(message.status());
    +   }
    +
    +   /**
    +    * Accept the given started worker into the internal state.
    +    *
    +    * @param resourceID The worker resource id
    +    * @return A registered worker node record.
    +    */
    +   @Override
    +   protected RegisteredMesosWorkerNode workerStarted(ResourceID 
resourceID) {
    +           MesosWorkerStore.Worker inLaunch = 
workersInLaunch.remove(resourceID);
    +           if (inLaunch == null) {
    +                   // Worker was not in state "being launched", this can 
indicate that the TaskManager
    +                   // in this worker was already registered or that the 
container was not started
    +                   // by this resource manager. Simply ignore this 
resourceID.
    +                   return null;
    +           }
    +           return new RegisteredMesosWorkerNode(inLaunch);
    +   }
    +
    +   /**
    +    * Accept the given registered workers into the internal state.
    +    *
    +    * @param toConsolidate The worker IDs known previously to the 
JobManager.
    +    * @return A collection of registered worker node records.
    +     */
    +   @Override
    +   protected Collection<RegisteredMesosWorkerNode> 
reacceptRegisteredWorkers(Collection<ResourceID> toConsolidate) {
    +
    +           // we check for each task manager if we recognize its Mesos 
task ID
    +           List<RegisteredMesosWorkerNode> accepted = new 
ArrayList<>(toConsolidate.size());
    +           for (ResourceID resourceID : toConsolidate) {
    +                   MesosWorkerStore.Worker worker = 
workersInLaunch.remove(resourceID);
    +                   if (worker != null) {
    +                           LOG.info("Mesos worker consolidation recognizes 
TaskManager {}.", resourceID);
    +                           accepted.add(new 
RegisteredMesosWorkerNode(worker));
    +                   }
    +                   else {
    +                           if(isStarted(resourceID)) {
    +                                   LOG.info("TaskManager {} has already 
been registered at the resource manager.", resourceID);
    +                           }
    +                           else {
    +                                   LOG.info("Mesos worker consolidation 
does not recognize TaskManager {}.", resourceID);
    +                           }
    +                   }
    +           }
    +           return accepted;
    +   }
    +
    +   /**
    +    * Release the given pending worker.
    +    */
    +   @Override
    +   protected void releasePendingWorker(ResourceID id) {
    +           MesosWorkerStore.Worker worker = workersInLaunch.remove(id);
    +           if (worker != null) {
    +                   releaseWorker(worker);
    +           } else {
    +                   LOG.error("Cannot find worker {} to release. Ignoring 
request.", id);
    +           }
    +   }
    +
    +   /**
    +    * Release the given started worker.
    +    */
    +   @Override
    +   protected void releaseStartedWorker(RegisteredMesosWorkerNode worker) {
    +           releaseWorker(worker.task());
    +   }
    +
    +   /**
    +    * Plan for the removal of the given worker.
    +     */
    +   private void releaseWorker(MesosWorkerStore.Worker worker) {
    +           try {
    +                   LOG.info("Releasing worker {}", worker.taskID());
    +
    +                   // update persistent state of worker to Released
    +                   worker = worker.releaseTask();
    +                   workerStore.putWorker(worker);
    +                   
workersBeingReturned.put(extractResourceID(worker.taskID()), worker);
    +                   taskRouter.tell(new 
TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), self());
    +
    +                   if (worker.hostname().isDefined()) {
    +                           // tell the launch coordinator that the task is 
being unassigned from the host, for planning purposes
    +                           launchCoordinator.tell(new 
LaunchCoordinator.Unassign(worker.taskID(), worker.hostname().get()), self());
    +                   }
    +           }
    +           catch (Exception ex) {
    +                   fatalError("unable to release worker", ex);
    +           }
    +   }
    +
    +   @Override
    +   protected int getNumWorkerRequestsPending() {
    +           return workersInNew.size();
    +   }
    +
    +   @Override
    +   protected int getNumWorkersPendingRegistration() {
    +           return workersInLaunch.size();
    +   }
    +
    +   // 
------------------------------------------------------------------------
    +   //  Callbacks from the Mesos Master
    +   // 
------------------------------------------------------------------------
    +
    +   /**
    +    * Called when connected to Mesos as a new framework.
    +    */
    +   private void registered(Registered message) {
    +           connectionMonitor.tell(message, self());
    +
    +           try {
    +                   
workerStore.setFrameworkID(Option.apply(message.frameworkId()));
    +           }
    +           catch(Exception ex) {
    +                   fatalError("unable to store the assigned framework ID", 
ex);
    +                   return;
    +           }
    +
    +           launchCoordinator.tell(message, self());
    +           reconciliationCoordinator.tell(message, self());
    +           taskRouter.tell(message, self());
    +   }
    +
    +   /**
    +    * Called when reconnected to Mesos following a failover event.
    +    */
    +   private void reregistered(ReRegistered message) {
    +           connectionMonitor.tell(message, self());
    +           launchCoordinator.tell(message, self());
    +           reconciliationCoordinator.tell(message, self());
    +           taskRouter.tell(message, self());
    +   }
    +
    +   /**
    +    * Called when disconnected from Mesos.
    +    */
    +   private void disconnected(Disconnected message) {
    +           connectionMonitor.tell(message, self());
    +           launchCoordinator.tell(message, self());
    +           reconciliationCoordinator.tell(message, self());
    +           taskRouter.tell(message, self());
    +   }
    +
    +   /**
    +    * Called when an error is reported by the scheduler callback.
    +    */
    +   private void error(String message) {
    +           self().tell(new FatalErrorOccurred("Connection to Mesos 
failed", new Exception(message)), self());
    +   }
    +
    +   /**
    +    * Invoked when a Mesos task reaches a terminal status.
    +     */
    +   private void taskTerminated(Protos.TaskID taskID, Protos.TaskStatus 
status) {
    +           // this callback occurs for failed containers and for released 
containers alike
    +
    +           final ResourceID id = extractResourceID(taskID);
    +
    +           try {
    +                   workerStore.removeWorker(taskID);
    +           }
    +           catch(Exception ex) {
    +                   fatalError("unable to remove worker", ex);
    +                   return;
    +           }
    +
    +           // check if this is a failed task or a released task
    +           if (workersBeingReturned.remove(id) != null) {
    +                   // regular finished worker that we released
    +                   LOG.info("Worker {} finished successfully with 
diagnostics: {}",
    +                           id, status.getMessage());
    +           } else {
    +                   // failed worker, either at startup, or running
    +                   final MesosWorkerStore.Worker launched = 
workersInLaunch.remove(id);
    +                   if (launched != null) {
    +                           LOG.info("Mesos task {} failed, with a 
TaskManager in launch or registration. " +
    +                                   "State: {} Reason: {} ({})", id, 
status.getState(), status.getReason(), status.getMessage());
    +                           // we will trigger re-acquiring new workers at 
the end
    +                   } else {
    +                           // failed registered worker
    +                           LOG.info("Mesos task {} failed, with a 
registered TaskManager. " +
    +                                   "State: {} Reason: {} ({})", id, 
status.getState(), status.getReason(), status.getMessage());
    +
    +                           // notify the generic logic, which notifies the 
JobManager, etc.
    +                           notifyWorkerFailed(id, "Mesos task " + id + " 
failed.  State: " + status.getState());
    +                   }
    +
    +                   // general failure logging
    +                   failedTasksSoFar++;
    +
    +                   String diagMessage = String.format("Diagnostics for 
task %s in state %s : " +
    +                                   "reason=%s message=%s",
    +                           id, status.getState(), status.getReason(), 
status.getMessage());
    +                   sendInfoMessage(diagMessage);
    +
    +                   LOG.info(diagMessage);
    +                   LOG.info("Total number of failed tasks so far: " + 
failedTasksSoFar);
    --- End diff --
    
    `{}`-placeholder


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to