Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2315#discussion_r74971461
--- Diff:
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
---
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxy;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import
org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Resource Manager for Apache Mesos.
+ */
/**
 * Flink Resource Manager for Apache Mesos.
 */
public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMesosWorkerNode> {

	/** The Mesos configuration (master and framework info) */
	private final MesosConfiguration mesosConfig;

	/** The TaskManager container parameters (like container memory size) */
	private final MesosTaskManagerParameters taskManagerParameters;

	/** Context information used to start a TaskManager Java process */
	private final Protos.TaskInfo.Builder taskManagerLaunchContext;

	/** Number of failed Mesos tasks before stopping the application. -1 means infinite. */
	private final int maxFailedTasks;

	/** Callback handler for the asynchronous Mesos scheduler */
	private SchedulerProxy schedulerCallbackHandler;

	/** Mesos scheduler driver */
	private SchedulerDriver schedulerDriver;

	/** Actor that monitors the connection to the Mesos master (created in initialize()) */
	private ActorRef connectionMonitor;

	/** Router actor for per-task monitor actors (created in initialize()) */
	private ActorRef taskRouter;

	/** Actor that matches resource offers to tasks to be launched (created in initialize()) */
	private ActorRef launchCoordinator;

	/** Actor that reconciles task state with the Mesos master (created in initialize()) */
	private ActorRef reconciliationCoordinator;

	/** Persistent store for framework ID and worker state, used for failover recovery */
	private MesosWorkerStore workerStore;

	// Worker bookkeeping, keyed by the ResourceID derived from the Mesos task ID.
	// A worker moves New -> Launched -> Released; it lives in exactly one map at a time.

	/** Workers persisted but not yet launched on a Mesos slave */
	final Map<ResourceID, MesosWorkerStore.Worker> workersInNew;
	/** Workers launched on a slave but whose TaskManager has not yet registered */
	final Map<ResourceID, MesosWorkerStore.Worker> workersInLaunch;
	/** Workers that were released and are awaiting terminal status from Mesos */
	final Map<ResourceID, MesosWorkerStore.Worker> workersBeingReturned;

	/** The number of failed tasks since the master became active */
	private int failedTasksSoFar;
+
	/**
	 * Creates a Mesos Flink resource manager.
	 *
	 * @param flinkConfig the Flink configuration, passed to the generic resource manager
	 * @param mesosConfig the Mesos configuration (master URL and framework info); non-null
	 * @param workerStore persistent store for framework/worker state, used for recovery; non-null
	 * @param leaderRetrievalService service to find the leading JobManager
	 * @param taskManagerParameters container parameters for TaskManagers; non-null
	 * @param taskManagerLaunchContext template for the TaskManager Mesos task; non-null
	 * @param maxFailedTasks number of failed tasks tolerated before stopping; -1 means infinite
	 * @param numInitialTaskManagers number of TaskManagers to request at startup
	 */
	public MesosFlinkResourceManager(
		Configuration flinkConfig,
		MesosConfiguration mesosConfig,
		MesosWorkerStore workerStore,
		LeaderRetrievalService leaderRetrievalService,
		MesosTaskManagerParameters taskManagerParameters,
		Protos.TaskInfo.Builder taskManagerLaunchContext,
		int maxFailedTasks,
		int numInitialTaskManagers) {

		super(numInitialTaskManagers, flinkConfig, leaderRetrievalService);

		this.mesosConfig = requireNonNull(mesosConfig);

		this.workerStore = requireNonNull(workerStore);

		this.taskManagerParameters = requireNonNull(taskManagerParameters);
		this.taskManagerLaunchContext = requireNonNull(taskManagerLaunchContext);
		this.maxFailedTasks = maxFailedTasks;

		this.workersInNew = new HashMap<>();
		this.workersInLaunch = new HashMap<>();
		this.workersBeingReturned = new HashMap<>();
	}
+
+ //
------------------------------------------------------------------------
+ // Mesos-specific behavior
+ //
------------------------------------------------------------------------
+
	@Override
	protected void initialize() throws Exception {
		// NOTE: initialization order matters here — the worker store must be started before
		// the framework ID is read, the driver must exist before the supporting actors that
		// use it are created, and workers are recovered before the driver is started.
		LOG.info("Initializing Mesos resource master");

		workerStore.start();

		// create the scheduler driver to communicate with Mesos
		schedulerCallbackHandler = new SchedulerProxy(self());

		// register with Mesos
		FrameworkInfo.Builder frameworkInfo = mesosConfig.frameworkInfo()
			.clone()
			.setCheckpoint(true);

		// reuse a previously assigned framework ID (if any) so Mesos treats this as a
		// failover of the same framework rather than a brand-new registration
		Option<Protos.FrameworkID> frameworkID = workerStore.getFrameworkID();
		if(frameworkID.isEmpty()) {
			LOG.info("Registering as new framework.");
		}
		else {
			LOG.info("Recovery scenario: re-registering using framework ID {}.", frameworkID.get().getValue());
			frameworkInfo.setId(frameworkID.get());
		}

		MesosConfiguration initializedMesosConfig = mesosConfig.withFrameworkInfo(frameworkInfo);
		MesosConfiguration.logMesosConfig(LOG, initializedMesosConfig);
		// second argument presumably disables implicit status-update acknowledgements
		// (acks are sent explicitly in taskStatusUpdated) — TODO confirm against createDriver
		schedulerDriver = initializedMesosConfig.createDriver(schedulerCallbackHandler, false);

		// create supporting actors
		connectionMonitor = createConnectionMonitor();
		launchCoordinator = createLaunchCoordinator();
		reconciliationCoordinator = createReconciliationCoordinator();
		taskRouter = createTaskRouter();

		recoverWorkers();

		connectionMonitor.tell(new ConnectionMonitor.Start(), self());
		schedulerDriver.start();
	}
+
+ protected ActorRef createConnectionMonitor() {
+ return context().actorOf(
+
ConnectionMonitor.createActorProps(ConnectionMonitor.class, config),
+ "connectionMonitor");
+ }
+
+ protected ActorRef createTaskRouter() {
+ return context().actorOf(
+ Tasks.createActorProps(Tasks.class, config,
schedulerDriver, TaskMonitor.class),
+ "tasks");
+ }
+
+ protected ActorRef createLaunchCoordinator() {
+ return context().actorOf(
+
LaunchCoordinator.createActorProps(LaunchCoordinator.class, self(), config,
schedulerDriver, createOptimizer()),
+ "launchCoordinator");
+ }
+
+ protected ActorRef createReconciliationCoordinator() {
+ return context().actorOf(
+
ReconciliationCoordinator.createActorProps(ReconciliationCoordinator.class,
config, schedulerDriver),
+ "reconciliationCoordinator");
+ }
+
	@Override
	public void postStop() {
		// child actors (connection monitor, task router, etc.) are stopped implicitly
		// when this actor's context is torn down
		LOG.info("Stopping Mesos resource master");
		super.postStop();
	}
+
+ //
------------------------------------------------------------------------
+ // Actor messages
+ //
------------------------------------------------------------------------
+
	/**
	 * Dispatches Mesos-specific actor messages; everything unrecognized is delegated
	 * to the generic {@link FlinkResourceManager} message handling.
	 */
	@Override
	protected void handleMessage(Object message) {

		// check for Mesos-specific actor messages first

		// --- messages about Mesos connection
		if (message instanceof Registered) {
			registered((Registered) message);
		} else if (message instanceof ReRegistered) {
			reregistered((ReRegistered) message);
		} else if (message instanceof Disconnected) {
			disconnected((Disconnected) message);
		} else if (message instanceof Error) {
			error(((Error) message).message());

		// --- messages about offers
		} else if (message instanceof ResourceOffers || message instanceof OfferRescinded) {
			// offers are handled entirely by the launch coordinator
			launchCoordinator.tell(message, self());
		} else if (message instanceof AcceptOffers) {
			// acceptance is routed through this actor to persist state before telling Mesos
			acceptOffers((AcceptOffers) message);

		// --- messages about tasks
		} else if (message instanceof StatusUpdate) {
			taskStatusUpdated((StatusUpdate) message);
		} else if (message instanceof ReconciliationCoordinator.Reconcile) {
			// a reconciliation request from a task
			reconciliationCoordinator.tell(message, self());
		} else if (message instanceof TaskMonitor.TaskTerminated) {
			// a termination message from a task
			TaskMonitor.TaskTerminated msg = (TaskMonitor.TaskTerminated) message;
			taskTerminated(msg.taskID(), msg.status());

		} else {
			// message handled by the generic resource master code
			super.handleMessage(message);
		}
	}
+
+ /**
+ * Called to shut down the cluster (not a failover situation).
+ *
+ * @param finalStatus The application status to report.
+ * @param optionalDiagnostics An optional diagnostics message.
+ */
+ @Override
+ protected void shutdownApplication(ApplicationStatus finalStatus,
String optionalDiagnostics) {
+
+ LOG.info("Shutting down and unregistering as a Mesos
framework.");
+ try {
+ // unregister the framework, which implicitly removes
all tasks.
+ schedulerDriver.stop(false);
+ }
+ catch(Exception ex) {
+ LOG.warn("unable to unregister the framework", ex);
+ }
+
+ try {
+ workerStore.cleanup();
+ }
+ catch(Exception ex) {
+ LOG.warn("unable to cleanup the ZooKeeper state", ex);
+ }
+
+ context().stop(self());
+ }
+
	/**
	 * Handles an unrecoverable error by terminating the JVM; an external supervisor
	 * is expected to restart the process, which then re-registers with Mesos.
	 */
	@Override
	protected void fatalError(String message, Throwable error) {
		// we do not unregister, but cause a hard fail of this process, to have it
		// restarted by the dispatcher
		LOG.error("FATAL ERROR IN MESOS APPLICATION MASTER: " + message, error);
		LOG.error("Shutting down process");

		// kill this process, this will make an external supervisor (the dispatcher) restart the process
		System.exit(EXIT_CODE_FATAL_ERROR);
	}
+
+ //
------------------------------------------------------------------------
+ // Worker Management
+ //
------------------------------------------------------------------------
+
+ /**
+ * Recover framework/worker information persisted by a prior
incarnation of the RM.
+ */
+ private void recoverWorkers() throws Exception {
+ // if this application master starts as part of an
ApplicationMaster/JobManager recovery,
+ // then some worker tasks are most likely still alive and we
can re-obtain them
+ final List<MesosWorkerStore.Worker> tasksFromPreviousAttempts =
workerStore.recoverWorkers();
+
+ if (!tasksFromPreviousAttempts.isEmpty()) {
+ LOG.info("Retrieved {} TaskManagers from previous
attempt", tasksFromPreviousAttempts.size());
+
+ List<Tuple2<TaskRequest,String>> toAssign = new
ArrayList<>(tasksFromPreviousAttempts.size());
+ List<LaunchableTask> toLaunch = new
ArrayList<>(tasksFromPreviousAttempts.size());
+
+ for (final MesosWorkerStore.Worker worker :
tasksFromPreviousAttempts) {
+ LaunchableMesosWorker launchable =
createLaunchableMesosWorker(worker.taskID());
+
+ switch(worker.state()) {
+ case New:
+
workersInNew.put(extractResourceID(worker.taskID()), worker);
+ toLaunch.add(launchable);
+ break;
+ case Launched:
+
workersInLaunch.put(extractResourceID(worker.taskID()), worker);
+ toAssign.add(new
Tuple2<>(launchable.taskRequest(), worker.hostname().get()));
+ break;
+ case Released:
+
workersBeingReturned.put(extractResourceID(worker.taskID()), worker);
+ break;
+ }
+ taskRouter.tell(new
TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), self());
+ }
+
+ // tell the launch coordinator about prior assignments
+ if(toAssign.size() >= 1) {
+ launchCoordinator.tell(new
LaunchCoordinator.Assign(toAssign), self());
+ }
+ // tell the launch coordinator to launch any new tasks
+ if(toLaunch.size() >= 1) {
+ launchCoordinator.tell(new
LaunchCoordinator.Launch(toLaunch), self());
+ }
+ }
+ }
+
+ /**
+ * Plan for some additional workers to be launched.
+ *
+ * @param numWorkers The number of workers to allocate.
+ */
+ @Override
+ protected void requestNewWorkers(int numWorkers) {
+
+ try {
+ List<TaskMonitor.TaskGoalStateUpdated> toMonitor = new
ArrayList<>(numWorkers);
+ List<LaunchableTask> toLaunch = new
ArrayList<>(numWorkers);
+
+ // generate new workers into persistent state and
launch associated actors
+ for (int i = 0; i < numWorkers; i++) {
+ MesosWorkerStore.Worker worker =
MesosWorkerStore.Worker.newTask(workerStore.newTaskID());
+ workerStore.putWorker(worker);
+
workersInNew.put(extractResourceID(worker.taskID()), worker);
+
+ LaunchableMesosWorker launchable =
createLaunchableMesosWorker(worker.taskID());
+
+ LOG.info("Scheduling Mesos task {} with ({} MB,
{} cpus).",
+ launchable.taskID().getValue(),
launchable.taskRequest().getMemory(), launchable.taskRequest().getCPUs());
+
+ toMonitor.add(new
TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)));
+ toLaunch.add(launchable);
+ }
+
+ // tell the task router about the new plans
+ for (TaskMonitor.TaskGoalStateUpdated update :
toMonitor) {
+ taskRouter.tell(update, self());
+ }
+
+ // tell the launch coordinator to launch the new tasks
+ if(toLaunch.size() >= 1) {
+ launchCoordinator.tell(new
LaunchCoordinator.Launch(toLaunch), self());
+ }
+ }
+ catch(Exception ex) {
+ fatalError("unable to request new workers", ex);
+ }
+ }
+
+ /**
+ * Accept offers as advised by the launch coordinator.
+ *
+ * Acceptance is routed through the RM to update the persistent state
before
+ * forwarding the message to Mesos.
+ */
+ private void acceptOffers(AcceptOffers msg) {
+
+ try {
+ List<TaskMonitor.TaskGoalStateUpdated> toMonitor = new
ArrayList<>(msg.operations().size());
+
+ // transition the persistent state of some tasks to
Launched
+ for (Protos.Offer.Operation op : msg.operations()) {
+ if (op.getType() !=
Protos.Offer.Operation.Type.LAUNCH) {
+ continue;
+ }
+ for (Protos.TaskInfo info :
op.getLaunch().getTaskInfosList()) {
+ MesosWorkerStore.Worker worker =
workersInNew.remove(extractResourceID(info.getTaskId()));
+ assert (worker != null);
+
+ worker =
worker.launchTask(info.getSlaveId(), msg.hostname());
+ workerStore.putWorker(worker);
+
workersInLaunch.put(extractResourceID(worker.taskID()), worker);
+
+ LOG.info("Launching Mesos task {} on
host {}.",
+ worker.taskID().getValue(),
worker.hostname().get());
+
+ toMonitor.add(new
TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)));
+ }
+ }
+
+ // tell the task router about the new plans
+ for (TaskMonitor.TaskGoalStateUpdated update :
toMonitor) {
+ taskRouter.tell(update, self());
+ }
+
+ // send the acceptance message to Mesos
+ schedulerDriver.acceptOffers(msg.offerIds(),
msg.operations(), msg.filters());
+ }
+ catch(Exception ex) {
+ fatalError("unable to accept offers", ex);
+ }
+ }
+
	/**
	 * Handle a task status change.
	 *
	 * <p>The update is forwarded to the task router and the reconciliation coordinator
	 * before it is acknowledged to Mesos via the scheduler driver.
	 */
	private void taskStatusUpdated(StatusUpdate message) {
		taskRouter.tell(message, self());
		reconciliationCoordinator.tell(message, self());
		// explicit acknowledgement — presumably the driver was created with implicit
		// acknowledgements disabled (see initialize()); TODO confirm
		schedulerDriver.acknowledgeStatusUpdate(message.status());
	}
