Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2315#discussion_r75080549
  
    --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java ---
    @@ -0,0 +1,755 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.mesos.runtime.clusterframework;
    +
    +import akka.actor.ActorRef;
    +import akka.actor.Props;
    +import com.netflix.fenzo.TaskRequest;
    +import com.netflix.fenzo.TaskScheduler;
    +import com.netflix.fenzo.VirtualMachineLease;
    +import com.netflix.fenzo.functions.Action1;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
    +import org.apache.flink.mesos.scheduler.ConnectionMonitor;
    +import org.apache.flink.mesos.scheduler.LaunchableTask;
    +import org.apache.flink.mesos.scheduler.LaunchCoordinator;
    +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
    +import org.apache.flink.mesos.scheduler.SchedulerProxy;
    +import org.apache.flink.mesos.scheduler.TaskMonitor;
    +import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
    +import org.apache.flink.mesos.scheduler.Tasks;
    +import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
    +import org.apache.flink.mesos.scheduler.messages.Disconnected;
    +import org.apache.flink.mesos.scheduler.messages.Error;
    +import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
    +import org.apache.flink.mesos.scheduler.messages.ReRegistered;
    +import org.apache.flink.mesos.scheduler.messages.Registered;
    +import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
    +import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
    +import org.apache.flink.mesos.util.MesosConfiguration;
    +import org.apache.flink.runtime.clusterframework.ApplicationStatus;
    +import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
    +import 
org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
    +import org.apache.flink.runtime.clusterframework.messages.StopCluster;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
    +import org.apache.mesos.Protos;
    +import org.apache.mesos.Protos.FrameworkInfo;
    +import org.apache.mesos.SchedulerDriver;
    +import org.slf4j.Logger;
    +import scala.Option;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +/**
    + * Flink Resource Manager for Apache Mesos.
    + */
    +public class MesosFlinkResourceManager extends 
