[ https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15690846#comment-15690846 ]

ASF GitHub Bot commented on FLINK-4928:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2744#discussion_r89360108
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java ---
    @@ -0,0 +1,612 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.yarn;
    +
    +import akka.actor.ActorSystem;
    +import org.apache.flink.api.common.JobExecutionResult;
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.GlobalConfiguration;
    +import org.apache.flink.configuration.HighAvailabilityOptions;
    +import org.apache.flink.runtime.akka.AkkaUtils;
    +import org.apache.flink.runtime.client.JobExecutionException;
    +import org.apache.flink.runtime.clusterframework.ApplicationStatus;
    +import org.apache.flink.runtime.clusterframework.BootstrapTools;
    +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
    +import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
    +import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.jobmanager.OnCompletionActions;
    +import org.apache.flink.runtime.jobmaster.JobManagerServices;
    +import org.apache.flink.runtime.jobmaster.JobMaster;
    +import org.apache.flink.runtime.leaderelection.LeaderContender;
    +import org.apache.flink.runtime.leaderelection.LeaderElectionService;
    +import org.apache.flink.runtime.metrics.MetricRegistry;
    +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
    +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
    +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
    +import org.apache.flink.runtime.resourcemanager.ResourceManager;
    +import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
    +import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException;
    +import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager;
    +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
    +import org.apache.flink.runtime.rpc.FatalErrorHandler;
    +import org.apache.flink.runtime.rpc.RpcService;
    +import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
    +import org.apache.flink.runtime.security.SecurityContext;
    +import org.apache.flink.runtime.util.EnvironmentInformation;
    +import org.apache.flink.runtime.util.JvmShutdownSafeguard;
    +import org.apache.flink.runtime.util.SignalHandler;
    +import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
    +import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import javax.annotation.concurrent.GuardedBy;
    +
    +import java.io.File;
    +import java.io.FileInputStream;
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.util.Map;
    +import java.util.UUID;
    +
    +/**
    + * This class is the executable entry point for the YARN application master.
    + * It starts the actor system and the actors for {@link org.apache.flink.runtime.jobmaster.JobMaster}
    + * and {@link org.apache.flink.yarn.YarnResourceManager}.
    + *
    + * The JobMaster handles Flink job execution, while the YarnResourceManager handles container
    + * allocation and failure detection.
    + */
    +public class YarnFlinkApplicationMasterRunner implements LeaderContender, OnCompletionActions, FatalErrorHandler {
    --- End diff --
    
    The current implementation addresses the YARN per-job cluster scenario, right? Maybe we could refactor the code so that we have an `AbstractYarnFlinkApplicationMasterRunner`, which does the generic YARN initialization, and two specializations, `YarnPerJobFlinkApplicationMasterRunner` and `YarnClusterFlinkApplicationRunner`. The former would work essentially like this class: it starts a `ResourceManager` and a `JobManager` with the provided user code jar. The latter starts a `Dispatcher` (still to be implemented) and a `ResourceManager`. The `Dispatcher`'s task is to receive job submission requests and to spawn a new `JobMaster` for each request. That `JobMaster` is then responsible for running the submitted job.


> Implement FLIP-6 YARN Application Master Runner
> -----------------------------------------------
>
>                 Key: FLINK-4928
>                 URL: https://issues.apache.org/jira/browse/FLINK-4928
>             Project: Flink
>          Issue Type: Sub-task
>          Components: YARN
>         Environment: {{flip-6}} feature branch
>            Reporter: Stephan Ewen
>            Assignee: shuai.xu
>
> The Application Master Runner is the master process started in a YARN 
> container when submitting the Flink-on-YARN job to YARN.
> It has the following data available:
>   - Flink jars
>   - Job jars
>   - JobGraph
>   - Environment variables
>   - Contextual information like security tokens and certificates
> Its responsibility is the following:
>   - Read all configuration and environment variables, computing the effective configuration
>   - Start all shared components (Rpc, HighAvailability Services)
>   - Start the ResourceManager
>   - Start the JobManager Runner
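
The startup order listed in the issue description can be sketched as follows. This is a hypothetical illustration, not Flink code: `AmStartupSketch`, `Component`, `effectiveConfiguration`, `startAll`, `stopAll` and `RecordingComponent` are made-up names, configuration is modelled as a string map, and the rollback behaviour (stop already-started components in reverse order when a later one fails to start) is a design choice added for the sketch, not something the issue specifies.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class AmStartupSketch {

    /** Hypothetical lifecycle abstraction; not an actual Flink interface. */
    interface Component {
        void start();
        void stop();
    }

    private AmStartupSketch() {}

    /** Step 1: start from the file-based configuration and let environment-derived entries override it. */
    static Map<String, String> effectiveConfiguration(
            Map<String, String> fileConfig, Map<String, String> envOverrides) {
        Map<String, String> effective = new HashMap<>(fileConfig);
        effective.putAll(envOverrides); // environment wins on conflicting keys
        return effective;
    }

    /**
     * Steps 2-4: start components in order (shared RPC/HA services, then the
     * ResourceManager, then the JobManager runner). If a start fails, the
     * components already running are stopped in reverse order and the error is rethrown.
     */
    static Deque<Component> startAll(List<? extends Component> inOrder) {
        Deque<Component> started = new ArrayDeque<>();
        for (Component component : inOrder) {
            try {
                component.start();
            } catch (RuntimeException e) {
                stopAll(started);
                throw e;
            }
            started.push(component); // most recently started component ends up on top
        }
        return started;
    }

    /** Stops components in reverse startup order (dependents before their dependencies). */
    static void stopAll(Deque<Component> started) {
        while (!started.isEmpty()) {
            started.pop().stop();
        }
    }

    /** Demo helper that records lifecycle events into a shared log. */
    static final class RecordingComponent implements Component {
        private final String name;
        private final List<String> log;
        private final boolean failOnStart;

        RecordingComponent(String name, List<String> log, boolean failOnStart) {
            this.name = name;
            this.log = log;
            this.failOnStart = failOnStart;
        }

        @Override
        public void start() {
            if (failOnStart) {
                throw new IllegalStateException(name + " failed to start");
            }
            log.add("start " + name);
        }

        @Override
        public void stop() {
            log.add("stop " + name);
        }
    }
}
```

Stopping in reverse order means the JobManager runner goes down before the ResourceManager, and both before the shared RPC and HA services they depend on.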



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