+
+ /**
+ * Accept the given started worker into the internal state.
+ *
+ * @param resourceID The worker resource id
+ * @return A registered worker node record.
+ */
+ @Override
+ protected RegisteredMesosWorkerNode workerStarted(ResourceID
resourceID) {
+ MesosWorkerStore.Worker inLaunch =
workersInLaunch.remove(resourceID);
+ if (inLaunch == null) {
+ // Worker was not in state "being launched", this can
indicate that the TaskManager
+ // in this worker was already registered or that the
container was not started
+ // by this resource manager. Simply ignore this
resourceID.
+ return null;
+ }
+ return new RegisteredMesosWorkerNode(inLaunch);
+ }
+
+ /**
+ * Accept the given registered workers into the internal state.
+ *
+ * @param toConsolidate The worker IDs known previously to the
JobManager.
+ * @return A collection of registered worker node records.
+ */
+ @Override
+ protected Collection<RegisteredMesosWorkerNode>
reacceptRegisteredWorkers(Collection<ResourceID> toConsolidate) {
+
+ // we check for each task manager if we recognize its Mesos
task ID
+ List<RegisteredMesosWorkerNode> accepted = new
ArrayList<>(toConsolidate.size());
+ for (ResourceID resourceID : toConsolidate) {
+ MesosWorkerStore.Worker worker =
workersInLaunch.remove(resourceID);
+ if (worker != null) {
+ LOG.info("Mesos worker consolidation recognizes
TaskManager {}.", resourceID);
+ accepted.add(new
RegisteredMesosWorkerNode(worker));
+ }
+ else {
+ if(isStarted(resourceID)) {
+ LOG.info("TaskManager {} has already
been registered at the resource manager.", resourceID);
+ }
+ else {
+ LOG.info("Mesos worker consolidation
does not recognize TaskManager {}.", resourceID);
+ }
+ }
+ }
+ return accepted;
+ }
+
+ /**
+ * Release the given pending worker.
+ */
+ @Override
+ protected void releasePendingWorker(ResourceID id) {
+ MesosWorkerStore.Worker worker = workersInLaunch.remove(id);
+ if (worker != null) {
+ releaseWorker(worker);
+ } else {
+ LOG.error("Cannot find worker {} to release. Ignoring
request.", id);
+ }
+ }
+
+ /**
+ * Release the given started worker.
+ */
+ @Override
+ protected void releaseStartedWorker(RegisteredMesosWorkerNode worker) {
+ releaseWorker(worker.task());
+ }
+
	/**
	 * Plan for the removal of the given worker.
	 *
	 * <p>The worker's persistent state is moved to Released before any actor is told,
	 * so a failover observes a consistent plan. Failures are fatal because the
	 * persistent state may otherwise disagree with the in-memory bookkeeping.
	 */
	private void releaseWorker(MesosWorkerStore.Worker worker) {
		try {
			LOG.info("Releasing worker {}", worker.taskID());

			// update persistent state of worker to Released
			worker = worker.releaseTask();
			workerStore.putWorker(worker);
			workersBeingReturned.put(extractResourceID(worker.taskID()), worker);
			taskRouter.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), self());

			if (worker.hostname().isDefined()) {
				// tell the launch coordinator that the task is being unassigned from the host, for planning purposes
				launchCoordinator.tell(new LaunchCoordinator.Unassign(worker.taskID(), worker.hostname().get()), self());
			}
		}
		catch (Exception ex) {
			fatalError("unable to release worker", ex);
		}
	}
+
+ @Override
+ protected int getNumWorkerRequestsPending() {
+ return workersInNew.size();
+ }
+
+ @Override
+ protected int getNumWorkersPendingRegistration() {
+ return workersInLaunch.size();
+ }
+
+ //
------------------------------------------------------------------------
+ // Callbacks from the Mesos Master
+ //
------------------------------------------------------------------------
+
+ /**
+ * Called when connected to Mesos as a new framework.
+ */
+ private void registered(Registered message) {
+ connectionMonitor.tell(message, self());
+
+ try {
+
workerStore.setFrameworkID(Option.apply(message.frameworkId()));
+ }
+ catch(Exception ex) {
+ fatalError("unable to store the assigned framework ID",
ex);
+ return;
+ }
+
+ launchCoordinator.tell(message, self());
+ reconciliationCoordinator.tell(message, self());
+ taskRouter.tell(message, self());
+ }
+
+ /**
+ * Called when reconnected to Mesos following a failover event.
+ */
+ private void reregistered(ReRegistered message) {
+ connectionMonitor.tell(message, self());
+ launchCoordinator.tell(message, self());
+ reconciliationCoordinator.tell(message, self());
+ taskRouter.tell(message, self());
+ }
+
+ /**
+ * Called when disconnected from Mesos.
+ */
+ private void disconnected(Disconnected message) {
+ connectionMonitor.tell(message, self());
+ launchCoordinator.tell(message, self());
+ reconciliationCoordinator.tell(message, self());
+ taskRouter.tell(message, self());
+ }
+
+ /**
+ * Called when an error is reported by the scheduler callback.
+ */
+ private void error(String message) {
+ self().tell(new FatalErrorOccurred("Connection to Mesos
failed", new Exception(message)), self());
+ }
+
+ /**
+ * Invoked when a Mesos task reaches a terminal status.
+ */
+ private void taskTerminated(Protos.TaskID taskID, Protos.TaskStatus
status) {
+ // this callback occurs for failed containers and for released
containers alike
+
+ final ResourceID id = extractResourceID(taskID);
+
+ try {
+ workerStore.removeWorker(taskID);
+ }
+ catch(Exception ex) {
+ fatalError("unable to remove worker", ex);
+ return;
+ }
+
+ // check if this is a failed task or a released task
+ if (workersBeingReturned.remove(id) != null) {
+ // regular finished worker that we released
+ LOG.info("Worker {} finished successfully with
diagnostics: {}",
+ id, status.getMessage());
+ } else {
+ // failed worker, either at startup, or running
+ final MesosWorkerStore.Worker launched =
workersInLaunch.remove(id);
+ if (launched != null) {
+ LOG.info("Mesos task {} failed, with a
TaskManager in launch or registration. " +
+ "State: {} Reason: {} ({})", id,
status.getState(), status.getReason(), status.getMessage());
+ // we will trigger re-acquiring new workers at
the end
+ } else {
+ // failed registered worker
+ LOG.info("Mesos task {} failed, with a
registered TaskManager. " +
+ "State: {} Reason: {} ({})", id,
status.getState(), status.getReason(), status.getMessage());
+
+ // notify the generic logic, which notifies the
JobManager, etc.
+ notifyWorkerFailed(id, "Mesos task " + id + "
failed. State: " + status.getState());
+ }
+
+ // general failure logging
+ failedTasksSoFar++;
+
+ String diagMessage = String.format("Diagnostics for
task %s in state %s : " +
+ "reason=%s message=%s",
+ id, status.getState(), status.getReason(),
status.getMessage());
+ sendInfoMessage(diagMessage);
+
+ LOG.info(diagMessage);
+ LOG.info("Total number of failed tasks so far: " +
failedTasksSoFar);
--- End diff --
Use the SLF4J `{}` placeholder instead of string concatenation here, e.g. `LOG.info("Total number of failed tasks so far: {}", failedTasksSoFar);`, so the message is only formatted when the log level is enabled.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---