Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/542#discussion_r27456302
--- Diff:
flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java ---
@@ -106,70 +111,129 @@ public FlinkYarnCluster(final YarnClient yarnClient,
final ApplicationId appId,
this.sessionFilesDir = sessionFilesDir;
this.applicationId = appId;
this.detached = detached;
+ this.flinkConfig = flinkConfig;
+ this.appId = appId;
// get one application report manually
intialAppReport = yarnClient.getApplicationReport(appId);
String jobManagerHost = intialAppReport.getHost();
int jobManagerPort = intialAppReport.getRpcPort();
this.jobManagerAddress = new InetSocketAddress(jobManagerHost,
jobManagerPort);
+ }
- if(!detached) {
- // start actor system
- LOG.info("Start actor system.");
- InetAddress ownHostname =
NetUtils.resolveAddress(jobManagerAddress); // find name of own public
interface, able to connect to the JM
- actorSystem = AkkaUtils.createActorSystem(flinkConfig,
- new Some(new Tuple2<String,
Integer>(ownHostname.getCanonicalHostName(), 0)));
+ /**
+ * Connect the FlinkYarnCluster to the ApplicationMaster.
+ *
+ * Detached YARN sessions don't need to connect to the
ApplicationMaster.
+ * Detached per job YARN sessions need to connect until the required
number of TaskManagers have been started.
+ *
+ * @throws IOException
+ */
+ public void connectToCluster() throws IOException {
+ if(isConnected) {
+ throw new IllegalStateException("Can not connect to the
cluster again");
+ }
- // start application client
- LOG.info("Start application client.");
+ // start actor system
+ LOG.info("Start actor system.");
+ InetAddress ownHostname =
NetUtils.resolveAddress(jobManagerAddress); // find name of own public
interface, able to connect to the JM
+ actorSystem = AkkaUtils.createActorSystem(flinkConfig,
+ new Some(new Tuple2<String,
Integer>(ownHostname.getCanonicalHostName(), 0)));
- applicationClient =
actorSystem.actorOf(Props.create(ApplicationClient.class), "applicationClient");
+ // start application client
+ LOG.info("Start application client.");
- // instruct ApplicationClient to start a periodical
status polling
- applicationClient.tell(new
Messages.LocalRegisterClient(this.jobManagerAddress), applicationClient);
+ applicationClient =
actorSystem.actorOf(Props.create(ApplicationClient.class, flinkConfig),
"applicationClient");
+ // instruct ApplicationClient to start a periodical status
polling
+ applicationClient.tell(new
Messages.LocalRegisterClient(this.jobManagerAddress), applicationClient);
- // add hook to ensure proper shutdown
-
Runtime.getRuntime().addShutdownHook(clientShutdownHook);
- actorRunner = new Thread(new Runnable() {
- @Override
- public void run() {
- // blocks until ApplicationMaster has been
stopped
- actorSystem.awaitTermination();
+ actorRunner = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ // blocks until ApplicationMaster has been stopped
+ actorSystem.awaitTermination();
- // get final application report
- try {
- ApplicationReport appReport =
yarnClient.getApplicationReport(appId);
-
- LOG.info("Application " + appId + "
finished with state " + appReport
-
.getYarnApplicationState() + " and final state " + appReport
-
.getFinalApplicationStatus() + " at " + appReport.getFinishTime());
-
- if (appReport.getYarnApplicationState()
== YarnApplicationState.FAILED || appReport.getYarnApplicationState()
- ==
YarnApplicationState.KILLED) {
- LOG.warn("Application failed.
Diagnostics " + appReport.getDiagnostics());
- LOG.warn("If log aggregation is
activated in the Hadoop cluster, we recommend to retrieve "
- + "the full
application log using this command:\n"
- + "\tyarn logs
-applicationId " + appReport.getApplicationId() + "\n"
- + "(It
sometimes takes a few seconds until the logs are aggregated)");
- }
- } catch (Exception e) {
- LOG.warn("Error while getting final
application report", e);
- }
+ // get final application report
+ try {
+ ApplicationReport appReport =
yarnClient.getApplicationReport(appId);
+
+ LOG.info("Application " + appId + " finished
with state " + appReport
+ .getYarnApplicationState() + "
and final state " + appReport
+ .getFinalApplicationStatus() +
" at " + appReport.getFinishTime());
+
+ if (appReport.getYarnApplicationState() ==
YarnApplicationState.FAILED || appReport.getYarnApplicationState()
+ == YarnApplicationState.KILLED)
{
+ LOG.warn("Application failed.
Diagnostics " + appReport.getDiagnostics());
+ LOG.warn("If log aggregation is
activated in the Hadoop cluster, we recommend to retrieve "
+ + "the full application
log using this command:\n"
+ + "\tyarn logs
-applicationId " + appReport.getApplicationId() + "\n"
+ + "(It sometimes takes
a few seconds until the logs are aggregated)");
}
- });
- actorRunner.setDaemon(true);
- actorRunner.start();
+ } catch (Exception e) {
+ LOG.warn("Error while getting final application
report", e);
+ }
+ }
--- End diff --
fixed
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---