[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15426188#comment-15426188
 ] 

ASF GitHub Bot commented on FLINK-1984:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2315#discussion_r75279529
  
    --- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
 ---
    @@ -0,0 +1,755 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.mesos.runtime.clusterframework;
    +
    +import akka.actor.ActorRef;
    +import akka.actor.Props;
    +import com.netflix.fenzo.TaskRequest;
    +import com.netflix.fenzo.TaskScheduler;
    +import com.netflix.fenzo.VirtualMachineLease;
    +import com.netflix.fenzo.functions.Action1;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
    +import org.apache.flink.mesos.scheduler.ConnectionMonitor;
    +import org.apache.flink.mesos.scheduler.LaunchableTask;
    +import org.apache.flink.mesos.scheduler.LaunchCoordinator;
    +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
    +import org.apache.flink.mesos.scheduler.SchedulerProxy;
    +import org.apache.flink.mesos.scheduler.TaskMonitor;
    +import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
    +import org.apache.flink.mesos.scheduler.Tasks;
    +import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
    +import org.apache.flink.mesos.scheduler.messages.Disconnected;
    +import org.apache.flink.mesos.scheduler.messages.Error;
    +import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
    +import org.apache.flink.mesos.scheduler.messages.ReRegistered;
    +import org.apache.flink.mesos.scheduler.messages.Registered;
    +import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
    +import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
    +import org.apache.flink.mesos.util.MesosConfiguration;
    +import org.apache.flink.runtime.clusterframework.ApplicationStatus;
    +import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
    +import 
org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
    +import org.apache.flink.runtime.clusterframework.messages.StopCluster;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
    +import org.apache.mesos.Protos;
    +import org.apache.mesos.Protos.FrameworkInfo;
    +import org.apache.mesos.SchedulerDriver;
    +import org.slf4j.Logger;
    +import scala.Option;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +/**
    + * Flink Resource Manager for Apache Mesos.
    + */
    +public class MesosFlinkResourceManager extends 
