[
https://issues.apache.org/jira/browse/FLINK-1771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14387970#comment-14387970
]
ASF GitHub Bot commented on FLINK-1771:
---------------------------------------
Github user hsaputra commented on a diff in the pull request:
https://github.com/apache/flink/pull/542#discussion_r27452585
--- Diff:
flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java ---
@@ -106,70 +111,129 @@ public FlinkYarnCluster(final YarnClient yarnClient,
final ApplicationId appId,
this.sessionFilesDir = sessionFilesDir;
this.applicationId = appId;
this.detached = detached;
+ this.flinkConfig = flinkConfig;
+ this.appId = appId;
// get one application report manually
intialAppReport = yarnClient.getApplicationReport(appId);
String jobManagerHost = intialAppReport.getHost();
int jobManagerPort = intialAppReport.getRpcPort();
this.jobManagerAddress = new InetSocketAddress(jobManagerHost,
jobManagerPort);
+ }
- if(!detached) {
- // start actor system
- LOG.info("Start actor system.");
- InetAddress ownHostname =
NetUtils.resolveAddress(jobManagerAddress); // find name of own public
interface, able to connect to the JM
- actorSystem = AkkaUtils.createActorSystem(flinkConfig,
- new Some(new Tuple2<String,
Integer>(ownHostname.getCanonicalHostName(), 0)));
+ /**
+ * Connect the FlinkYarnCluster to the ApplicationMaster.
+ *
+ * Detached YARN sessions don't need to connect to the
ApplicationMaster.
+ * Detached per job YARN sessions need to connect until the required
number of TaskManagers have been started.
+ *
+ * @throws IOException
+ */
+ public void connectToCluster() throws IOException {
+ if(isConnected) {
+ throw new IllegalStateException("Can not connect to the
cluster again");
+ }
- // start application client
- LOG.info("Start application client.");
+ // start actor system
+ LOG.info("Start actor system.");
+ InetAddress ownHostname =
NetUtils.resolveAddress(jobManagerAddress); // find name of own public
interface, able to connect to the JM
+ actorSystem = AkkaUtils.createActorSystem(flinkConfig,
+ new Some(new Tuple2<String,
Integer>(ownHostname.getCanonicalHostName(), 0)));
- applicationClient =
actorSystem.actorOf(Props.create(ApplicationClient.class), "applicationClient");
+ // start application client
+ LOG.info("Start application client.");
- // instruct ApplicationClient to start a periodical
status polling
- applicationClient.tell(new
Messages.LocalRegisterClient(this.jobManagerAddress), applicationClient);
+ applicationClient =
actorSystem.actorOf(Props.create(ApplicationClient.class, flinkConfig),
"applicationClient");
+ // instruct ApplicationClient to start a periodical status
polling
+ applicationClient.tell(new
Messages.LocalRegisterClient(this.jobManagerAddress), applicationClient);
- // add hook to ensure proper shutdown
-
Runtime.getRuntime().addShutdownHook(clientShutdownHook);
- actorRunner = new Thread(new Runnable() {
- @Override
- public void run() {
- // blocks until ApplicationMaster has been
stopped
- actorSystem.awaitTermination();
+ actorRunner = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ // blocks until ApplicationMaster has been stopped
+ actorSystem.awaitTermination();
- // get final application report
- try {
- ApplicationReport appReport =
yarnClient.getApplicationReport(appId);
-
- LOG.info("Application " + appId + "
finished with state " + appReport
-
.getYarnApplicationState() + " and final state " + appReport
-
.getFinalApplicationStatus() + " at " + appReport.getFinishTime());
-
- if (appReport.getYarnApplicationState()
== YarnApplicationState.FAILED || appReport.getYarnApplicationState()
- ==
YarnApplicationState.KILLED) {
- LOG.warn("Application failed.
Diagnostics " + appReport.getDiagnostics());
- LOG.warn("If log aggregation is
activated in the Hadoop cluster, we recommend to retrieve "
- + "the full
application log using this command:\n"
- + "\tyarn logs
-applicationId " + appReport.getApplicationId() + "\n"
- + "(It
sometimes takes a few seconds until the logs are aggregated)");
- }
- } catch (Exception e) {
- LOG.warn("Error while getting final
application report", e);
- }
+ // get final application report
+ try {
+ ApplicationReport appReport =
yarnClient.getApplicationReport(appId);
+
+ LOG.info("Application " + appId + " finished
with state " + appReport
+ .getYarnApplicationState() + "
and final state " + appReport
+ .getFinalApplicationStatus() +
" at " + appReport.getFinishTime());
+
+ if (appReport.getYarnApplicationState() ==
YarnApplicationState.FAILED || appReport.getYarnApplicationState()
+ == YarnApplicationState.KILLED)
{
+ LOG.warn("Application failed.
Diagnostics " + appReport.getDiagnostics());
+ LOG.warn("If log aggregation is
activated in the Hadoop cluster, we recommend to retrieve "
+ + "the full application
log using this command:\n"
+ + "\tyarn logs
-applicationId " + appReport.getApplicationId() + "\n"
+ + "(It sometimes takes
a few seconds until the logs are aggregated)");
}
- });
- actorRunner.setDaemon(true);
- actorRunner.start();
+ } catch (Exception e) {
+ LOG.warn("Error while getting final application
report", e);
+ }
+ }
--- End diff --
Is the indentation misaligned here?
> Add support for submitting single jobs to a detached YARN session
> -----------------------------------------------------------------
>
> Key: FLINK-1771
> URL: https://issues.apache.org/jira/browse/FLINK-1771
> Project: Flink
> Issue Type: Improvement
> Components: YARN Client
> Affects Versions: 0.9
> Reporter: Robert Metzger
> Assignee: Robert Metzger
>
> We need tests ensuring that the processing slots are set properly when
> starting Flink on YARN, in particular with the per-job YARN session feature.
> Also, the YARN tests for detached YARN sessions / per-job YARN clusters are
> polluting the local home directory.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)