FlinkResourceManager<RegisteredMesosWorkerNode> {
    +
    +   /** The Mesos configuration (master and framework info) */
    +   private final MesosConfiguration mesosConfig;
    +
    +   /** The TaskManager container parameters (like container memory size) */
    +   private final MesosTaskManagerParameters taskManagerParameters;
    +
    +   /** Context information used to start a TaskManager Java process */
    +   private final Protos.TaskInfo.Builder taskManagerLaunchContext;
    +
    +   /** Number of failed Mesos tasks before stopping the application. -1 
means infinite. */
    +   private final int maxFailedTasks;
    +
    +   /** Callback handler for the asynchronous Mesos scheduler */
    +   private SchedulerProxy schedulerCallbackHandler;
    +
    +   /** Mesos scheduler driver */
    +   private SchedulerDriver schedulerDriver;
    +
    +   private ActorRef connectionMonitor;
    +
    +   private ActorRef taskRouter;
    +
    +   private ActorRef launchCoordinator;
    +
    +   private ActorRef reconciliationCoordinator;
    +
    +   private MesosWorkerStore workerStore;
    +
    +   final Map<ResourceID, MesosWorkerStore.Worker> workersInNew;
    +   final Map<ResourceID, MesosWorkerStore.Worker> workersInLaunch;
    +   final Map<ResourceID, MesosWorkerStore.Worker> workersBeingReturned;
    +
    +   /** The number of failed tasks since the master became active */
    +   private int failedTasksSoFar;
    +
    +   /**
    +    * Creates the Mesos resource manager.
    +    *
    +    * @param flinkConfig The Flink configuration, passed on to the base resource manager.
    +    * @param mesosConfig The Mesos configuration (master and framework info); must not be null.
    +    * @param workerStore The store holding the persisted worker state; must not be null.
    +    * @param leaderRetrievalService The leader retrieval service, passed on to the base resource manager.
    +    * @param taskManagerParameters The TaskManager container parameters; must not be null.
    +    * @param taskManagerLaunchContext Context information used to start a TaskManager; must not be null.
    +    * @param maxFailedTasks Number of failed Mesos tasks before stopping the application. -1 means infinite.
    +    * @param numInitialTaskManagers The initial number of TaskManagers, passed on to the base resource manager.
    +    * @throws NullPointerException If one of the required arguments is null; the message names the argument.
    +    */
    +   public MesosFlinkResourceManager(
    +           Configuration flinkConfig,
    +           MesosConfiguration mesosConfig,
    +           MesosWorkerStore workerStore,
    +           LeaderRetrievalService leaderRetrievalService,
    +           MesosTaskManagerParameters taskManagerParameters,
    +           Protos.TaskInfo.Builder taskManagerLaunchContext,
    +           int maxFailedTasks,
    +           int numInitialTaskManagers) {
    +
    +           super(numInitialTaskManagers, flinkConfig, leaderRetrievalService);
    +
    +           // name the offending argument so that a failed check is self-explanatory
    +           this.mesosConfig = requireNonNull(mesosConfig, "mesosConfig");
    +
    +           this.workerStore = requireNonNull(workerStore, "workerStore");
    +
    +           this.taskManagerParameters = requireNonNull(taskManagerParameters, "taskManagerParameters");
    +           this.taskManagerLaunchContext = requireNonNull(taskManagerLaunchContext, "taskManagerLaunchContext");
    +           this.maxFailedTasks = maxFailedTasks;
    +
    +           this.workersInNew = new HashMap<>();
    +           this.workersInLaunch = new HashMap<>();
    +           this.workersBeingReturned = new HashMap<>();
    +   }
    +
    +   // 
------------------------------------------------------------------------
    +   //  Mesos-specific behavior
    +   // 
------------------------------------------------------------------------
    +
    +   /**
    +    * Brings up the Mesos-specific parts of the resource manager: starts the worker store,
    +    * creates the scheduler driver and the supporting actors, recovers the persisted workers,
    +    * and finally starts the connection monitor and the scheduler driver.
    +    */
    +   @Override
    +   protected void initialize() throws Exception {
    +           LOG.info("Initializing Mesos resource master");
    +
    +           workerStore.start();
    +
    +           // callback handler that is handed to the scheduler driver further down
    +           schedulerCallbackHandler = new SchedulerProxy(self());
    +
    +           // framework info used for the registration with Mesos
    +           FrameworkInfo.Builder frameworkInfo = mesosConfig.frameworkInfo()
    +                   .clone()
    +                   .setCheckpoint(true);
    +
    +           // a framework ID in the worker store is treated as a recovery scenario
    +           Option<Protos.FrameworkID> storedFrameworkId = workerStore.getFrameworkID();
    +           if (!storedFrameworkId.isEmpty()) {
    +                   Protos.FrameworkID previousId = storedFrameworkId.get();
    +                   LOG.info("Recovery scenario: re-registering using framework ID {}.", previousId.getValue());
    +                   frameworkInfo.setId(previousId);
    +           } else {
    +                   LOG.info("Registering as new framework.");
    +           }
    +
    +           MesosConfiguration effectiveMesosConfig = mesosConfig.withFrameworkInfo(frameworkInfo);
    +           MesosConfiguration.logMesosConfig(LOG, effectiveMesosConfig);
    +           schedulerDriver = effectiveMesosConfig.createDriver(schedulerCallbackHandler, false);
    +
    +           // supporting actors; they are created before the workers are recovered
    +           connectionMonitor = createConnectionMonitor();
    +           launchCoordinator = createLaunchCoordinator();
    +           reconciliationCoordinator = createReconciliationCoordinator();
    +           taskRouter = createTaskRouter();
    +
    +           recoverWorkers();
    +
    +           connectionMonitor.tell(new ConnectionMonitor.Start(), self());
    +           schedulerDriver.start();
    +   }
    +
    +   protected ActorRef createConnectionMonitor() {
    +           return context().actorOf(
    +                   
ConnectionMonitor.createActorProps(ConnectionMonitor.class, config),
    +                   "connectionMonitor");
    +   }
    +
    +   protected ActorRef createTaskRouter() {
    +           return context().actorOf(
    +                   Tasks.createActorProps(Tasks.class, config, 
schedulerDriver, TaskMonitor.class),
    +                   "tasks");
    +   }
    +
    +   protected ActorRef createLaunchCoordinator() {
    +           return context().actorOf(
    +                   
LaunchCoordinator.createActorProps(LaunchCoordinator.class, self(), config, 
schedulerDriver, createOptimizer()),
    +                   "launchCoordinator");
    +   }
    +
    +   protected ActorRef createReconciliationCoordinator() {
    +           return context().actorOf(
    +                   
ReconciliationCoordinator.createActorProps(ReconciliationCoordinator.class, 
config, schedulerDriver),
    +                   "reconciliationCoordinator");
    +   }
    +
    +   /**
    +    * Logs that the Mesos resource master is stopping and then delegates to the
    +    * superclass implementation.
    +    */
    +   @Override
    +   public void postStop() {
    +           LOG.info("Stopping Mesos resource master");
    +           super.postStop();
    +   }
    +
    +   // 
------------------------------------------------------------------------
    +   //  Actor messages
    +   // 
------------------------------------------------------------------------
    +
    +   /**
    +    * Dispatches an incoming actor message. Mesos-specific messages are handled here or
    +    * forwarded to the responsible supporting actor; every other message is passed on to
    +    * the generic resource master code of the superclass.
    +    *
    +    * @param message The message to handle.
    +    */
    +   @Override
    +   protected void handleMessage(Object message) {
    +
    +           // --- messages about Mesos connection
    +           if (message instanceof Registered) {
    +                   registered((Registered) message);
    +                   return;
    +           }
    +           if (message instanceof ReRegistered) {
    +                   reregistered((ReRegistered) message);
    +                   return;
    +           }
    +           if (message instanceof Disconnected) {
    +                   disconnected((Disconnected) message);
    +                   return;
    +           }
    +           if (message instanceof Error) {
    +                   error(((Error) message).message());
    +                   return;
    +           }
    +
    +           // --- messages about offers
    +           if (message instanceof ResourceOffers || message instanceof OfferRescinded) {
    +                   launchCoordinator.tell(message, self());
    +                   return;
    +           }
    +           if (message instanceof AcceptOffers) {
    +                   acceptOffers((AcceptOffers) message);
    +                   return;
    +           }
    +
    +           // --- messages about tasks
    +           if (message instanceof StatusUpdate) {
    +                   taskStatusUpdated((StatusUpdate) message);
    +                   return;
    +           }
    +           if (message instanceof ReconciliationCoordinator.Reconcile) {
    +                   // a reconciliation request from a task
    +                   reconciliationCoordinator.tell(message, self());
    +                   return;
    +           }
    +           if (message instanceof TaskMonitor.TaskTerminated) {
    +                   // a termination message from a task
    +                   TaskMonitor.TaskTerminated terminated = (TaskMonitor.TaskTerminated) message;
    +                   taskTerminated(terminated.taskID(), terminated.status());
    +                   return;
    +           }
    +
    +           // not a Mesos-specific message: handled by the generic resource master code
    +           super.handleMessage(message);
    +   }
    +
    +   /**
    +    * Shuts the cluster down for good (this is not a failover situation): stops the
    +    * scheduler driver, cleans up the worker store and stops this actor. A failure in
    +    * either of the first two steps is logged as a warning and does not prevent the
    +    * remaining steps.
    +    *
    +    * @param finalStatus The application status to report.
    +    * @param optionalDiagnostics An optional diagnostics message.
    +    */
    +   @Override
    +   protected void shutdownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
    +
    +           LOG.info("Shutting down and unregistering as a Mesos framework.");
    +
    +           // unregister the framework, which implicitly removes all tasks.
    +           try {
    +                   schedulerDriver.stop(false);
    +           } catch (Exception unregisterFailure) {
    +                   LOG.warn("unable to unregister the framework", unregisterFailure);
    +           }
    +
    +           // remove the state kept in the worker store
    +           try {
    +                   workerStore.cleanup();
    +           } catch (Exception cleanupFailure) {
    +                   LOG.warn("unable to cleanup the ZooKeeper state", cleanupFailure);
    +           }
    +
    +           context().stop(self());
    +   }
    +
    +   @Override
    +   protected void fatalError(String message, Throwable error) {
    +           // we do not unregister, but cause a hard fail of this process, 
to have it
    +           // restarted by the dispatcher
    +           LOG.error("FATAL ERROR IN MESOS APPLICATION MASTER: " + 
message, error);
    +           LOG.error("Shutting down process");
    +
    +           // kill this process, this will make an external supervisor 
(the dispatcher) restart the process
    +           System.exit(EXIT_CODE_FATAL_ERROR);
    +   }
    +
    +   // 
