Repository: flink Updated Branches: refs/heads/master 45762162f -> c4783c856
[FLINK-4824] [client] CliFrontend shows misleading error message. When a command-line program is run but no Flink job is executed, the message to the user is now displayed without the stacktrace. When a Flink program throws ProgramParametrizationException, the optional message is printed to stderr without a stacktrace. This closes #2662 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c4783c85 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c4783c85 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c4783c85 Branch: refs/heads/master Commit: c4783c856a8ea71271d411fa8c4dc9c484f29ff0 Parents: 4576216 Author: Greg Hogan <[email protected]> Authored: Thu Oct 13 12:40:53 2016 -0400 Committer: Greg Hogan <[email protected]> Committed: Mon Oct 24 10:33:54 2016 -0400 ---------------------------------------------------------------------- .../org/apache/flink/client/CliFrontend.java | 29 +++++++++++ .../flink/client/program/ClusterClient.java | 33 ++++++------ .../flink/client/program/PackagedProgram.java | 18 ++++--- .../program/ProgramMissingJobException.java | 29 +++++++++++ .../ProgramParametrizationException.java | 41 +++++++++++++++ .../flink/graph/examples/JaccardIndex.java | 54 +++++++++++--------- 6 files changed, 154 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c4783c85/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index 8b445e6..236ee94 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -39,6 +39,8 @@ import 
org.apache.flink.client.cli.StopOptions; import org.apache.flink.client.program.ClusterClient; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.client.program.ProgramMissingJobException; +import org.apache.flink.client.program.ProgramParametrizationException; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; @@ -826,6 +828,10 @@ public class CliFrontend { JobSubmissionResult result; try { result = client.run(program, parallelism); + } catch (ProgramParametrizationException e) { + return handleParametrizationException(e); + } catch (ProgramMissingJobException e) { + return handleMissingJobException(); } catch (ProgramInvocationException e) { return handleError(e); } finally { @@ -975,6 +981,29 @@ public class CliFrontend { } /** + * Displays an optional exception message for incorrect program parametrization. + * + * @param e The exception to display. + * @return The return code for the process. + */ + private int handleParametrizationException(ProgramParametrizationException e) { + System.err.println(e.getMessage()); + return 1; + } + + /** + * Displays a message for a program without a job to execute. + * + * @return The return code for the process. + */ + private int handleMissingJobException() { + System.err.println(); + System.err.println("The program didn't contain a Flink job. " + + "Perhaps you forgot to call execute() on the execution environment."); + return 1; + } + + /** * Displays an exception message. * * @param t The exception to display. 
http://git-wip-us.apache.org/repos/asf/flink/blob/c4783c85/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java index 25bcadc..ff5701f 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java @@ -18,21 +18,15 @@ package org.apache.flink.client.program; -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.URISyntaxException; -import java.net.URL; -import java.util.Collections; -import java.util.List; -import java.util.Map; - +import akka.actor.ActorSystem; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobSubmissionResult; import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; import org.apache.flink.optimizer.CompilerException; import org.apache.flink.optimizer.DataStatistics; import org.apache.flink.optimizer.Optimizer; @@ -42,8 +36,6 @@ import org.apache.flink.optimizer.plan.OptimizedPlan; import org.apache.flink.optimizer.plan.StreamingPlan; import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator; import org.apache.flink.optimizer.plantranslate.JobGraphGenerator; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.client.JobClient; import org.apache.flink.runtime.client.JobExecutionException; @@ -53,10 +45,10 @@ 
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusRespon import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous; import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound; import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults; -import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.net.ConnectionUtils; import org.apache.flink.runtime.util.LeaderConnectionInfo; import org.apache.flink.runtime.util.LeaderRetrievalUtils; @@ -64,13 +56,20 @@ import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import scala.Some; import scala.Tuple2; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; -import akka.actor.ActorSystem; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.Collections; +import java.util.List; +import java.util.Map; /** @@ -301,10 +300,11 @@ public abstract class ClusterClient { * @param prog the packaged program * @param parallelism the parallelism to execute the contained Flink job * @return The result of the execution + * @throws ProgramMissingJobException * @throws ProgramInvocationException */ public JobSubmissionResult run(PackagedProgram prog, int parallelism) - throws ProgramInvocationException + throws ProgramInvocationException, ProgramMissingJobException { Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader()); if (prog.isUsingProgramEntryPoint()) { @@ -321,8 +321,7 @@ public abstract 
class ClusterClient { // invoke main method prog.invokeInteractiveModeForExecution(); if (lastJobExecutionResult == null && factory.getLastEnvCreated() == null) { - throw new ProgramInvocationException("The program didn't contain Flink jobs. " + - "Perhaps you forgot to call execute() on the execution environment."); + throw new ProgramMissingJobException(); } if (isDetached()) { // in detached mode, we execute the whole user code to extract the Flink job, afterwards we run it here http://git-wip-us.apache.org/repos/asf/flink/blob/c4783c85/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java index 2a88043..aca873e 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java @@ -19,6 +19,14 @@ package org.apache.flink.client.program; +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.Program; +import org.apache.flink.api.common.ProgramDescription; +import org.apache.flink.optimizer.Optimizer; +import org.apache.flink.optimizer.dag.DataSinkNode; +import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator; +import org.apache.flink.util.InstantiationUtil; + import java.io.BufferedInputStream; import java.io.File; import java.io.FileOutputStream; @@ -43,14 +51,6 @@ import java.util.jar.JarEntry; import java.util.jar.JarFile; import java.util.jar.Manifest; -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.Program; -import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.optimizer.Optimizer; -import org.apache.flink.optimizer.dag.DataSinkNode; -import 
org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator; -import org.apache.flink.util.InstantiationUtil; - /** * This class encapsulates represents a program, packaged in a jar file. It supplies * functionality to extract nested libraries, search for the program entry point, and extract @@ -518,6 +518,8 @@ public class PackagedProgram { Throwable exceptionInMethod = e.getTargetException(); if (exceptionInMethod instanceof Error) { throw (Error) exceptionInMethod; + } else if (exceptionInMethod instanceof ProgramParametrizationException) { + throw (ProgramParametrizationException) exceptionInMethod; } else if (exceptionInMethod instanceof ProgramInvocationException) { throw (ProgramInvocationException) exceptionInMethod; } else { http://git-wip-us.apache.org/repos/asf/flink/blob/c4783c85/flink-clients/src/main/java/org/apache/flink/client/program/ProgramMissingJobException.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ProgramMissingJobException.java b/flink-clients/src/main/java/org/apache/flink/client/program/ProgramMissingJobException.java new file mode 100644 index 0000000..43d608b --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ProgramMissingJobException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.program; + +/** + * Exception used to indicate that no job was executed during the invocation of a Flink program. + */ +public class ProgramMissingJobException extends Exception { + /** + * Serial version UID for serialization interoperability. + */ + private static final long serialVersionUID = -1964276369605091101L; +} http://git-wip-us.apache.org/repos/asf/flink/blob/c4783c85/flink-clients/src/main/java/org/apache/flink/client/program/ProgramParametrizationException.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ProgramParametrizationException.java b/flink-clients/src/main/java/org/apache/flink/client/program/ProgramParametrizationException.java new file mode 100644 index 0000000..9b5ac82 --- /dev/null +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ProgramParametrizationException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.program; + +import org.apache.flink.util.Preconditions; + +/** + * Exception used to indicate that there is an error in the parametrization of a Flink program. + */ +public class ProgramParametrizationException extends RuntimeException { + /** + * Serial version UID for serialization interoperability. + */ + private static final long serialVersionUID = 909054589029890262L; + + /** + * Creates a <tt>ProgramParametrizationException</tt> with the given message. + * + * @param message + * The program usage string. + */ + public ProgramParametrizationException(String message) { + super(Preconditions.checkNotNull(message)); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/c4783c85/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java index 4cfbc71..2845e2d 100644 --- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java +++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java @@ -19,6 +19,7 @@ package org.apache.flink.graph.examples; import org.apache.commons.lang3.StringEscapeUtils; +import org.apache.commons.lang3.text.StrBuilder; import org.apache.commons.lang3.text.WordUtils; 
import org.apache.commons.math3.random.JDKRandomGenerator; import org.apache.flink.api.common.JobExecutionResult; @@ -27,11 +28,12 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.CsvOutputFormat; import org.apache.flink.api.java.utils.DataSetUtils; import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.client.program.ProgramParametrizationException; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphCsvReader; import org.apache.flink.graph.asm.simple.undirected.Simplify; -import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue; import org.apache.flink.graph.asm.translate.TranslateGraphIds; +import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue; import org.apache.flink.graph.generator.RMatGraph; import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; import org.apache.flink.graph.generator.random.RandomGenerableFactory; @@ -62,24 +64,29 @@ public class JaccardIndex { public static final boolean DEFAULT_CLIP_AND_FLIP = true; - private static void printUsage() { - System.out.println(WordUtils.wrap("The Jaccard Index measures the similarity between vertex" + - " neighborhoods and is computed as the number of shared neighbors divided by the number of" + - " distinct neighbors. 
Scores range from 0.0 (no shared neighbors) to 1.0 (all neighbors are" + - " shared).", 80)); - System.out.println(); - System.out.println(WordUtils.wrap("This algorithm returns 4-tuples containing two vertex IDs, the" + - " number of shared neighbors, and the number of distinct neighbors.", 80)); - System.out.println(); - System.out.println("usage: JaccardIndex --input <csv | rmat [options]> --output <print | hash | csv [options]>"); - System.out.println(); - System.out.println("options:"); - System.out.println(" --input csv --type <integer | string> --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]"); - System.out.println(" --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]"); - System.out.println(); - System.out.println(" --output print"); - System.out.println(" --output hash"); - System.out.println(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]"); + private static String getUsage(String message) { + return new StrBuilder() + .appendNewLine() + .appendln(WordUtils.wrap("The Jaccard Index measures the similarity between vertex" + + " neighborhoods and is computed as the number of shared neighbors divided by the number of" + + " distinct neighbors. 
Scores range from 0.0 (no shared neighbors) to 1.0 (all neighbors are" + + " shared).", 80)) + .appendNewLine() + .appendln(WordUtils.wrap("This algorithm returns 4-tuples containing two vertex IDs, the" + + " number of shared neighbors, and the number of distinct neighbors.", 80)) + .appendNewLine() + .appendln("usage: JaccardIndex --input <csv | rmat [options]> --output <print | hash | csv [options]>") + .appendNewLine() + .appendln("options:") + .appendln(" --input csv --type <integer | string> --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]") + .appendln(" --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]") + .appendNewLine() + .appendln(" --output print") + .appendln(" --output hash") + .appendln(" --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]") + .appendNewLine() + .appendln("Usage error: " + message) + .toString(); } public static void main(String[] args) throws Exception { @@ -123,8 +130,7 @@ public class JaccardIndex { } break; default: - printUsage(); - return; + throw new ProgramParametrizationException(getUsage("invalid CSV type")); } } break; @@ -161,8 +167,7 @@ public class JaccardIndex { } break; default: - printUsage(); - return; + throw new ProgramParametrizationException(getUsage("invalid input type")); } switch (parameters.get("output", "")) { @@ -192,8 +197,7 @@ public class JaccardIndex { break; default: - printUsage(); - return; + throw new ProgramParametrizationException(getUsage("invalid output type")); } JobExecutionResult result = env.getLastJobExecutionResult();
