[
https://issues.apache.org/jira/browse/FLINK-8328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308120#comment-16308120
]
ASF GitHub Bot commented on FLINK-8328:
---------------------------------------
Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5215#discussion_r159224695
--- Diff:
flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---
@@ -743,6 +690,142 @@ private void logAndSysout(String message) {
System.out.println(message);
}
+ public static void main(final String[] args) throws Exception {
+ final FlinkYarnSessionCli cli = new FlinkYarnSessionCli("",
""); // no prefix for the YARN session
+
+ final String configurationDirectory =
CliFrontend.getConfigurationDirectoryFromEnv();
+
+ final Configuration flinkConfiguration =
GlobalConfiguration.loadConfiguration();
+ SecurityUtils.install(new
SecurityConfiguration(flinkConfiguration));
+ int retCode =
SecurityUtils.getInstalledContext().runSecured(new Callable<Integer>() {
+ @Override
+ public Integer call() {
+ return cli.run(args, flinkConfiguration,
configurationDirectory);
+ }
+ });
+ System.exit(retCode);
+ }
+
+ private static void runInteractiveCli(
+ YarnClusterClient clusterClient,
+ YarnApplicationStatusMonitor yarnApplicationStatusMonitor,
+ boolean readConsoleInput) {
+ try (BufferedReader in = new BufferedReader(new
InputStreamReader(System.in))) {
+ boolean continueRepl = true;
+ int numTaskmanagers = 0;
+ long unknownStatusSince = System.currentTimeMillis();
+
+ while (continueRepl) {
+
+ final ApplicationStatus applicationStatus =
yarnApplicationStatusMonitor.getApplicationStatusNow();
+
+ switch (applicationStatus) {
+ case FAILED:
+ case CANCELED:
+ System.err.println("The Flink
Yarn cluster has failed.");
+ continueRepl = false;
+ break;
+ case UNKNOWN:
+ if (unknownStatusSince < 0L) {
+ unknownStatusSince =
System.currentTimeMillis();
+ }
+
+ if ((System.currentTimeMillis()
- unknownStatusSince) > CLIENT_POLLING_INTERVAL_MS) {
+ System.err.println("The
Flink Yarn cluster is in an unknown state. Please check the Yarn cluster.");
+ continueRepl = false;
+ } else {
+ continueRepl =
repStep(in, readConsoleInput);
+ }
+ break;
+ case SUCCEEDED:
+ if (unknownStatusSince > 0L) {
+ unknownStatusSince =
-1L;
+ }
+
+ // ------------------ check if
there are updates by the cluster -----------
+ try {
+ final
GetClusterStatusResponse status = clusterClient.getClusterStatus();
+
+ if (status != null &&
numTaskmanagers != status.numRegisteredTaskManagers()) {
+
System.err.println("Number of connected TaskManagers changed to " +
+
status.numRegisteredTaskManagers() + ". " +
+ "Slots
available: " + status.totalNumberOfSlots());
+ numTaskmanagers
= status.numRegisteredTaskManagers();
+ }
+ } catch (Exception e) {
+ LOG.warn("Could not
retrieve the current cluster status. Skipping current retrieval attempt ...",
e);
+ }
+
+
printClusterMessages(clusterClient);
+
+ continueRepl = repStep(in,
readConsoleInput);
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn("Exception while running the interactive
command line interface.", e);
+ }
+ }
+
+ private static void printClusterMessages(YarnClusterClient
clusterClient) {
+ final List<String> messages = clusterClient.getNewMessages();
+ if (messages != null && messages.size() > 0) {
--- End diff --
nit: `if (!messages.isEmpty())` should suffice because `messages` is
never null.
> Pull Yarn ApplicationStatus polling out of YarnClusterClient
> ------------------------------------------------------------
>
> Key: FLINK-8328
> URL: https://issues.apache.org/jira/browse/FLINK-8328
> Project: Flink
> Issue Type: Sub-task
> Components: Client
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to make the {{FlinkYarnSessionCli}} work with Flip-6, we have to
> pull the Yarn {{ApplicationStatus}} polling out of the {{YarnClusterClient}}.
> I propose to introduce a dedicated {{YarnApplicationStatusMonitor}}. This also
> has the benefit of separating concerns better.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)