Github user hsaputra commented on a diff in the pull request:

    https://github.com/apache/flink/pull/542#discussion_r27452585
  
    --- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java ---
    @@ -106,70 +111,129 @@ public FlinkYarnCluster(final YarnClient yarnClient, 
final ApplicationId appId,
                this.sessionFilesDir = sessionFilesDir;
                this.applicationId = appId;
                this.detached = detached;
    +           this.flinkConfig = flinkConfig;
    +           this.appId = appId;
     
                // get one application report manually
                intialAppReport = yarnClient.getApplicationReport(appId);
                String jobManagerHost = intialAppReport.getHost();
                int jobManagerPort = intialAppReport.getRpcPort();
                this.jobManagerAddress = new InetSocketAddress(jobManagerHost, 
jobManagerPort);
    +   }
     
    -           if(!detached) {
    -                   // start actor system
    -                   LOG.info("Start actor system.");
    -                   InetAddress ownHostname = 
NetUtils.resolveAddress(jobManagerAddress); // find name of own public 
interface, able to connect to the JM
    -                   actorSystem = AkkaUtils.createActorSystem(flinkConfig,
    -                                   new Some(new Tuple2<String, 
Integer>(ownHostname.getCanonicalHostName(), 0)));
    +   /**
    +    * Connect the FlinkYarnCluster to the ApplicationMaster.
    +    *
    +    * Detached YARN sessions don't need to connect to the 
ApplicationMaster.
    +    * Detached per job YARN sessions need to connect until the required 
number of TaskManagers have been started.
    +    * 
    +    * @throws IOException
    +    */
    +   public void connectToCluster() throws IOException {
    +           if(isConnected) {
    +                   throw new IllegalStateException("Can not connect to the 
cluster again");
    +           }
     
    -                   // start application client
    -                   LOG.info("Start application client.");
    +           // start actor system
    +           LOG.info("Start actor system.");
    +           InetAddress ownHostname = 
NetUtils.resolveAddress(jobManagerAddress); // find name of own public 
interface, able to connect to the JM
    +           actorSystem = AkkaUtils.createActorSystem(flinkConfig,
    +                           new Some(new Tuple2<String, 
Integer>(ownHostname.getCanonicalHostName(), 0)));
     
    -                   applicationClient = 
actorSystem.actorOf(Props.create(ApplicationClient.class), "applicationClient");
    +           // start application client
    +           LOG.info("Start application client.");
     
    -                   // instruct ApplicationClient to start a periodical 
status polling
    -                   applicationClient.tell(new 
Messages.LocalRegisterClient(this.jobManagerAddress), applicationClient);
    +           applicationClient = 
actorSystem.actorOf(Props.create(ApplicationClient.class, flinkConfig), 
"applicationClient");
     
    +           // instruct ApplicationClient to start a periodical status 
polling
    +           applicationClient.tell(new 
Messages.LocalRegisterClient(this.jobManagerAddress), applicationClient);
     
    -                   // add hook to ensure proper shutdown
    -                   
Runtime.getRuntime().addShutdownHook(clientShutdownHook);
     
    -                   actorRunner = new Thread(new Runnable() {
    -                           @Override
    -                           public void run() {
    -                           // blocks until ApplicationMaster has been 
stopped
    -                           actorSystem.awaitTermination();
    +           actorRunner = new Thread(new Runnable() {
    +                   @Override
    +                   public void run() {
    +                   // blocks until ApplicationMaster has been stopped
    +                   actorSystem.awaitTermination();
     
    -                           // get final application report
    -                           try {
    -                                   ApplicationReport appReport = 
yarnClient.getApplicationReport(appId);
    -
    -                                   LOG.info("Application " + appId + " 
finished with state " + appReport
    -                                                   
.getYarnApplicationState() + " and final state " + appReport
    -                                                   
.getFinalApplicationStatus() + " at " + appReport.getFinishTime());
    -
    -                                   if (appReport.getYarnApplicationState() 
== YarnApplicationState.FAILED || appReport.getYarnApplicationState()
    -                                                   == 
YarnApplicationState.KILLED) {
    -                                           LOG.warn("Application failed. 
Diagnostics " + appReport.getDiagnostics());
    -                                           LOG.warn("If log aggregation is 
activated in the Hadoop cluster, we recommend to retrieve "
    -                                                           + "the full 
application log using this command:\n"
    -                                                           + "\tyarn logs 
-applicationId " + appReport.getApplicationId() + "\n"
    -                                                           + "(It 
sometimes takes a few seconds until the logs are aggregated)");
    -                                   }
    -                           } catch (Exception e) {
    -                                   LOG.warn("Error while getting final 
application report", e);
    -                           }
    +                   // get final application report
    +                   try {
    +                           ApplicationReport appReport = 
yarnClient.getApplicationReport(appId);
    +
    +                           LOG.info("Application " + appId + " finished 
with state " + appReport
    +                                           .getYarnApplicationState() + " 
and final state " + appReport
    +                                           .getFinalApplicationStatus() + 
" at " + appReport.getFinishTime());
    +
    +                           if (appReport.getYarnApplicationState() == 
YarnApplicationState.FAILED || appReport.getYarnApplicationState()
    +                                           == YarnApplicationState.KILLED) 
{
    +                                   LOG.warn("Application failed. 
Diagnostics " + appReport.getDiagnostics());
    +                                   LOG.warn("If log aggregation is 
activated in the Hadoop cluster, we recommend to retrieve "
    +                                                   + "the full application 
log using this command:\n"
    +                                                   + "\tyarn logs 
-applicationId " + appReport.getApplicationId() + "\n"
    +                                                   + "(It sometimes takes 
a few seconds until the logs are aggregated)");
                                }
    -                   });
    -                   actorRunner.setDaemon(true);
    -                   actorRunner.start();
    +                   } catch (Exception e) {
    +                           LOG.warn("Error while getting final application 
report", e);
    +                   }
    +                   }
    --- End diff --
    
    misalign on indentation?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to