[hotfix] Clean up CliFrontend after removing web client
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/17fa6a9b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/17fa6a9b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/17fa6a9b Branch: refs/heads/master Commit: 17fa6a9bc965eb4dcd64123b7d1c66a75c077db6 Parents: c0fd36b Author: Stephan Ewen <se...@apache.org> Authored: Fri Jan 15 18:36:13 2016 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Sat Jan 16 15:46:56 2016 +0100 ---------------------------------------------------------------------- .../org/apache/flink/client/CliFrontend.java | 125 ++++++++----------- .../flink/yarn/FlinkYarnSessionCliTest.java | 10 +- .../apache/flink/yarn/FlinkYarnClientBase.java | 9 +- .../flink/yarn/ApplicationMasterBase.scala | 8 +- 4 files changed, 62 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/17fa6a9b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index 7e1cef7..4b9bd06 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -19,13 +19,14 @@ package org.apache.flink.client; import akka.actor.ActorSystem; + import org.apache.commons.cli.CommandLine; + import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobSubmissionResult; import org.apache.flink.api.common.accumulators.AccumulatorHelper; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.client.cli.CancelOptions; import org.apache.flink.client.cli.CliArgsException; import org.apache.flink.client.cli.CliFrontendParser; @@ -65,8 +66,10 @@ import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient; import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster; import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus; import org.apache.flink.util.StringUtils; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import scala.Some; import scala.concurrent.Await; import scala.concurrent.Future; @@ -87,6 +90,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -95,7 +99,6 @@ import java.util.concurrent.TimeUnit; import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint; import static org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepointFailure; import static org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointFailure; -import static org.apache.flink.runtime.messages.JobManagerMessages.getRequestRunningJobsStatus; /** * Implementation of a simple command line frontend for executing programs. @@ -133,6 +136,7 @@ public class CliFrontend { private static final Logger LOG = LoggerFactory.getLogger(CliFrontend.class); + private final Configuration config; private final FiniteDuration askTimeout; @@ -143,12 +147,6 @@ public class CliFrontend { private AbstractFlinkYarnCluster yarnCluster; - static boolean webFrontend = false; - - private FlinkPlan optimizedPlan; - - private PackagedProgram packagedProgram; - /** * * @throws Exception Thrown if the configuration directory was not found, the configuration could not @@ -222,9 +220,9 @@ public class CliFrontend { // handle the YARN client's dynamic properties String dynamicPropertiesEncoded = yarnProperties.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING); - List<Tuple2<String, String>> dynamicProperties = getDynamicProperties(dynamicPropertiesEncoded); - for (Tuple2<String, String> dynamicProperty : dynamicProperties) { - this.config.setString(dynamicProperty.f0, dynamicProperty.f1); + Map<String, String> dynamicProperties = getDynamicProperties(dynamicPropertiesEncoded); + for (Map.Entry<String, String> dynamicProperty : dynamicProperties.entrySet()) { + this.config.setString(dynamicProperty.getKey(), dynamicProperty.getValue()); } } @@ -408,42 +406,34 @@ public class CliFrontend { LOG.info("Creating program plan dump"); Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config); - FlinkPlan flinkPlan = Client.getOptimizedPlan(compiler, program, parallelism); + + String jsonPlan = null; + if (flinkPlan instanceof OptimizedPlan) { + jsonPlan = new PlanJSONDumpGenerator().getOptimizerPlanAsJSON((OptimizedPlan) flinkPlan); + } else if (flinkPlan instanceof StreamingPlan) { + jsonPlan = ((StreamingPlan) flinkPlan).getStreamingPlanAsJSON(); + } - if (webFrontend) { - this.optimizedPlan = flinkPlan; - this.packagedProgram = program; - } else { - String jsonPlan = null; - if (flinkPlan instanceof OptimizedPlan) { - jsonPlan = new PlanJSONDumpGenerator().getOptimizerPlanAsJSON((OptimizedPlan) flinkPlan); - } else if (flinkPlan instanceof StreamingPlan) { - jsonPlan = ((StreamingPlan) flinkPlan).getStreamingPlanAsJSON(); - } - - if (jsonPlan != null) { - System.out.println("----------------------- Execution Plan -----------------------"); - System.out.println(jsonPlan); - System.out.println("--------------------------------------------------------------"); - } - else { - System.out.println("JSON plan could not be generated."); - } + if (jsonPlan != null) { + System.out.println("----------------------- Execution Plan -----------------------"); + System.out.println(jsonPlan); + System.out.println("--------------------------------------------------------------"); + } + else { + System.out.println("JSON plan could not be generated."); + } - String description = program.getDescription(); - if (description != null) { - System.out.println(); - System.out.println(description); - } - else { - System.out.println(); - System.out.println("No description provided."); - } + String description = program.getDescription(); + if (description != null) { + System.out.println(); + System.out.println(description); + } + else { + System.out.println(); + System.out.println("No description provided."); } return 0; - - } catch (Throwable t) { return handleError(t); @@ -492,7 +482,7 @@ public class CliFrontend { LOG.info("Connecting to JobManager to retrieve list of jobs"); Future<Object> response = jobManagerGateway.ask( - getRequestRunningJobsStatus(), + JobManagerMessages.getRequestRunningJobsStatus(), askTimeout); Object result; @@ -792,10 +782,8 @@ public class CliFrontend { yarnCluster.stopAfterJob(result.getJobID()); yarnCluster.disconnect(); } - - if (!webFrontend) { - System.out.println("Job has been submitted with JobID " + result.getJobID()); - } + + System.out.println("Job has been submitted with JobID " + result.getJobID()); return 0; } @@ -816,7 +804,7 @@ public class CliFrontend { LOG.info("Program execution finished"); - if (result instanceof JobExecutionResult && !webFrontend) { + if (result instanceof JobExecutionResult) { JobExecutionResult execResult = (JobExecutionResult) result; System.out.println("Job with JobID " + execResult.getJobID() + " has finished."); System.out.println("Job Runtime: " + execResult.getNetRuntime() + " ms"); @@ -933,7 +921,6 @@ public class CliFrontend { * @param options Command line options which contain JobManager address * @param programName Program name * @param userParallelism Given user parallelism - * @return * @throws Exception */ protected Client getClient( @@ -1035,9 +1022,6 @@ public class CliFrontend { * @return The return code for the process. */ private int handleArgException(Exception e) { - if (webFrontend) { - throw new RuntimeException(e); - } LOG.error("Invalid command line arguments." + (e.getMessage() == null ? "" : e.getMessage())); System.out.println(e.getMessage()); @@ -1053,9 +1037,6 @@ public class CliFrontend { * @return The return code for the process. */ private int handleError(Throwable t) { - if (webFrontend) { - throw new RuntimeException(t); - } LOG.error("Error while running the command.", t); System.err.println(); @@ -1080,9 +1061,7 @@ public class CliFrontend { private void logAndSysout(String message) { LOG.info(message); - if (!webFrontend) { - System.out.println(message); - } + System.out.println(message); } // -------------------------------------------------------------------------------------------- @@ -1117,9 +1096,6 @@ public class CliFrontend { if (SecurityUtils.isSecurityEnabled()) { String message = "Secure Hadoop environment setup detected. Running in secure context."; LOG.info(message); - if (!webFrontend) { - System.out.println(message); - } try { return SecurityUtils.runSecured(new SecurityUtils.FlinkSecuredRunner<Integer>() { @@ -1165,14 +1141,6 @@ public class CliFrontend { } } - public FlinkPlan getFlinkPlan() { - return this.optimizedPlan; - } - - public PackagedProgram getPackagedProgram() { - return this.packagedProgram; - } - public void shutdown() { ActorSystem sys = this.actorSystem; if (sys != null) { @@ -1251,20 +1219,25 @@ public class CliFrontend { return location; } - public static List<Tuple2<String, String>> getDynamicProperties(String dynamicPropertiesEncoded) { - List<Tuple2<String, String>> ret = new ArrayList<Tuple2<String, String>>(); - if(dynamicPropertiesEncoded != null && dynamicPropertiesEncoded.length() > 0) { + public static Map<String, String> getDynamicProperties(String dynamicPropertiesEncoded) { + if (dynamicPropertiesEncoded != null && dynamicPropertiesEncoded.length() > 0) { + Map<String, String> properties = new HashMap<>(); + String[] propertyLines = dynamicPropertiesEncoded.split(CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR); - for(String propLine : propertyLines) { - if(propLine == null) { + for (String propLine : propertyLines) { + if (propLine == null) { continue; } + String[] kv = propLine.split("="); if (kv.length >= 2 && kv[0] != null && kv[1] != null && kv[0].length() > 0) { - ret.add(new Tuple2<String, String>(kv[0], kv[1])); + properties.put(kv[0], kv[1]); } } + return properties; + } + else { + return Collections.emptyMap(); } - return ret; } } http://git-wip-us.apache.org/repos/asf/flink/blob/17fa6a9b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java index fb644c3..30116af 100644 --- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java +++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java @@ -22,11 +22,12 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; -import org.apache.flink.api.java.tuple.Tuple2; + import org.apache.flink.client.CliFrontend; import org.apache.flink.client.FlinkYarnSessionCli; import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient; import org.apache.flink.test.util.TestBaseUtils; + import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -35,7 +36,6 @@ import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.IOException; import java.util.HashMap; -import java.util.List; import java.util.Map; public class FlinkYarnSessionCliTest { @@ -69,10 +69,8 @@ public class FlinkYarnSessionCliTest { Assert.assertNotNull(flinkYarnClient); - List<Tuple2<String, String>> dynProperties = CliFrontend.getDynamicProperties(flinkYarnClient.getDynamicPropertiesEncoded()); + Map<String, String> dynProperties = CliFrontend.getDynamicProperties(flinkYarnClient.getDynamicPropertiesEncoded()); Assert.assertEquals(1, dynProperties.size()); - Assert.assertEquals("akka.ask.timeout", dynProperties.get(0).f0); - Assert.assertEquals("5 min", dynProperties.get(0).f1); + Assert.assertEquals("5 min", dynProperties.get("akka.ask.timeout")); } - } http://git-wip-us.apache.org/repos/asf/flink/blob/17fa6a9b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java index 993d24e..de7c933 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java @@ -18,7 +18,6 @@ package org.apache.flink.yarn; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.client.CliFrontend; import org.apache.flink.client.FlinkYarnSessionCli; import org.apache.flink.configuration.ConfigConstants; @@ -26,6 +25,7 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.jobmanager.RecoveryMode; import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient; import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.client.api.YarnClientApplication; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.Records; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -358,9 +359,9 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient { // ------------------ Add dynamic properties to local flinkConfiguraton ------ - List<Tuple2<String, String>> dynProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncoded); - for (Tuple2<String, String> dynProperty : dynProperties) { - flinkConfiguration.setString(dynProperty.f0, dynProperty.f1); + Map<String, String> dynProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncoded); + for (Map.Entry<String, String> dynProperty : dynProperties.entrySet()) { + flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue()); } // ------------------ Check if the specified queue exists -------------- http://git-wip-us.apache.org/repos/asf/flink/blob/17fa6a9b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala index 12f8585..7579d7d 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala @@ -244,8 +244,8 @@ abstract class ApplicationMasterBase { import scala.collection.JavaConverters._ - for(property <- dynamicProperties.asScala){ - output.println(s"${property.f0}: ${property.f1}") + for (property <- dynamicProperties.asScala){ + output.println(s"${property._1}: ${property._2}") } output.close() @@ -262,8 +262,8 @@ abstract class ApplicationMasterBase { // add dynamic properties to JobManager configuration. val dynamicProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncodedString) import scala.collection.JavaConverters._ - for(property <- dynamicProperties.asScala){ - configuration.setString(property.f0, property.f1) + for (property <- dynamicProperties.asScala){ + configuration.setString(property._1, property._2) } configuration