FlinkResourceManager<RegisteredMesosWorkerNode> {
    +
    +   /** The Mesos configuration (master and framework info) */
    +   private final MesosConfiguration mesosConfig;
    +
    +   /** The TaskManager container parameters (like container memory size) */
    +   private final MesosTaskManagerParameters taskManagerParameters;
    +
    +   /** Context information used to start a TaskManager Java process */
    +   private final Protos.TaskInfo.Builder taskManagerLaunchContext;
    +
    +   /** Number of failed Mesos tasks before stopping the application. -1 
means infinite. */
    +   private final int maxFailedTasks;
    +
    +   /** Callback handler for the asynchronous Mesos scheduler */
    +   private SchedulerProxy schedulerCallbackHandler;
    +
    +   /** Mesos scheduler driver */
    +   private SchedulerDriver schedulerDriver;
    +
    +   private ActorRef connectionMonitor;
    +
    +   private ActorRef taskRouter;
    +
    +   private ActorRef launchCoordinator;
    +
    +   private ActorRef reconciliationCoordinator;
    +
    +   private MesosWorkerStore workerStore;
    +
    +   final Map<ResourceID, MesosWorkerStore.Worker> workersInNew;
    +   final Map<ResourceID, MesosWorkerStore.Worker> workersInLaunch;
    +   final Map<ResourceID, MesosWorkerStore.Worker> workersBeingReturned;
    +
    +   /** The number of failed tasks since the master became active */
    +   private int failedTasksSoFar;
    +
    +   public MesosFlinkResourceManager(
    +           Configuration flinkConfig,
    +           MesosConfiguration mesosConfig,
    +           MesosWorkerStore workerStore,
    +           LeaderRetrievalService leaderRetrievalService,
    +           MesosTaskManagerParameters taskManagerParameters,
    +           Protos.TaskInfo.Builder taskManagerLaunchContext,
    +           int maxFailedTasks,
    +           int numInitialTaskManagers) {
    +
    +           super(numInitialTaskManagers, flinkConfig, 
leaderRetrievalService);
    +
    +           this.mesosConfig = requireNonNull(mesosConfig);
    +
    +           this.workerStore = requireNonNull(workerStore);
    +
    +           this.taskManagerParameters = 
requireNonNull(taskManagerParameters);
    +           this.taskManagerLaunchContext = 
requireNonNull(taskManagerLaunchContext);
    +           this.maxFailedTasks = maxFailedTasks;
    +
    +           this.workersInNew = new HashMap<>();
    +           this.workersInLaunch = new HashMap<>();
    +           this.workersBeingReturned = new HashMap<>();
    +   }
    +
    +   // 
------------------------------------------------------------------------
    +   //  Mesos-specific behavior
    +   // 
------------------------------------------------------------------------
    +
    +   @Override
    +   protected void initialize() throws Exception {
    +           LOG.info("Initializing Mesos resource master");
    +
    +           workerStore.start();
    +
    +           // create the scheduler driver to communicate with Mesos
    +           schedulerCallbackHandler = new SchedulerProxy(self());
    +
    +           // register with Mesos
    +           FrameworkInfo.Builder frameworkInfo = 
mesosConfig.frameworkInfo()
    +                   .clone()
    +                   .setCheckpoint(true);
    +
    +           Option<Protos.FrameworkID> frameworkID = 
workerStore.getFrameworkID();
    +           if(frameworkID.isEmpty()) {
    +                   LOG.info("Registering as new framework.");
    +           }
    +           else {
    +                   LOG.info("Recovery scenario: re-registering using 
framework ID {}.", frameworkID.get().getValue());
    +                   frameworkInfo.setId(frameworkID.get());
    +           }
    +
    +           MesosConfiguration initializedMesosConfig = 
mesosConfig.withFrameworkInfo(frameworkInfo);
    +           MesosConfiguration.logMesosConfig(LOG, initializedMesosConfig);
    +           schedulerDriver = 
initializedMesosConfig.createDriver(schedulerCallbackHandler, false);
    +
    +           // create supporting actors
    +           connectionMonitor = createConnectionMonitor();
    +           launchCoordinator = createLaunchCoordinator();
    +           reconciliationCoordinator = createReconciliationCoordinator();
    +           taskRouter = createTaskRouter();
    +
    +           recoverWorkers();
    +
    +           connectionMonitor.tell(new ConnectionMonitor.Start(), self());
    +           schedulerDriver.start();
    +   }
    +
    +   protected ActorRef createConnectionMonitor() {
    +           return context().actorOf(
    +                   
ConnectionMonitor.createActorProps(ConnectionMonitor.class, config),
    +                   "connectionMonitor");
    +   }
    +
    +   protected ActorRef createTaskRouter() {
    +           return context().actorOf(
    +                   Tasks.createActorProps(Tasks.class, config, 
schedulerDriver, TaskMonitor.class),
    +                   "tasks");
    +   }
    +
    +   protected ActorRef createLaunchCoordinator() {
    +           return context().actorOf(
    +                   
LaunchCoordinator.createActorProps(LaunchCoordinator.class, self(), config, 
schedulerDriver, createOptimizer()),
    +                   "launchCoordinator");
    +   }
    +
    +   protected ActorRef createReconciliationCoordinator() {
    +           return context().actorOf(
    +                   
ReconciliationCoordinator.createActorProps(ReconciliationCoordinator.class, 
config, schedulerDriver),
    +                   "reconciliationCoordinator");
    +   }
    +
    +   @Override
    +   public void postStop() {
    +           LOG.info("Stopping Mesos resource master");
    +           super.postStop();
    +   }
    +
    +   // 
------------------------------------------------------------------------
    +   //  Actor messages
    +   // 
------------------------------------------------------------------------
    +
    +   @Override
    +   protected void handleMessage(Object message) {
    +
    +           // check for Mesos-specific actor messages first
    +
    +           // --- messages about Mesos connection
    +           if (message instanceof Registered) {
    +                   registered((Registered) message);
    +           } else if (message instanceof ReRegistered) {
    +                   reregistered((ReRegistered) message);
    +           } else if (message instanceof Disconnected) {
    +                   disconnected((Disconnected) message);
    +           } else if (message instanceof Error) {
    +                   error(((Error) message).message());
    +
    +           // --- messages about offers
    +           } else if (message instanceof ResourceOffers || message 
instanceof OfferRescinded) {
    +                   launchCoordinator.tell(message, self());
    +           } else if (message instanceof AcceptOffers) {
    +                   acceptOffers((AcceptOffers) message);
    +
    +           // --- messages about tasks
    +           } else if (message instanceof StatusUpdate) {
    +                   taskStatusUpdated((StatusUpdate) message);
    +           } else if (message instanceof 
ReconciliationCoordinator.Reconcile) {
    +                   // a reconciliation request from a task
    +                   reconciliationCoordinator.tell(message, self());
    +           } else if (message instanceof TaskMonitor.TaskTerminated) {
    +                   // a termination message from a task
    +                   TaskMonitor.TaskTerminated msg = 
(TaskMonitor.TaskTerminated) message;
    +                   taskTerminated(msg.taskID(), msg.status());
    +
    +           } else  {
    +                   // message handled by the generic resource master code
    +                   super.handleMessage(message);
    +           }
    +   }
    +
    +   /**
    +    * Called to shut down the cluster (not a failover situation).
    +    *
    +    * @param finalStatus The application status to report.
    +    * @param optionalDiagnostics An optional diagnostics message.
    +     */
    +   @Override
    +   protected void shutdownApplication(ApplicationStatus finalStatus, 
String optionalDiagnostics) {
    +
    +           LOG.info("Shutting down and unregistering as a Mesos 
framework.");
    +           try {
    +                   // unregister the framework, which implicitly removes 
all tasks.
    +                   schedulerDriver.stop(false);
    +           }
    +           catch(Exception ex) {
    +                   LOG.warn("unable to unregister the framework", ex);
    +           }
    +
    +           try {
    +                   workerStore.cleanup();
    +           }
    +           catch(Exception ex) {
    +                   LOG.warn("unable to cleanup the ZooKeeper state", ex);
    +           }
    +
    +           context().stop(self());
    +   }
    +
    +   @Override
    +   protected void fatalError(String message, Throwable error) {
    +           // we do not unregister, but cause a hard fail of this process, 
to have it
    +           // restarted by the dispatcher
    +           LOG.error("FATAL ERROR IN MESOS APPLICATION MASTER: " + 
message, error);
    +           LOG.error("Shutting down process");
    +
    +           // kill this process, this will make an external supervisor 
(the dispatcher) restart the process
    +           System.exit(EXIT_CODE_FATAL_ERROR);
    +   }
    +
    +   // 
------------------------------------------------------------------------
    +   //  Worker Management
    +   // 
------------------------------------------------------------------------
    +
    +   /**
    +    * Recover framework/worker information persisted by a prior 
incarnation of the RM.
    +    */
    +   private void recoverWorkers() throws Exception {
    +           // if this application master starts as part of an 
ApplicationMaster/JobManager recovery,
    +           // then some worker tasks are most likely still alive and we 
can re-obtain them
    +           final List<MesosWorkerStore.Worker> tasksFromPreviousAttempts = 
workerStore.recoverWorkers();
    +
    +           if (!tasksFromPreviousAttempts.isEmpty()) {
    +                   LOG.info("Retrieved {} TaskManagers from previous 
attempt", tasksFromPreviousAttempts.size());
    +
    +                   List<Tuple2<TaskRequest,String>> toAssign = new 
ArrayList<>(tasksFromPreviousAttempts.size());
    +                   List<LaunchableTask> toLaunch = new 
ArrayList<>(tasksFromPreviousAttempts.size());
    +
    +                   for (final MesosWorkerStore.Worker worker : 
tasksFromPreviousAttempts) {
    +                           LaunchableMesosWorker launchable = 
createLaunchableMesosWorker(worker.taskID());
    +
    +                           switch(worker.state()) {
    +                                   case New:
    +                                           
workersInNew.put(extractResourceID(worker.taskID()), worker);
    +                                           toLaunch.add(launchable);
    +                                           break;
    +                                   case Launched:
    +                                           
workersInLaunch.put(extractResourceID(worker.taskID()), worker);
    +                                           toAssign.add(new 
Tuple2<>(launchable.taskRequest(), worker.hostname().get()));
    +                                           break;
    +                                   case Released:
    +                                           
workersBeingReturned.put(extractResourceID(worker.taskID()), worker);
    +                                           break;
    +                           }
    +                           taskRouter.tell(new 
TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), self());
    +                   }
    +
    +                   // tell the launch coordinator about prior assignments
    +                   if(toAssign.size() >= 1) {
    +                           launchCoordinator.tell(new 
LaunchCoordinator.Assign(toAssign), self());
    +                   }
    +                   // tell the launch coordinator to launch any new tasks
    +                   if(toLaunch.size() >= 1) {
    +                           launchCoordinator.tell(new 
LaunchCoordinator.Launch(toLaunch), self());
    +                   }
    +           }
    +   }
    +
    +   /**
    +    * Plan for some additional workers to be launched.
    +    *
    +    * @param numWorkers The number of workers to allocate.
    +     */
    +   @Override
    +   protected void requestNewWorkers(int numWorkers) {
    +
    +           try {
    +                   List<TaskMonitor.TaskGoalStateUpdated> toMonitor = new 
ArrayList<>(numWorkers);
    +                   List<LaunchableTask> toLaunch = new 
ArrayList<>(numWorkers);
    +
    +                   // generate new workers into persistent state and 
launch associated actors
    +                   for (int i = 0; i < numWorkers; i++) {
    +                           MesosWorkerStore.Worker worker = 
MesosWorkerStore.Worker.newTask(workerStore.newTaskID());
    +                           workerStore.putWorker(worker);
    +                           
workersInNew.put(extractResourceID(worker.taskID()), worker);
    +
    +                           LaunchableMesosWorker launchable = 
createLaunchableMesosWorker(worker.taskID());
    +
    +                           LOG.info("Scheduling Mesos task {} with ({} MB, 
{} cpus).",
    +                                   launchable.taskID().getValue(), 
launchable.taskRequest().getMemory(), launchable.taskRequest().getCPUs());
    +
    +                           toMonitor.add(new 
TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)));
    +                           toLaunch.add(launchable);
    +                   }
    +
    +                   // tell the task router about the new plans
    +                   for (TaskMonitor.TaskGoalStateUpdated update : 
toMonitor) {
    +                           taskRouter.tell(update, self());
    +                   }
    +
    +                   // tell the launch coordinator to launch the new tasks
    +                   if(toLaunch.size() >= 1) {
    +                           launchCoordinator.tell(new 
LaunchCoordinator.Launch(toLaunch), self());
    +                   }
    +           }
    +           catch(Exception ex) {
    +                   fatalError("unable to request new workers", ex);
    +           }
    +   }
    +
    +   /**
    +    * Accept offers as advised by the launch coordinator.
    +    *
    +    * Acceptance is routed through the RM to update the persistent state 
before
    +    * forwarding the message to Mesos.
    +     */
    +   private void acceptOffers(AcceptOffers msg) {
    +
    +           try {
    +                   List<TaskMonitor.TaskGoalStateUpdated> toMonitor = new 
ArrayList<>(msg.operations().size());
    +
    +                   // transition the persistent state of some tasks to 
Launched
    +                   for (Protos.Offer.Operation op : msg.operations()) {
    +                           if (op.getType() != 
Protos.Offer.Operation.Type.LAUNCH) {
    +                                   continue;
    +                           }
    +                           for (Protos.TaskInfo info : 
op.getLaunch().getTaskInfosList()) {
    +                                   MesosWorkerStore.Worker worker = 
workersInNew.remove(extractResourceID(info.getTaskId()));
    +                                   assert (worker != null);
    +
    +                                   worker = 
worker.launchTask(info.getSlaveId(), msg.hostname());
    +                                   workerStore.putWorker(worker);
    +                                   
workersInLaunch.put(extractResourceID(worker.taskID()), worker);
    +
    +                                   LOG.info("Launching Mesos task {} on 
host {}.",
    +                                           worker.taskID().getValue(), 
worker.hostname().get());
    +
    +                                   toMonitor.add(new 
TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)));
    +                           }
    +                   }
    +
    +                   // tell the task router about the new plans
    +                   for (TaskMonitor.TaskGoalStateUpdated update : 
toMonitor) {
    +                           taskRouter.tell(update, self());
    +                   }
    +
    +                   // send the acceptance message to Mesos
    +                   schedulerDriver.acceptOffers(msg.offerIds(), 
msg.operations(), msg.filters());
    +           }
    +           catch(Exception ex) {
    +                   fatalError("unable to accept offers", ex);
    +           }
    +   }
    +
    +   /**
    +    * Handle a task status change.
    +     */
    +   private void taskStatusUpdated(StatusUpdate message) {
    +           taskRouter.tell(message, self());
    +           reconciliationCoordinator.tell(message, self());
    +           schedulerDriver.acknowledgeStatusUpdate(message.status());
    +   }
    +
    +   /**
    +    * Accept the given started worker into the internal state.
    +    *
    +    * @param resourceID The worker resource id
    +    * @return A registered worker node record.
    +    */
    +   @Override
    +   protected RegisteredMesosWorkerNode workerStarted(ResourceID 
resourceID) {
    +           MesosWorkerStore.Worker inLaunch = 
workersInLaunch.remove(resourceID);
    +           if (inLaunch == null) {
    +                   // Worker was not in state "being launched", this can 
indicate that the TaskManager
    +                   // in this worker was already registered or that the 
container was not started
    +                   // by this resource manager. Simply ignore this 
resourceID.
    +                   return null;
    +           }
    +           return new RegisteredMesosWorkerNode(inLaunch);
    +   }
    +
    +   /**
    +    * Accept the given registered workers into the internal state.
    +    *
    +    * @param toConsolidate The worker IDs known previously to the 
JobManager.
    +    * @return A collection of registered worker node records.
    +     */
    +   @Override
    +   protected Collection<RegisteredMesosWorkerNode> 
