[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422954#comment-15422954 ]
ASF GitHub Bot commented on FLINK-1984:
---------------------------------------
Github user EronWright commented on a diff in the pull request:
https://github.com/apache/flink/pull/2315#discussion_r74966906
--- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java ---
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxy;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Resource Manager for Apache Mesos.
+ */
+public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMesosWorkerNode> {
+
+ /** The Mesos configuration (master and framework info) */
+ private final MesosConfiguration mesosConfig;
+
+ /** The TaskManager container parameters (like container memory size) */
+ private final MesosTaskManagerParameters taskManagerParameters;
+
+ /** Context information used to start a TaskManager Java process */
+ private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+
+ /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */
+ private final int maxFailedTasks;
+
+ /** Callback handler for the asynchronous Mesos scheduler */
+ private SchedulerProxy schedulerCallbackHandler;
+
+ /** Mesos scheduler driver */
+ private SchedulerDriver schedulerDriver;
+
+ private ActorRef connectionMonitor;
+
+ private ActorRef taskRouter;
+
+ private ActorRef launchCoordinator;
+
+ private ActorRef reconciliationCoordinator;
+
+ private MesosWorkerStore workerStore;
+
+ final Map<ResourceID, MesosWorkerStore.Worker> workersInNew;
+ final Map<ResourceID, MesosWorkerStore.Worker> workersInLaunch;
+ final Map<ResourceID, MesosWorkerStore.Worker> workersBeingReturned;
+
+ /** The number of failed tasks since the master became active */
+ private int failedTasksSoFar;
+
+ public MesosFlinkResourceManager(
+ Configuration flinkConfig,
+ MesosConfiguration mesosConfig,
+ MesosWorkerStore workerStore,
+ LeaderRetrievalService leaderRetrievalService,
+ MesosTaskManagerParameters taskManagerParameters,
+ Protos.TaskInfo.Builder taskManagerLaunchContext,
+ int maxFailedTasks,
+ int numInitialTaskManagers) {
+
+ super(numInitialTaskManagers, flinkConfig, leaderRetrievalService);
+
+ this.mesosConfig = requireNonNull(mesosConfig);
+
+ this.workerStore = requireNonNull(workerStore);
+
+ this.taskManagerParameters = requireNonNull(taskManagerParameters);
+ this.taskManagerLaunchContext = requireNonNull(taskManagerLaunchContext);
+ this.maxFailedTasks = maxFailedTasks;
+
+ this.workersInNew = new HashMap<>();
+ this.workersInLaunch = new HashMap<>();
+ this.workersBeingReturned = new HashMap<>();
+ }
+
+ // ------------------------------------------------------------------------
+ // Mesos-specific behavior
+ // ------------------------------------------------------------------------
+
+ @Override
+ protected void initialize() throws Exception {
+ LOG.info("Initializing Mesos resource master");
+
+ workerStore.start();
+
+ // create the scheduler driver to communicate with Mesos
+ schedulerCallbackHandler = new SchedulerProxy(self());
+
+ // register with Mesos
+ FrameworkInfo.Builder frameworkInfo = mesosConfig.frameworkInfo()
+ .clone()
+ .setCheckpoint(true);
+
+ Option<Protos.FrameworkID> frameworkID = workerStore.getFrameworkID();
+ if(frameworkID.isEmpty()) {
+ LOG.info("Registering as new framework.");
+ }
+ else {
+ LOG.info("Recovery scenario: re-registering using
framework ID {}.", frameworkID.get().getValue());
+ frameworkInfo.setId(frameworkID.get());
+ }
+
+ MesosConfiguration initializedMesosConfig = mesosConfig.withFrameworkInfo(frameworkInfo);
+ MesosConfiguration.logMesosConfig(LOG, initializedMesosConfig);
+ schedulerDriver = initializedMesosConfig.createDriver(schedulerCallbackHandler, false);
+
+ // create supporting actors
+ connectionMonitor = createConnectionMonitor();
+ launchCoordinator = createLaunchCoordinator();
+ reconciliationCoordinator = createReconciliationCoordinator();
+ taskRouter = createTaskRouter();
+
+ recoverWorkers();
+
+ connectionMonitor.tell(new ConnectionMonitor.Start(), self());
+ schedulerDriver.start();
+ }
+
+ protected ActorRef createConnectionMonitor() {
+ return context().actorOf(
+ ConnectionMonitor.createActorProps(ConnectionMonitor.class, config),
+ "connectionMonitor");
+ }
+
+ protected ActorRef createTaskRouter() {
+ return context().actorOf(
+ Tasks.createActorProps(Tasks.class, config, schedulerDriver, TaskMonitor.class),
+ "tasks");
+ }
+
+ protected ActorRef createLaunchCoordinator() {
+ return context().actorOf(
+ LaunchCoordinator.createActorProps(LaunchCoordinator.class, self(), config, schedulerDriver, createOptimizer()),
+ "launchCoordinator");
+ }
+
+ protected ActorRef createReconciliationCoordinator() {
+ return context().actorOf(
+ ReconciliationCoordinator.createActorProps(ReconciliationCoordinator.class, config, schedulerDriver),
+ "reconciliationCoordinator");
+ }
+
+ @Override
+ public void postStop() {
+ LOG.info("Stopping Mesos resource master");
+ super.postStop();
+ }
+
+ // ------------------------------------------------------------------------
+ // Actor messages
+ // ------------------------------------------------------------------------
+
+ @Override
+ protected void handleMessage(Object message) {
+
+ // check for Mesos-specific actor messages first
+
+ // --- messages about Mesos connection
+ if (message instanceof Registered) {
+ registered((Registered) message);
+ } else if (message instanceof ReRegistered) {
+ reregistered((ReRegistered) message);
+ } else if (message instanceof Disconnected) {
+ disconnected((Disconnected) message);
+ } else if (message instanceof Error) {
+ error(((Error) message).message());
+
+ // --- messages about offers
+ } else if (message instanceof ResourceOffers || message instanceof OfferRescinded) {
+ launchCoordinator.tell(message, self());
+ } else if (message instanceof AcceptOffers) {
+ acceptOffers((AcceptOffers) message);
+
+ // --- messages about tasks
+ } else if (message instanceof StatusUpdate) {
+ taskStatusUpdated((StatusUpdate) message);
+ } else if (message instanceof ReconciliationCoordinator.Reconcile) {
+ // a reconciliation request from a task
+ reconciliationCoordinator.tell(message, self());
+ } else if (message instanceof TaskMonitor.TaskTerminated) {
+ // a termination message from a task
+ TaskMonitor.TaskTerminated msg = (TaskMonitor.TaskTerminated) message;
+ taskTerminated(msg.taskID(), msg.status());
+
+ } else {
+ // message handled by the generic resource master code
+ super.handleMessage(message);
+ }
+ }
+
+ /**
+ * Called to shut down the cluster (not a failover situation).
+ *
+ * @param finalStatus The application status to report.
+ * @param optionalDiagnostics An optional diagnostics message.
+ */
+ @Override
+ protected void shutdownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
+
+ LOG.info("Shutting down and unregistering as a Mesos
framework.");
+ try {
+ // unregister the framework, which implicitly removes all tasks.
+ schedulerDriver.stop(false);
+ }
+ catch(Exception ex) {
+ LOG.warn("unable to unregister the framework", ex);
+ }
+
+ try {
+ workerStore.cleanup();
+ }
+ catch(Exception ex) {
+ LOG.warn("unable to cleanup the ZooKeeper state", ex);
+ }
+
+ context().stop(self());
+ }
+
+ @Override
+ protected void fatalError(String message, Throwable error) {
+ // we do not unregister, but cause a hard fail of this process, to have it
+ // restarted by the dispatcher
+ LOG.error("FATAL ERROR IN MESOS APPLICATION MASTER: " +
message, error);
+ LOG.error("Shutting down process");
+
+ // kill this process, this will make an external supervisor (the dispatcher) restart the process
+ System.exit(EXIT_CODE_FATAL_ERROR);
+ }
+
+ // ------------------------------------------------------------------------
+ // Worker Management
+ // ------------------------------------------------------------------------
+
+ /**
+ * Recover framework/worker information persisted by a prior incarnation of the RM.
+ */
+ private void recoverWorkers() throws Exception {
+ // if this application master starts as part of an ApplicationMaster/JobManager recovery,
+ // then some worker tasks are most likely still alive and we can re-obtain them
+ final List<MesosWorkerStore.Worker> tasksFromPreviousAttempts = workerStore.recoverWorkers();
+
+ if (!tasksFromPreviousAttempts.isEmpty()) {
+ LOG.info("Retrieved {} TaskManagers from previous
attempt", tasksFromPreviousAttempts.size());
+
+ List<Tuple2<TaskRequest,String>> toAssign = new ArrayList<>(tasksFromPreviousAttempts.size());
+ List<LaunchableTask> toLaunch = new ArrayList<>(tasksFromPreviousAttempts.size());
+
+ for (final MesosWorkerStore.Worker worker : tasksFromPreviousAttempts) {
+ LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID());
+
+ switch(worker.state()) {
+ case New:
+ workersInNew.put(extractResourceID(worker.taskID()), worker);
+ toLaunch.add(launchable);
+ break;
+ case Launched:
+ workersInLaunch.put(extractResourceID(worker.taskID()), worker);
+ toAssign.add(new Tuple2<>(launchable.taskRequest(), worker.hostname().get()));
+ break;
+ case Released:
+ workersBeingReturned.put(extractResourceID(worker.taskID()), worker);
+ break;
+ }
+ taskRouter.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), self());
+ }
+
+ // tell the launch coordinator about prior assignments
+ if(toAssign.size() >= 1) {
+ launchCoordinator.tell(new LaunchCoordinator.Assign(toAssign), self());
+ }
+ // tell the launch coordinator to launch any new tasks
+ if(toLaunch.size() >= 1) {
+ launchCoordinator.tell(new LaunchCoordinator.Launch(toLaunch), self());
+ }
+ }
+ }
+
+ /**
+ * Plan for some additional workers to be launched.
+ *
+ * @param numWorkers The number of workers to allocate.
+ */
+ @Override
+ protected void requestNewWorkers(int numWorkers) {
+
+ try {
+ List<TaskMonitor.TaskGoalStateUpdated> toMonitor = new ArrayList<>(numWorkers);
+ List<LaunchableTask> toLaunch = new ArrayList<>(numWorkers);
+
+ // generate new workers into persistent state and launch associated actors
+ for (int i = 0; i < numWorkers; i++) {
+ MesosWorkerStore.Worker worker = MesosWorkerStore.Worker.newTask(workerStore.newTaskID());
--- End diff ---
Worker is the logical term used by the base class; task is the concrete term used by Mesos. I strove for some internal consistency when moving between the logical and the concrete views.
> Integrate Flink with Apache Mesos
> ---------------------------------
>
> Key: FLINK-1984
> URL: https://issues.apache.org/jira/browse/FLINK-1984
> Project: Flink
> Issue Type: New Feature
> Components: Cluster Management
> Reporter: Robert Metzger
> Assignee: Eron Wright
> Priority: Minor
> Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-:
> https://github.com/apache/flink/pull/251
> Update (May '16): a new effort is now underway, building on the recent
> ResourceManager work.
> Design document: ([google
> doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)