------------------------------------------------------------------------
    +   //  Worker Management
    +   // 
------------------------------------------------------------------------
    +
    +   /**
    +    * Recovers the worker information persisted by a prior incarnation of the RM.
    +    *
    +    * <p>Each recovered worker is filed into the map matching its state and its goal state is
    +    * announced to the task router. Afterwards the launch coordinator is told about the prior
    +    * assignments (workers in state Launched) and about the tasks that still have to be
    +    * launched (workers in state New).
    +    */
    +   private void recoverWorkers() throws Exception {
    +           // when starting as part of an ApplicationMaster/JobManager recovery, some worker
    +           // tasks are most likely still alive and can be re-obtained
    +           final List<MesosWorkerStore.Worker> recovered = workerStore.recoverWorkers();
    +           if (recovered.isEmpty()) {
    +                   return;
    +           }
    +
    +           LOG.info("Retrieved {} TaskManagers from previous attempt", recovered.size());
    +
    +           List<Tuple2<TaskRequest,String>> priorAssignments = new ArrayList<>(recovered.size());
    +           List<LaunchableTask> pendingLaunches = new ArrayList<>(recovered.size());
    +
    +           for (final MesosWorkerStore.Worker worker : recovered) {
    +                   LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID());
    +
    +                   switch (worker.state()) {
    +                           case New:
    +                                   workersInNew.put(extractResourceID(worker.taskID()), worker);
    +                                   pendingLaunches.add(launchable);
    +                                   break;
    +                           case Launched:
    +                                   workersInLaunch.put(extractResourceID(worker.taskID()), worker);
    +                                   priorAssignments.add(new Tuple2<>(launchable.taskRequest(), worker.hostname().get()));
    +                                   break;
    +                           case Released:
    +                                   workersBeingReturned.put(extractResourceID(worker.taskID()), worker);
    +                                   break;
    +                   }
    +                   taskRouter.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), self());
    +           }
    +
    +           // tell the launch coordinator about prior assignments
    +           if (!priorAssignments.isEmpty()) {
    +                   launchCoordinator.tell(new LaunchCoordinator.Assign(priorAssignments), self());
    +           }
    +           // tell the launch coordinator to launch any new tasks
    +           if (!pendingLaunches.isEmpty()) {
    +                   launchCoordinator.tell(new LaunchCoordinator.Launch(pendingLaunches), self());
    +           }
    +   }
    +
    +   /**
    +    * Plan for some additional workers to be launched.
    +    *
    +    * @param numWorkers The number of workers to allocate.
    +     */
    +   @Override
    +   protected void requestNewWorkers(int numWorkers) {
    +
    +           try {
    +                   List<TaskMonitor.TaskGoalStateUpdated> toMonitor = new 
ArrayList<>(numWorkers);
    +                   List<LaunchableTask> toLaunch = new 
ArrayList<>(numWorkers);
    +
    +                   // generate new workers into persistent state and 
launch associated actors
    +                   for (int i = 0; i < numWorkers; i++) {
    +                           MesosWorkerStore.Worker worker = 
MesosWorkerStore.Worker.newTask(workerStore.newTaskID());
    --- End diff --
    
    But here it is not really necessary to deal with tasks, right? On the 
RM-level we could simply treat them as workers and, thus, rename the method to 
`newWorker`, for example.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to