reacceptRegisteredWorkers(Collection<ResourceID> toConsolidate) {
    +
    +           // we check for each task manager if we recognize its Mesos 
task ID
    +           List<RegisteredMesosWorkerNode> accepted = new 
ArrayList<>(toConsolidate.size());
    +           for (ResourceID resourceID : toConsolidate) {
    +                   MesosWorkerStore.Worker worker = 
workersInLaunch.remove(resourceID);
    +                   if (worker != null) {
    +                           LOG.info("Mesos worker consolidation recognizes 
TaskManager {}.", resourceID);
    +                           accepted.add(new 
RegisteredMesosWorkerNode(worker));
    +                   }
    +                   else {
    +                           if(isStarted(resourceID)) {
    +                                   LOG.info("TaskManager {} has already 
been registered at the resource manager.", resourceID);
    +                           }
    +                           else {
    +                                   LOG.info("Mesos worker consolidation 
does not recognize TaskManager {}.", resourceID);
    +                           }
    +                   }
    +           }
    +           return accepted;
    +   }
    +
    +   /**
    +    * Release the given pending worker.
    +    */
    +   @Override
    +   protected void releasePendingWorker(ResourceID id) {
    +           MesosWorkerStore.Worker worker = workersInLaunch.remove(id);
    +           if (worker != null) {
    +                   releaseWorker(worker);
    +           } else {
    +                   LOG.error("Cannot find worker {} to release. Ignoring 
request.", id);
    +           }
    +   }
    +
    +   /**
    +    * Release the given started worker.
    +    */
    +   @Override
    +   protected void releaseStartedWorker(RegisteredMesosWorkerNode worker) {
    +           releaseWorker(worker.task());
    +   }
    +
    +   /**
    +    * Plan for the removal of the given worker.
    +     */
    +   private void releaseWorker(MesosWorkerStore.Worker worker) {
    +           try {
    +                   LOG.info("Releasing worker {}", worker.taskID());
    +
    +                   // update persistent state of worker to Released
    +                   worker = worker.releaseTask();
    +                   workerStore.putWorker(worker);
    +                   
workersBeingReturned.put(extractResourceID(worker.taskID()), worker);
    +                   taskRouter.tell(new 
TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), self());
    +
    +                   if (worker.hostname().isDefined()) {
    +                           // tell the launch coordinator that the task is 
being unassigned from the host, for planning purposes
    +                           launchCoordinator.tell(new 
LaunchCoordinator.Unassign(worker.taskID(), worker.hostname().get()), self());
    +                   }
    +           }
    +           catch (Exception ex) {
    +                   fatalError("unable to release worker", ex);
    +           }
    +   }
    +
    +   @Override
    +   protected int getNumWorkerRequestsPending() {
    +           return workersInNew.size();
    +   }
    +
    +   @Override
    +   protected int getNumWorkersPendingRegistration() {
    +           return workersInLaunch.size();
    +   }
    +
    +   // 
------------------------------------------------------------------------
    +   //  Callbacks from the Mesos Master
    +   // 
------------------------------------------------------------------------
    +
    +   /**
    +    * Called when connected to Mesos as a new framework.
    +    */
    +   private void registered(Registered message) {
    +           connectionMonitor.tell(message, self());
    +
    +           try {
    +                   
workerStore.setFrameworkID(Option.apply(message.frameworkId()));
    +           }
    +           catch(Exception ex) {
    +                   fatalError("unable to store the assigned framework ID", 
ex);
    +                   return;
    +           }
    +
    +           launchCoordinator.tell(message, self());
    +           reconciliationCoordinator.tell(message, self());
    +           taskRouter.tell(message, self());
    +   }
    +
    +   /**
    +    * Called when reconnected to Mesos following a failover event.
    +    */
    +   private void reregistered(ReRegistered message) {
    +           connectionMonitor.tell(message, self());
    +           launchCoordinator.tell(message, self());
    +           reconciliationCoordinator.tell(message, self());
    +           taskRouter.tell(message, self());
    +   }
    +
    +   /**
    +    * Called when disconnected from Mesos.
    +    */
    +   private void disconnected(Disconnected message) {
    +           connectionMonitor.tell(message, self());
    +           launchCoordinator.tell(message, self());
    +           reconciliationCoordinator.tell(message, self());
    +           taskRouter.tell(message, self());
    +   }
    +
    +   /**
    +    * Called when an error is reported by the scheduler callback.
    +    */
    +   private void error(String message) {
    +           self().tell(new FatalErrorOccurred("Connection to Mesos 
failed", new Exception(message)), self());
    +   }
    +
    +   /**
    +    * Invoked when a Mesos task reaches a terminal status.
    +     */
    +   private void taskTerminated(Protos.TaskID taskID, Protos.TaskStatus 
status) {
    +           // this callback occurs for failed containers and for released 
containers alike
    +
    +           final ResourceID id = extractResourceID(taskID);
    +
    +           try {
    +                   workerStore.removeWorker(taskID);
    +           }
    +           catch(Exception ex) {
    +                   fatalError("unable to remove worker", ex);
    +                   return;
    +           }
    +
    +           // check if this is a failed task or a released task
    +           if (workersBeingReturned.remove(id) != null) {
    +                   // regular finished worker that we released
    +                   LOG.info("Worker {} finished successfully with 
diagnostics: {}",
    +                           id, status.getMessage());
    +           } else {
    +                   // failed worker, either at startup, or running
    +                   final MesosWorkerStore.Worker launched = 
workersInLaunch.remove(id);
    +                   if (launched != null) {
    +                           LOG.info("Mesos task {} failed, with a 
TaskManager in launch or registration. " +
    +                                   "State: {} Reason: {} ({})", id, 
status.getState(), status.getReason(), status.getMessage());
    +                           // we will trigger re-acquiring new workers at 
the end
    +                   } else {
    +                           // failed registered worker
    +                           LOG.info("Mesos task {} failed, with a 
registered TaskManager. " +
    +                                   "State: {} Reason: {} ({})", id, 
status.getState(), status.getReason(), status.getMessage());
    +
    +                           // notify the generic logic, which notifies the 
JobManager, etc.
    +                           notifyWorkerFailed(id, "Mesos task " + id + " 
failed.  State: " + status.getState());
    +                   }
    +
    +                   // general failure logging
    +                   failedTasksSoFar++;
    +
    +                   String diagMessage = String.format("Diagnostics for 
task %s in state %s : " +
    +                                   "reason=%s message=%s",
    +                           id, status.getState(), status.getReason(), 
status.getMessage());
    +                   sendInfoMessage(diagMessage);
    +
    +                   LOG.info(diagMessage);
    +                   LOG.info("Total number of failed tasks so far: " + 
failedTasksSoFar);
    +
    +                   // maxFailedTasks == -1 is infinite number of retries.
    +                   if (maxFailedTasks >= 0 && failedTasksSoFar > 
maxFailedTasks) {
    +                           String msg = "Stopping Mesos session because 
the number of failed tasks ("
    +                                   + failedTasksSoFar + ") exceeded the 
maximum failed tasks ("
    +                                   + maxFailedTasks + "). This number is 
controlled by the '"
    +                                   + 
ConfigConstants.MESOS_MAX_FAILED_TASKS + "' configuration setting. "
    +                                   + "By default its the number of 
requested tasks.";
    +
    +                           LOG.error(msg);
    +                           self().tell(decorateMessage(new 
StopCluster(ApplicationStatus.FAILED, msg)),
    +                                   ActorRef.noSender());
    +
    +                           // no need to do anything else
    +                           return;
    +                   }
    +           }
    +
    +           // in case failed containers were among the finished 
containers, make
    +           // sure we re-examine and request new ones
    +           triggerCheckWorkers();
    +   }
    +
    +   // 
------------------------------------------------------------------------
    +   //  Utilities
    +   // 
------------------------------------------------------------------------
    +
    +   private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID 
taskID) {
    +           LaunchableMesosWorker launchable =
    +                   new LaunchableMesosWorker(taskManagerParameters, 
taskManagerLaunchContext, taskID);
    +           return launchable;
    --- End diff --
    
    Could be just `return new LaunchableMesosWorker(taskManagerParameters, 
taskManagerLaunchContext, taskID);`.


> Integrate Flink with Apache Mesos
> ---------------------------------
>
>                 Key: FLINK-1984
>                 URL: https://issues.apache.org/jira/browse/FLINK-1984
>             Project: Flink
>          Issue Type: New Feature
>          Components: Cluster Management
>            Reporter: Robert Metzger
>            Assignee: Eron Wright 
>            Priority: Minor
>         Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-: 
> https://github.com/apache/flink/pull/251
> Update (May '16):  a new effort is now underway, building on the recent 
> ResourceManager work.
> Design document:  ([google 
> doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to