[ 
https://issues.apache.org/jira/browse/FLINK-1771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14387970#comment-14387970
 ] 

ASF GitHub Bot commented on FLINK-1771:
---------------------------------------

Github user hsaputra commented on a diff in the pull request:

    https://github.com/apache/flink/pull/542#discussion_r27452585
  
    --- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java ---
    @@ -106,70 +111,129 @@ public FlinkYarnCluster(final YarnClient yarnClient, 
final ApplicationId appId,
                this.sessionFilesDir = sessionFilesDir;
                this.applicationId = appId;
                this.detached = detached;
    +           this.flinkConfig = flinkConfig;
    +           this.appId = appId;
     
                // get one application report manually
                intialAppReport = yarnClient.getApplicationReport(appId);
                String jobManagerHost = intialAppReport.getHost();
                int jobManagerPort = intialAppReport.getRpcPort();
                this.jobManagerAddress = new InetSocketAddress(jobManagerHost, 
jobManagerPort);
    +   }
     
    -           if(!detached) {
    -                   // start actor system
    -                   LOG.info("Start actor system.");
    -                   InetAddress ownHostname = 
NetUtils.resolveAddress(jobManagerAddress); // find name of own public 
interface, able to connect to the JM
    -                   actorSystem = AkkaUtils.createActorSystem(flinkConfig,
    -                                   new Some(new Tuple2<String, 
Integer>(ownHostname.getCanonicalHostName(), 0)));
    +   /**
    +    * Connect the FlinkYarnCluster to the ApplicationMaster.
    +    *
    +    * Detached YARN sessions don't need to connect to the 
ApplicationMaster.
    +    * Detached per job YARN sessions need to connect until the required 
number of TaskManagers have been started.
    +    * 
    +    * @throws IOException
    +    */
    +   public void connectToCluster() throws IOException {
    +           if(isConnected) {
    +                   throw new IllegalStateException("Can not connect to the 
cluster again");
    +           }
     
    -                   // start application client
    -                   LOG.info("Start application client.");
    +           // start actor system
    +           LOG.info("Start actor system.");
    +           InetAddress ownHostname = 
NetUtils.resolveAddress(jobManagerAddress); // find name of own public 
interface, able to connect to the JM
    +           actorSystem = AkkaUtils.createActorSystem(flinkConfig,
    +                           new Some(new Tuple2<String, 
Integer>(ownHostname.getCanonicalHostName(), 0)));
     
    -                   applicationClient = 
actorSystem.actorOf(Props.create(ApplicationClient.class), "applicationClient");
    +           // start application client
    +           LOG.info("Start application client.");
     
    -                   // instruct ApplicationClient to start a periodical 
status polling
    -                   applicationClient.tell(new 
Messages.LocalRegisterClient(this.jobManagerAddress), applicationClient);
    +           applicationClient = 
actorSystem.actorOf(Props.create(ApplicationClient.class, flinkConfig), 
"applicationClient");
     
    +           // instruct ApplicationClient to start a periodical status 
polling
    +           applicationClient.tell(new 
Messages.LocalRegisterClient(this.jobManagerAddress), applicationClient);
     
    -                   // add hook to ensure proper shutdown
    -                   
Runtime.getRuntime().addShutdownHook(clientShutdownHook);
     
    -                   actorRunner = new Thread(new Runnable() {
    -                           @Override
    -                           public void run() {
    -                           // blocks until ApplicationMaster has been 
stopped
    -                           actorSystem.awaitTermination();
    +           actorRunner = new Thread(new Runnable() {
    +                   @Override
    +                   public void run() {
    +                   // blocks until ApplicationMaster has been stopped
    +                   actorSystem.awaitTermination();
     
    -                           // get final application report
    -                           try {
    -                                   ApplicationReport appReport = 
yarnClient.getApplicationReport(appId);
    -
    -                                   LOG.info("Application " + appId + " 
finished with state " + appReport
    -                                                   
.getYarnApplicationState() + " and final state " + appReport
    -                                                   
.getFinalApplicationStatus() + " at " + appReport.getFinishTime());
    -
    -                                   if (appReport.getYarnApplicationState() 
== YarnApplicationState.FAILED || appReport.getYarnApplicationState()
    -                                                   == 
YarnApplicationState.KILLED) {
    -                                           LOG.warn("Application failed. 
Diagnostics " + appReport.getDiagnostics());
    -                                           LOG.warn("If log aggregation is 
activated in the Hadoop cluster, we recommend to retrieve "
    -                                                           + "the full 
application log using this command:\n"
    -                                                           + "\tyarn logs 
-applicationId " + appReport.getApplicationId() + "\n"
    -                                                           + "(It 
sometimes takes a few seconds until the logs are aggregated)");
    -                                   }
    -                           } catch (Exception e) {
    -                                   LOG.warn("Error while getting final 
application report", e);
    -                           }
    +                   // get final application report
    +                   try {
    +                           ApplicationReport appReport = 
yarnClient.getApplicationReport(appId);
    +
    +                           LOG.info("Application " + appId + " finished 
with state " + appReport
    +                                           .getYarnApplicationState() + " 
and final state " + appReport
    +                                           .getFinalApplicationStatus() + 
" at " + appReport.getFinishTime());
    +
    +                           if (appReport.getYarnApplicationState() == 
YarnApplicationState.FAILED || appReport.getYarnApplicationState()
    +                                           == YarnApplicationState.KILLED) 
{
    +                                   LOG.warn("Application failed. 
Diagnostics " + appReport.getDiagnostics());
    +                                   LOG.warn("If log aggregation is 
activated in the Hadoop cluster, we recommend to retrieve "
    +                                                   + "the full application 
log using this command:\n"
    +                                                   + "\tyarn logs 
-applicationId " + appReport.getApplicationId() + "\n"
    +                                                   + "(It sometimes takes 
a few seconds until the logs are aggregated)");
                                }
    -                   });
    -                   actorRunner.setDaemon(true);
    -                   actorRunner.start();
    +                   } catch (Exception e) {
    +                           LOG.warn("Error while getting final application 
report", e);
    +                   }
    +                   }
    --- End diff --
    
    Is the indentation misaligned here?


> Add support for submitting single jobs to a detached YARN session
> -----------------------------------------------------------------
>
>                 Key: FLINK-1771
>                 URL: https://issues.apache.org/jira/browse/FLINK-1771
>             Project: Flink
>          Issue Type: Improvement
>          Components: YARN Client
>    Affects Versions: 0.9
>            Reporter: Robert Metzger
>            Assignee: Robert Metzger
>
> We need tests ensuring that the processing slots are set properly when 
> starting Flink on YARN, in particular with the per-job YARN session feature.
> Also, the YARN tests for detached YARN sessions / per-job YARN clusters are 
> polluting the local home directory.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to