[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422998#comment-15422998 ]
ASF GitHub Bot commented on FLINK-1984:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2315#discussion_r74971461
--- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java ---
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxy;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Resource Manager for Apache Mesos.
+ */
+public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMesosWorkerNode> {
+
+ /** The Mesos configuration (master and framework info) */
+ private final MesosConfiguration mesosConfig;
+
+ /** The TaskManager container parameters (like container memory size) */
+ private final MesosTaskManagerParameters taskManagerParameters;
+
+ /** Context information used to start a TaskManager Java process */
+ private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+
+ /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */
+ private final int maxFailedTasks;
+
+ /** Callback handler for the asynchronous Mesos scheduler */
+ private SchedulerProxy schedulerCallbackHandler;
+
+ /** Mesos scheduler driver */
+ private SchedulerDriver schedulerDriver;
+
+ private ActorRef connectionMonitor;
+
+ private ActorRef taskRouter;
+
+ private ActorRef launchCoordinator;
+
+ private ActorRef reconciliationCoordinator;
+
+ private MesosWorkerStore workerStore;
+
+ final Map<ResourceID, MesosWorkerStore.Worker> workersInNew;
+ final Map<ResourceID, MesosWorkerStore.Worker> workersInLaunch;
+ final Map<ResourceID, MesosWorkerStore.Worker> workersBeingReturned;
+
+ /** The number of failed tasks since the master became active */
+ private int failedTasksSoFar;
+
+ public MesosFlinkResourceManager(
+ Configuration flinkConfig,
+ MesosConfiguration mesosConfig,
+ MesosWorkerStore workerStore,
+ LeaderRetrievalService leaderRetrievalService,
+ MesosTaskManagerParameters taskManagerParameters,
+ Protos.TaskInfo.Builder taskManagerLaunchContext,
+ int maxFailedTasks,
+ int numInitialTaskManagers) {
+
+ super(numInitialTaskManagers, flinkConfig, leaderRetrievalService);
+
+ this.mesosConfig = requireNonNull(mesosConfig);
+
+ this.workerStore = requireNonNull(workerStore);
+
+ this.taskManagerParameters = requireNonNull(taskManagerParameters);
+ this.taskManagerLaunchContext = requireNonNull(taskManagerLaunchContext);
+ this.maxFailedTasks = maxFailedTasks;
+
+ this.workersInNew = new HashMap<>();
+ this.workersInLaunch = new HashMap<>();
+ this.workersBeingReturned = new HashMap<>();
+ }
+
+ // ------------------------------------------------------------------------
+ // Mesos-specific behavior
+ // ------------------------------------------------------------------------
+
+ @Override
+ protected void initialize() throws Exception {
+ LOG.info("Initializing Mesos resource master");
+
+ workerStore.start();
+
+ // create the scheduler driver to communicate with Mesos
+ schedulerCallbackHandler = new SchedulerProxy(self());
+
+ // register with Mesos
+ FrameworkInfo.Builder frameworkInfo = mesosConfig.frameworkInfo()
+ .clone()
+ .setCheckpoint(true);
+
+ Option<Protos.FrameworkID> frameworkID = workerStore.getFrameworkID();
+ if(frameworkID.isEmpty()) {
+ LOG.info("Registering as new framework.");
+ }
+ else {
+ LOG.info("Recovery scenario: re-registering using
framework ID {}.", frameworkID.get().getValue());
+ frameworkInfo.setId(frameworkID.get());
+ }
+
+ MesosConfiguration initializedMesosConfig = mesosConfig.withFrameworkInfo(frameworkInfo);
+ MesosConfiguration.logMesosConfig(LOG, initializedMesosConfig);
+ schedulerDriver = initializedMesosConfig.createDriver(schedulerCallbackHandler, false);
+
+ // create supporting actors
+ connectionMonitor = createConnectionMonitor();
+ launchCoordinator = createLaunchCoordinator();
+ reconciliationCoordinator = createReconciliationCoordinator();
+ taskRouter = createTaskRouter();
+
+ recoverWorkers();
+
+ connectionMonitor.tell(new ConnectionMonitor.Start(), self());
+ schedulerDriver.start();
+ }
+
+ protected ActorRef createConnectionMonitor() {
+ return context().actorOf(
+ ConnectionMonitor.createActorProps(ConnectionMonitor.class, config),
+ "connectionMonitor");
+ }
+
+ protected ActorRef createTaskRouter() {
+ return context().actorOf(
+ Tasks.createActorProps(Tasks.class, config, schedulerDriver, TaskMonitor.class),
+ "tasks");
+ }
+
+ protected ActorRef createLaunchCoordinator() {
+ return context().actorOf(
+ LaunchCoordinator.createActorProps(LaunchCoordinator.class, self(), config, schedulerDriver, createOptimizer()),
+ "launchCoordinator");
+ }
+
+ protected ActorRef createReconciliationCoordinator() {
+ return context().actorOf(
+ ReconciliationCoordinator.createActorProps(ReconciliationCoordinator.class, config, schedulerDriver),
+ "reconciliationCoordinator");
+ }
+
+ @Override
+ public void postStop() {
+ LOG.info("Stopping Mesos resource master");
+ super.postStop();
+ }
+
+ // ------------------------------------------------------------------------
+ // Actor messages
+ // ------------------------------------------------------------------------
+
+ @Override
+ protected void handleMessage(Object message) {
+
+ // check for Mesos-specific actor messages first
+
+ // --- messages about Mesos connection
+ if (message instanceof Registered) {
+ registered((Registered) message);
+ } else if (message instanceof ReRegistered) {
+ reregistered((ReRegistered) message);
+ } else if (message instanceof Disconnected) {
+ disconnected((Disconnected) message);
+ } else if (message instanceof Error) {
+ error(((Error) message).message());
+
+ // --- messages about offers
+ } else if (message instanceof ResourceOffers || message instanceof OfferRescinded) {
+ launchCoordinator.tell(message, self());
+ } else if (message instanceof AcceptOffers) {
+ acceptOffers((AcceptOffers) message);
+
+ // --- messages about tasks
+ } else if (message instanceof StatusUpdate) {
+ taskStatusUpdated((StatusUpdate) message);
+ } else if (message instanceof ReconciliationCoordinator.Reconcile) {
+ // a reconciliation request from a task
+ reconciliationCoordinator.tell(message, self());
+ } else if (message instanceof TaskMonitor.TaskTerminated) {
+ // a termination message from a task
+ TaskMonitor.TaskTerminated msg = (TaskMonitor.TaskTerminated) message;
+ taskTerminated(msg.taskID(), msg.status());
+
+ } else {
+ // message handled by the generic resource master code
+ super.handleMessage(message);
+ }
+ }
+
+ /**
+ * Called to shut down the cluster (not a failover situation).
+ *
+ * @param finalStatus The application status to report.
+ * @param optionalDiagnostics An optional diagnostics message.
+ */
+ @Override
+ protected void shutdownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
+
+ LOG.info("Shutting down and unregistering as a Mesos
framework.");
+ try {
+ // unregister the framework, which implicitly removes all tasks.
+ schedulerDriver.stop(false);
+ }
+ catch(Exception ex) {
+ LOG.warn("unable to unregister the framework", ex);
+ }
+
+ try {
+ workerStore.cleanup();
+ }
+ catch(Exception ex) {
+ LOG.warn("unable to cleanup the ZooKeeper state", ex);
+ }
+
+ context().stop(self());
+ }
+
+ @Override
+ protected void fatalError(String message, Throwable error) {
+ // we do not unregister, but cause a hard fail of this process, to have it
+ // restarted by the dispatcher
+ LOG.error("FATAL ERROR IN MESOS APPLICATION MASTER: " +
message, error);
+ LOG.error("Shutting down process");
+
+ // kill this process, this will make an external supervisor (the dispatcher) restart the process
+ System.exit(EXIT_CODE_FATAL_ERROR);
+ }
+
+ // ------------------------------------------------------------------------
+ // Worker Management
+ // ------------------------------------------------------------------------
+
+ /**
+ * Recover framework/worker information persisted by a prior incarnation of the RM.
+ */
+ private void recoverWorkers() throws Exception {
+ // if this application master starts as part of an ApplicationMaster/JobManager recovery,
+ // then some worker tasks are most likely still alive and we can re-obtain them
+ final List<MesosWorkerStore.Worker> tasksFromPreviousAttempts = workerStore.recoverWorkers();
+
+ if (!tasksFromPreviousAttempts.isEmpty()) {
+ LOG.info("Retrieved {} TaskManagers from previous
attempt", tasksFromPreviousAttempts.size());
+
+ List<Tuple2<TaskRequest,String>> toAssign = new ArrayList<>(tasksFromPreviousAttempts.size());
+ List<LaunchableTask> toLaunch = new ArrayList<>(tasksFromPreviousAttempts.size());
+
+ for (final MesosWorkerStore.Worker worker : tasksFromPreviousAttempts) {
+ LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID());
+
+ switch(worker.state()) {
+ case New:
+ workersInNew.put(extractResourceID(worker.taskID()), worker);
+ toLaunch.add(launchable);
+ break;
+ case Launched:
+ workersInLaunch.put(extractResourceID(worker.taskID()), worker);
+ toAssign.add(new Tuple2<>(launchable.taskRequest(), worker.hostname().get()));
+ break;
+ case Released:
+ workersBeingReturned.put(extractResourceID(worker.taskID()), worker);
+ break;
+ }
+ taskRouter.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), self());
+ }
+
+ // tell the launch coordinator about prior assignments
+ if(toAssign.size() >= 1) {
+ launchCoordinator.tell(new LaunchCoordinator.Assign(toAssign), self());
+ }
+ // tell the launch coordinator to launch any new tasks
+ if(toLaunch.size() >= 1) {
+ launchCoordinator.tell(new LaunchCoordinator.Launch(toLaunch), self());
+ }
+ }
+ }
+
+ /**
+ * Plan for some additional workers to be launched.
+ *
+ * @param numWorkers The number of workers to allocate.
+ */
+ @Override
+ protected void requestNewWorkers(int numWorkers) {
+
+ try {
+ List<TaskMonitor.TaskGoalStateUpdated> toMonitor = new ArrayList<>(numWorkers);
+ List<LaunchableTask> toLaunch = new ArrayList<>(numWorkers);
+
+ // generate new workers into persistent state and launch associated actors
+ for (int i = 0; i < numWorkers; i++) {
+ MesosWorkerStore.Worker worker = MesosWorkerStore.Worker.newTask(workerStore.newTaskID());
+ workerStore.putWorker(worker);
+ workersInNew.put(extractResourceID(worker.taskID()), worker);
+
+ LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID());
+
+ LOG.info("Scheduling Mesos task {} with ({} MB,
{} cpus).",
+ launchable.taskID().getValue(),
launchable.taskRequest().getMemory(), launchable.taskRequest().getCPUs());
+
+ toMonitor.add(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)));
+ toLaunch.add(launchable);
+ }
+
+ // tell the task router about the new plans
+ for (TaskMonitor.TaskGoalStateUpdated update : toMonitor) {
+ taskRouter.tell(update, self());
+ }
+
+ // tell the launch coordinator to launch the new tasks
+ if(toLaunch.size() >= 1) {
+ launchCoordinator.tell(new LaunchCoordinator.Launch(toLaunch), self());
+ }
+ }
+ catch(Exception ex) {
+ fatalError("unable to request new workers", ex);
+ }
+ }
+
+ /**
+ * Accept offers as advised by the launch coordinator.
+ *
+ * Acceptance is routed through the RM to update the persistent state before
+ * forwarding the message to Mesos.
+ */
+ private void acceptOffers(AcceptOffers msg) {
+
+ try {
+ List<TaskMonitor.TaskGoalStateUpdated> toMonitor = new ArrayList<>(msg.operations().size());
+
+ // transition the persistent state of some tasks to Launched
+ for (Protos.Offer.Operation op : msg.operations()) {
+ if (op.getType() != Protos.Offer.Operation.Type.LAUNCH) {
+ continue;
+ }
+ for (Protos.TaskInfo info : op.getLaunch().getTaskInfosList()) {
+ MesosWorkerStore.Worker worker = workersInNew.remove(extractResourceID(info.getTaskId()));
+ assert (worker != null);
+
+ worker = worker.launchTask(info.getSlaveId(), msg.hostname());
+ workerStore.putWorker(worker);
+ workersInLaunch.put(extractResourceID(worker.taskID()), worker);
+
+ LOG.info("Launching Mesos task {} on
host {}.",
+ worker.taskID().getValue(),
worker.hostname().get());
+
+ toMonitor.add(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)));
+ }
+ }
+
+ // tell the task router about the new plans
+ for (TaskMonitor.TaskGoalStateUpdated update : toMonitor) {
+ taskRouter.tell(update, self());
+ }
+
+ // send the acceptance message to Mesos
+ schedulerDriver.acceptOffers(msg.offerIds(), msg.operations(), msg.filters());
+ }
+ catch(Exception ex) {
+ fatalError("unable to accept offers", ex);
+ }
+ }
+
+ /**
+ * Handle a task status change.
+ */
+ private void taskStatusUpdated(StatusUpdate message) {
+ taskRouter.tell(message, self());
+ reconciliationCoordinator.tell(message, self());
+ schedulerDriver.acknowledgeStatusUpdate(message.status());
+ }
+
+ /**
+ * Accept the given started worker into the internal state.
+ *
+ * @param resourceID The worker resource id
+ * @return A registered worker node record.
+ */
+ @Override
+ protected RegisteredMesosWorkerNode workerStarted(ResourceID resourceID) {
+ MesosWorkerStore.Worker inLaunch = workersInLaunch.remove(resourceID);
+ if (inLaunch == null) {
+ // Worker was not in state "being launched", this can indicate that the TaskManager
+ // in this worker was already registered or that the container was not started
+ // by this resource manager. Simply ignore this resourceID.
+ return null;
+ }
+ return new RegisteredMesosWorkerNode(inLaunch);
+ }
+
+ /**
+ * Accept the given registered workers into the internal state.
+ *
+ * @param toConsolidate The worker IDs known previously to the JobManager.
+ * @return A collection of registered worker node records.
+ */
+ @Override
+ protected Collection<RegisteredMesosWorkerNode> reacceptRegisteredWorkers(Collection<ResourceID> toConsolidate) {
+
+ // we check for each task manager if we recognize its Mesos task ID
+ List<RegisteredMesosWorkerNode> accepted = new ArrayList<>(toConsolidate.size());
+ for (ResourceID resourceID : toConsolidate) {
+ MesosWorkerStore.Worker worker = workersInLaunch.remove(resourceID);
+ if (worker != null) {
+ LOG.info("Mesos worker consolidation recognizes
TaskManager {}.", resourceID);
+ accepted.add(new RegisteredMesosWorkerNode(worker));
+ }
+ else {
+ if(isStarted(resourceID)) {
+ LOG.info("TaskManager {} has already
been registered at the resource manager.", resourceID);
+ }
+ else {
+ LOG.info("Mesos worker consolidation
does not recognize TaskManager {}.", resourceID);
+ }
+ }
+ }
+ return accepted;
+ }
+
+ /**
+ * Release the given pending worker.
+ */
+ @Override
+ protected void releasePendingWorker(ResourceID id) {
+ MesosWorkerStore.Worker worker = workersInLaunch.remove(id);
+ if (worker != null) {
+ releaseWorker(worker);
+ } else {
+ LOG.error("Cannot find worker {} to release. Ignoring
request.", id);
+ }
+ }
+
+ /**
+ * Release the given started worker.
+ */
+ @Override
+ protected void releaseStartedWorker(RegisteredMesosWorkerNode worker) {
+ releaseWorker(worker.task());
+ }
+
+ /**
+ * Plan for the removal of the given worker.
+ */
+ private void releaseWorker(MesosWorkerStore.Worker worker) {
+ try {
+ LOG.info("Releasing worker {}", worker.taskID());
+
+ // update persistent state of worker to Released
+ worker = worker.releaseTask();
+ workerStore.putWorker(worker);
+ workersBeingReturned.put(extractResourceID(worker.taskID()), worker);
+ taskRouter.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), self());
+
+ if (worker.hostname().isDefined()) {
+ // tell the launch coordinator that the task is being unassigned from the host, for planning purposes
+ launchCoordinator.tell(new LaunchCoordinator.Unassign(worker.taskID(), worker.hostname().get()), self());
+ }
+ }
+ catch (Exception ex) {
+ fatalError("unable to release worker", ex);
+ }
+ }
+
+ @Override
+ protected int getNumWorkerRequestsPending() {
+ return workersInNew.size();
+ }
+
+ @Override
+ protected int getNumWorkersPendingRegistration() {
+ return workersInLaunch.size();
+ }
+
+ // ------------------------------------------------------------------------
+ // Callbacks from the Mesos Master
+ // ------------------------------------------------------------------------
+
+ /**
+ * Called when connected to Mesos as a new framework.
+ */
+ private void registered(Registered message) {
+ connectionMonitor.tell(message, self());
+
+ try {
+ workerStore.setFrameworkID(Option.apply(message.frameworkId()));
+ }
+ catch(Exception ex) {
+ fatalError("unable to store the assigned framework ID",
ex);
+ return;
+ }
+
+ launchCoordinator.tell(message, self());
+ reconciliationCoordinator.tell(message, self());
+ taskRouter.tell(message, self());
+ }
+
+ /**
+ * Called when reconnected to Mesos following a failover event.
+ */
+ private void reregistered(ReRegistered message) {
+ connectionMonitor.tell(message, self());
+ launchCoordinator.tell(message, self());
+ reconciliationCoordinator.tell(message, self());
+ taskRouter.tell(message, self());
+ }
+
+ /**
+ * Called when disconnected from Mesos.
+ */
+ private void disconnected(Disconnected message) {
+ connectionMonitor.tell(message, self());
+ launchCoordinator.tell(message, self());
+ reconciliationCoordinator.tell(message, self());
+ taskRouter.tell(message, self());
+ }
+
+ /**
+ * Called when an error is reported by the scheduler callback.
+ */
+ private void error(String message) {
+ self().tell(new FatalErrorOccurred("Connection to Mesos failed", new Exception(message)), self());
+ }
+
+ /**
+ * Invoked when a Mesos task reaches a terminal status.
+ */
+ private void taskTerminated(Protos.TaskID taskID, Protos.TaskStatus status) {
+ // this callback occurs for failed containers and for released containers alike
+
+ final ResourceID id = extractResourceID(taskID);
+
+ try {
+ workerStore.removeWorker(taskID);
+ }
+ catch(Exception ex) {
+ fatalError("unable to remove worker", ex);
+ return;
+ }
+
+ // check if this is a failed task or a released task
+ if (workersBeingReturned.remove(id) != null) {
+ // regular finished worker that we released
+ LOG.info("Worker {} finished successfully with
diagnostics: {}",
+ id, status.getMessage());
+ } else {
+ // failed worker, either at startup, or running
+ final MesosWorkerStore.Worker launched = workersInLaunch.remove(id);
+ if (launched != null) {
+ LOG.info("Mesos task {} failed, with a
TaskManager in launch or registration. " +
+ "State: {} Reason: {} ({})", id,
status.getState(), status.getReason(), status.getMessage());
+ // we will trigger re-acquiring new workers at the end
+ } else {
+ // failed registered worker
+ LOG.info("Mesos task {} failed, with a
registered TaskManager. " +
+ "State: {} Reason: {} ({})", id,
status.getState(), status.getReason(), status.getMessage());
+
+ // notify the generic logic, which notifies the JobManager, etc.
+ notifyWorkerFailed(id, "Mesos task " + id + " failed. State: " + status.getState());
+ }
+
+ // general failure logging
+ failedTasksSoFar++;
+
+ String diagMessage = String.format("Diagnostics for task %s in state %s : " +
+ "reason=%s message=%s",
+ id, status.getState(), status.getReason(), status.getMessage());
+ sendInfoMessage(diagMessage);
+
+ LOG.info(diagMessage);
+ LOG.info("Total number of failed tasks so far: " +
failedTasksSoFar);
--- End diff ---
Please use a `{}` placeholder here instead of string concatenation.
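For illustration, a minimal sketch of the suggested change (not part of the diff), using the SLF4J `LOG` and the `failedTasksSoFar` field from the class above:
{code:java}
// current form: the message string is built eagerly via concatenation
LOG.info("Total number of failed tasks so far: " + failedTasksSoFar);

// suggested form: SLF4J substitutes the {} placeholder only if INFO logging is enabled
LOG.info("Total number of failed tasks so far: {}", failedTasksSoFar);
{code}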
> Integrate Flink with Apache Mesos
> ---------------------------------
>
> Key: FLINK-1984
> URL: https://issues.apache.org/jira/browse/FLINK-1984
> Project: Flink
> Issue Type: New Feature
> Components: Cluster Management
> Reporter: Robert Metzger
> Assignee: Eron Wright
> Priority: Minor
> Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-:
> https://github.com/apache/flink/pull/251
> Update (May '16): a new effort is now underway, building on the recent
> ResourceManager work.
> Design document: ([google doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)