[
https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15714781#comment-15714781
]
ASF GitHub Bot commented on FLINK-4928:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2744#discussion_r90622537
--- Diff:
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java
---
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.OnCompletionActions;
+import org.apache.flink.runtime.jobmaster.JobManagerRunner;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import
org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import
org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
+import
org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
+import
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.ObjectInputStream;
+
+/**
+ * This class is the executable entry point for the YARN application
master.
+ * It starts actor system and the actors for {@link
org.apache.flink.runtime.jobmaster.JobManagerRunner}
+ * and {@link org.apache.flink.yarn.YarnResourceManager}.
+ *
+ * The JobMasnagerRunner start a {@link
org.apache.flink.runtime.jobmaster.JobMaster}
+ * JobMaster handles Flink job execution, while the YarnResourceManager
handles container
+ * allocation and failure detection.
+ */
+public class YarnFlinkApplicationMasterRunner extends
AbstractYarnFlinkApplicationMasterRunner
+ implements OnCompletionActions, FatalErrorHandler {
+
+ /** Logger */
+ protected static final Logger LOG =
LoggerFactory.getLogger(YarnFlinkApplicationMasterRunner.class);
+
+ /** The job graph file path */
+ private static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path";
+
+ /** The lock to guard startup / shutdown / manipulation methods */
+ private final Object lock = new Object();
+
+ @GuardedBy("lock")
+ private MetricRegistry metricRegistry;
+
+ @GuardedBy("lock")
+ private HighAvailabilityServices haServices;
+
+ @GuardedBy("lock")
+ private RpcService commonRpcService;
+
+ @GuardedBy("lock")
+ private ResourceManager resourceManager;
+
+ @GuardedBy("lock")
+ private JobManagerRunner jobManagerRunner;
+
+ @GuardedBy("lock")
+ private JobGraph jobGraph;
+
+ //
------------------------------------------------------------------------
+ // Program entry point
+ //
------------------------------------------------------------------------
+
+ /**
+ * The entry point for the YARN application master.
+ *
+ * @param args The command line arguments.
+ */
+ public static void main(String[] args) {
+ EnvironmentInformation.logEnvironmentInfo(LOG, "YARN
ApplicationMaster runner", args);
+ SignalHandler.register(LOG);
+ JvmShutdownSafeguard.installAsShutdownHook(LOG);
+
+ // run and exit with the proper return code
+ int returnCode = new
YarnFlinkApplicationMasterRunner().run(args);
+ System.exit(returnCode);
+ }
+
+ @Override
+ protected int runApplicationMaster(Configuration config) {
+
+ try {
+ // ---- (1) create common services
+
+ // try to start the rpc service
+ // using the port range definition from the config.
+ final String amPortRange = config.getString(
+
ConfigConstants.YARN_APPLICATION_MASTER_PORT,
+
ConfigConstants.DEFAULT_YARN_JOB_MANAGER_PORT);
+
+ synchronized (lock) {
+ haServices =
HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(config);
+ metricRegistry = new
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+ commonRpcService = createRpcService(config,
appMasterHostname, amPortRange);
+
+ // ---- (2) init resource manager -------
+ resourceManager = createResourceManager(config);
+
+ // ---- (3) init job master parameters
+ jobManagerRunner =
createJobManagerRunner(config);
+
+ // ---- (4) start the resource manager and job
manager runner:
+ resourceManager.start();
+ LOG.debug("YARN Flink Resource Manager
started");
+
+ jobManagerRunner.start();
+ LOG.debug("Job Manager Runner started");
+
+ // ---- (5) start the web monitor
+ // TODO: add web monitor
+ }
+
+ // wait for resource manager to finish
+ resourceManager.getTerminationFuture().get();
+ // everything started, we can wait until all is done or
the process is killed
+ LOG.info("YARN Application Master finished");
+ }
+ catch (Throwable t) {
+ // make sure that everything whatever ends up in the log
+ LOG.error("YARN Application Master initialization
failed", t);
+ shutdown(ApplicationStatus.FAILED, t.getMessage());
+ return INIT_ERROR_EXIT_CODE;
+ }
+
+ return 0;
+ }
+
+ //
------------------------------------------------------------------------
+ // Utilities
+ //
------------------------------------------------------------------------
+
+ protected RpcService createRpcService(
+ Configuration configuration,
+ String bindAddress,
+ String portRange) throws Exception{
+ ActorSystem actorSystem =
BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG);
+ FiniteDuration duration = AkkaUtils.getTimeout(configuration);
+ return new AkkaRpcService(actorSystem,
Time.of(duration.length(), duration.unit()));
--- End diff --
Maybe you could extend `RpcServiceUtils.createRpcService` to also support
port ranges.
> Implement FLIP-6 YARN Application Master Runner
> -----------------------------------------------
>
> Key: FLINK-4928
> URL: https://issues.apache.org/jira/browse/FLINK-4928
> Project: Flink
> Issue Type: Sub-task
> Components: YARN
> Environment: {{flip-6}} feature branch
> Reporter: Stephan Ewen
> Assignee: shuai.xu
>
> The Application Master Runner is the master process started in a YARN
> container when submitting the Flink-on-YARN job to YARN.
> It has the following data available:
> - Flink jars
> - Job jars
> - JobGraph
> - Environment variables
> - Contextual information like security tokens and certificates
> Its responsibility is the following:
> - Read all configuration and environment variables, computing the effective
> configuration
> - Start all shared components (Rpc, HighAvailability Services)
> - Start the ResourceManager
> - Start the JobManager Runner
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)