[
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422823#comment-15422823
]
ASF GitHub Bot commented on FLINK-1984:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2315#discussion_r74948792
--- Diff:
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
---
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import
org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
+import
org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.mesos.util.ZooKeeperUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Option;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.URL;
+import java.security.PrivilegedAction;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.mesos.Utils.uri;
+import static org.apache.flink.mesos.Utils.variable;
+
+/**
+ * This class is the executable entry point for the Mesos Application
Master.
+ * It starts actor system and the actors for {@link
org.apache.flink.runtime.jobmanager.JobManager}
+ * and {@link MesosFlinkResourceManager}.
+ *
+ * The JobManager handles Flink job execution, while the
MesosFlinkResourceManager handles container
+ * allocation and failure detection.
+ */
+public class MesosApplicationMasterRunner {
+ /** Logger */
+ protected static final Logger LOG =
LoggerFactory.getLogger(MesosApplicationMasterRunner.class);
+
+ /** The maximum time that TaskManagers may be waiting to register at
the JobManager,
+ * before they quit */
+ private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT =
new FiniteDuration(5, TimeUnit.MINUTES);
+
+ /** The process environment variables */
+ private static final Map<String, String> ENV = System.getenv();
+
+ /** The exit code returned if the initialization of the application
master failed */
+ private static final int INIT_ERROR_EXIT_CODE = 31;
+
+ /** The exit code returned if the process exits because a critical
actor died */
+ private static final int ACTOR_DIED_EXIT_CODE = 32;
+
+ //
------------------------------------------------------------------------
+ // Program entry point
+ //
------------------------------------------------------------------------
+
+ /**
+ * The entry point for the Mesos AppMaster.
+ *
+ * @param args The command line arguments.
+ */
+ public static void main(String[] args) {
+ EnvironmentInformation.logEnvironmentInfo(LOG, "Mesos
AppMaster", args);
+ SignalHandler.register(LOG);
+
+ // run and exit with the proper return code
+ int returnCode = new MesosApplicationMasterRunner().run(args);
+ System.exit(returnCode);
+ }
+
+ /**
+ * The instance entry point for the Mesos AppMaster. Obtains user group
+ * information and calls the main work method {@link #runPrivileged()}
as a
+ * privileged action.
+ *
+ * @param args The command line arguments.
+ * @return The process exit code.
+ */
+ protected int run(String[] args) {
+ try {
+ LOG.debug("All environment variables: {}", ENV);
+
+ final UserGroupInformation currentUser;
+ try {
+ currentUser =
UserGroupInformation.getCurrentUser();
+ } catch (Throwable t) {
+ throw new Exception("Cannot access
UserGroupInformation information for current user", t);
+ }
+
+ LOG.info("Running Flink as user {}",
currentUser.getShortUserName());
+
+ // run the actual work in a secured privileged action
+ return currentUser.doAs(new PrivilegedAction<Integer>()
{
+ @Override
+ public Integer run() {
+ return runPrivileged();
+ }
+ });
+ }
+ catch (Throwable t) {
+ // make sure that everything whatever ends up in the log
+ LOG.error("Mesos AppMaster initialization failed", t);
+ return INIT_ERROR_EXIT_CODE;
+ }
+ }
+
+ //
------------------------------------------------------------------------
+ // Core work method
+ //
------------------------------------------------------------------------
+
+ /**
+ * The main work method, must run as a privileged action.
+ *
+ * @return The return code for the Java process.
+ */
+ protected int runPrivileged() {
+
+ ActorSystem actorSystem = null;
+ WebMonitor webMonitor = null;
+ MesosArtifactServer artifactServer = null;
+
+ try {
+ // ------- (1) load and parse / validate all
configurations -------
+
+ // loading all config values here has the advantage
that the program fails fast, if any
+ // configuration problem occurs
+
+ final String workingDir =
ENV.get(MesosConfigKeys.ENV_MESOS_SANDBOX);
+ require(workingDir != null, "Sandbox directory variable
(%s) not set", MesosConfigKeys.ENV_MESOS_SANDBOX);
+
+ final String sessionID =
ENV.get(MesosConfigKeys.ENV_SESSION_ID);
+ require(sessionID != null, "Session ID (%s) not set",
MesosConfigKeys.ENV_SESSION_ID);
+
+ // Note that we use the "appMasterHostname" given by
the system, to make sure
+ // we use the hostnames consistently throughout akka.
+ // for akka "localhost" and "localhost.localdomain" are
different actors.
+ final String appMasterHostname =
InetAddress.getLocalHost().getHostName();
+
+ // Flink configuration
+ final Configuration dynamicProperties =
+
FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES));
+ LOG.debug("Mesos dynamic properties: {}",
dynamicProperties);
+
+ final Configuration config =
createConfiguration(workingDir, dynamicProperties);
+
+ // Mesos configuration
+ final MesosConfiguration mesosConfig =
createMesosConfig(config, appMasterHostname);
+
+ // environment values related to TM
+ final int taskManagerContainerMemory;
+ final int numInitialTaskManagers;
+ final int slotsPerTaskManager;
+
+ try {
+ taskManagerContainerMemory =
Integer.parseInt(ENV.get(MesosConfigKeys.ENV_TM_MEMORY));
+ } catch (NumberFormatException e) {
+ throw new RuntimeException("Invalid value for "
+ MesosConfigKeys.ENV_TM_MEMORY + " : "
+ + e.getMessage());
+ }
+ try {
+ numInitialTaskManagers =
Integer.parseInt(ENV.get(MesosConfigKeys.ENV_TM_COUNT));
+ } catch (NumberFormatException e) {
+ throw new RuntimeException("Invalid value for "
+ MesosConfigKeys.ENV_TM_COUNT + " : "
+ + e.getMessage());
+ }
+ try {
+ slotsPerTaskManager =
Integer.parseInt(ENV.get(MesosConfigKeys.ENV_SLOTS));
+ } catch (NumberFormatException e) {
+ throw new RuntimeException("Invalid value for "
+ MesosConfigKeys.ENV_SLOTS + " : "
+ + e.getMessage());
+ }
+
+ final ContaineredTaskManagerParameters
containeredParameters =
+ ContaineredTaskManagerParameters.create(config,
taskManagerContainerMemory, slotsPerTaskManager);
+
+ final MesosTaskManagerParameters taskManagerParameters =
+ MesosTaskManagerParameters.create(config,
containeredParameters);
+
+ LOG.info("TaskManagers will be created with {} task
slots",
+
taskManagerParameters.containeredParameters().numSlots());
+ LOG.info("TaskManagers will be started with container
size {} MB, JVM heap size {} MB, " +
+ "JVM direct memory limit {} MB, {}
cpus",
+
taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB(),
+
taskManagerParameters.containeredParameters().taskManagerHeapSizeMB(),
+
taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB(),
+ taskManagerParameters.cpus());
+
+ // JM endpoint, which should be explicitly configured
by the dispatcher (based on acquired net resources)
+ final int listeningPort =
config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+ ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
+ require(listeningPort >= 0 && listeningPort <= 65536,
"Config parameter \"" +
+ ConfigConstants.JOB_MANAGER_IPC_PORT_KEY + "\"
is invalid, it must be between 0 and 65536");
+
+ // ----------------- (2) start the actor system
-------------------
+
+ // try to start the actor system, JobManager and
JobManager actor system
+ // using the configured address and ports
+ actorSystem = BootstrapTools.startActorSystem(config,
appMasterHostname, listeningPort, LOG);
+
+ final String akkaHostname =
AkkaUtils.getAddress(actorSystem).host().get();
+ final int akkaPort = (Integer)
AkkaUtils.getAddress(actorSystem).port().get();
+
+ LOG.info("Actor system bound to hostname {}.",
akkaHostname);
+
+ // try to start the artifact server
+ LOG.debug("Starting Artifact Server");
+ final int artifactServerPort =
config.getInteger(ConfigConstants.MESOS_ARTIFACT_SERVER_PORT_KEY,
+
ConfigConstants.DEFAULT_MESOS_ARTIFACT_SERVER_PORT);
+ artifactServer = new MesosArtifactServer(sessionID,
akkaHostname, artifactServerPort);
+
+ // ----------------- (3) Generate the configuration for
the TaskManagers -------------------
+
+ final Configuration taskManagerConfig =
BootstrapTools.generateTaskManagerConfiguration(
+ config, akkaHostname, akkaPort,
slotsPerTaskManager, TASKMANAGER_REGISTRATION_TIMEOUT);
+ LOG.debug("TaskManager configuration: {}",
taskManagerConfig);
+
+ final Protos.TaskInfo.Builder taskManagerContext =
createTaskManagerContext(
+ config, mesosConfig, ENV,
+ taskManagerParameters, taskManagerConfig,
+ workingDir, getTaskManagerClass(),
artifactServer, LOG);
+
+ // ----------------- (4) start the actors
-------------------
+
+ // 1) JobManager & Archive (in non-HA case, the leader
service takes this)
+ // 2) Web Monitor (we need its port to register)
+ // 3) Resource Master for Mesos
+ // 4) Process reapers for the JobManager and Resource
Master
+
+ // 1: the JobManager
+ LOG.debug("Starting JobManager actor");
+
+ // we start the JobManager with its standard name
+ ActorRef jobManager = JobManager.startJobManagerActors(
+ config, actorSystem,
+ new scala.Some<>(JobManager.JOB_MANAGER_NAME()),
+ scala.Option.<String>empty(),
+ getJobManagerClass(),
+ getArchivistClass())._1();
+
+
+ // 2: the web monitor
+ LOG.debug("Starting Web Frontend");
+
+ webMonitor =
BootstrapTools.startWebMonitorIfConfigured(config, actorSystem, jobManager,
LOG);
+ if(webMonitor != null) {
+ final URL webMonitorURL = new URL("http",
appMasterHostname, webMonitor.getServerPort(), "/");
+
mesosConfig.frameworkInfo().setWebuiUrl(webMonitorURL.toExternalForm());
+ }
+
+ // 3: Flink's Mesos ResourceManager
+ LOG.debug("Starting Mesos Flink Resource Manager");
+
+ // create the worker store to persist task information
across restarts
+ MesosWorkerStore workerStore =
createWorkerStore(config);
+
+ // we need the leader retrieval service here to be
informed of new
+ // leader session IDs, even though there can be only
one leader ever
+ LeaderRetrievalService leaderRetriever =
+
LeaderRetrievalUtils.createLeaderRetrievalService(config, jobManager);
+
+ Props resourceMasterProps =
MesosFlinkResourceManager.createActorProps(
--- End diff --
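Resource master or resource manager? Since these props are created by
`MesosFlinkResourceManager.createActorProps`, I think the variable should be
named `resourceManagerProps` rather than `resourceMasterProps`.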
> Integrate Flink with Apache Mesos
> ---------------------------------
>
> Key: FLINK-1984
> URL: https://issues.apache.org/jira/browse/FLINK-1984
> Project: Flink
> Issue Type: New Feature
> Components: Cluster Management
> Reporter: Robert Metzger
> Assignee: Eron Wright
> Priority: Minor
> Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There is also a pending pull request for adding Mesos support for Flink-:
> https://github.com/apache/flink/pull/251
> Update (May '16): a new effort is now underway, building on the recent
> ResourceManager work.
> Design document: ([google
> doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)