Repository: flink
Updated Branches:
  refs/heads/master 45762162f -> c4783c856


[FLINK-4824] [client] CliFrontend shows misleading error message

When a command-line program is run but no Flink job is executed, the
message to the user is now displayed without a stack trace.

When a Flink program throws a ProgramParametrizationException, the
optional message is printed to stderr without a stack trace.

This closes #2662


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c4783c85
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c4783c85
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c4783c85

Branch: refs/heads/master
Commit: c4783c856a8ea71271d411fa8c4dc9c484f29ff0
Parents: 4576216
Author: Greg Hogan <[email protected]>
Authored: Thu Oct 13 12:40:53 2016 -0400
Committer: Greg Hogan <[email protected]>
Committed: Mon Oct 24 10:33:54 2016 -0400

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    | 29 +++++++++++
 .../flink/client/program/ClusterClient.java     | 33 ++++++------
 .../flink/client/program/PackagedProgram.java   | 18 ++++---
 .../program/ProgramMissingJobException.java     | 29 +++++++++++
 .../ProgramParametrizationException.java        | 41 +++++++++++++++
 .../flink/graph/examples/JaccardIndex.java      | 54 +++++++++++---------
 6 files changed, 154 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c4783c85/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 8b445e6..236ee94 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -39,6 +39,8 @@ import org.apache.flink.client.cli.StopOptions;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.client.program.ProgramMissingJobException;
+import org.apache.flink.client.program.ProgramParametrizationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -826,6 +828,10 @@ public class CliFrontend {
                JobSubmissionResult result;
                try {
                        result = client.run(program, parallelism);
+               } catch (ProgramParametrizationException e) {
+                       return handleParametrizationException(e);
+               } catch (ProgramMissingJobException e) {
+                       return handleMissingJobException();
                } catch (ProgramInvocationException e) {
                        return handleError(e);
                } finally {
@@ -975,6 +981,29 @@ public class CliFrontend {
        }
 
        /**
+        * Displays an optional exception message for incorrect program 
parametrization.
+        *
+        * @param e The exception to display.
+        * @return The return code for the process.
+        */
+       private int 
handleParametrizationException(ProgramParametrizationException e) {
+               System.err.println(e.getMessage());
+               return 1;
+       }
+
+       /**
+        * Displays a message for a program without a job to execute.
+        *
+        * @return The return code for the process.
+        */
+       private int handleMissingJobException() {
+               System.err.println();
+               System.err.println("The program didn't contain a Flink job. " +
+                       "Perhaps you forgot to call execute() on the execution 
environment.");
+               return 1;
+       }
+
+       /**
         * Displays an exception message.
         * 
         * @param t The exception to display.

http://git-wip-us.apache.org/repos/asf/flink/blob/c4783c85/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 25bcadc..ff5701f 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -18,21 +18,15 @@
 
 package org.apache.flink.client.program;
 
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
+import akka.actor.ActorSystem;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
@@ -42,8 +36,6 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.StreamingPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.client.JobExecutionException;
@@ -53,10 +45,10 @@ import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusRespon
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
 import 
org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
 import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
 import 
org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
-import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.net.ConnectionUtils;
 import org.apache.flink.runtime.util.LeaderConnectionInfo;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
@@ -64,13 +56,20 @@ import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import scala.Some;
 import scala.Tuple2;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
-import akka.actor.ActorSystem;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 
 
 /**
@@ -301,10 +300,11 @@ public abstract class ClusterClient {
         * @param prog the packaged program
         * @param parallelism the parallelism to execute the contained Flink job
         * @return The result of the execution
+        * @throws ProgramMissingJobException
         * @throws ProgramInvocationException
         */
        public JobSubmissionResult run(PackagedProgram prog, int parallelism)
-                       throws ProgramInvocationException
+                       throws ProgramInvocationException, 
ProgramMissingJobException
        {
                
Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
                if (prog.isUsingProgramEntryPoint()) {
@@ -321,8 +321,7 @@ public abstract class ClusterClient {
                                // invoke main method
                                prog.invokeInteractiveModeForExecution();
                                if (lastJobExecutionResult == null && 
factory.getLastEnvCreated() == null) {
-                                       throw new 
ProgramInvocationException("The program didn't contain Flink jobs. " +
-                                               "Perhaps you forgot to call 
execute() on the execution environment.");
+                                       throw new ProgramMissingJobException();
                                }
                                if (isDetached()) {
                                        // in detached mode, we execute the 
whole user code to extract the Flink job, afterwards we run it here

http://git-wip-us.apache.org/repos/asf/flink/blob/c4783c85/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index 2a88043..aca873e 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -19,6 +19,14 @@
 
 package org.apache.flink.client.program;
 
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.Program;
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.dag.DataSinkNode;
+import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
+import org.apache.flink.util.InstantiationUtil;
+
 import java.io.BufferedInputStream;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -43,14 +51,6 @@ import java.util.jar.JarEntry;
 import java.util.jar.JarFile;
 import java.util.jar.Manifest;
 
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.Program;
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.dag.DataSinkNode;
-import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.util.InstantiationUtil;
-
 /**
  * This class encapsulates represents a program, packaged in a jar file. It 
supplies
  * functionality to extract nested libraries, search for the program entry 
point, and extract
@@ -518,6 +518,8 @@ public class PackagedProgram {
                        Throwable exceptionInMethod = e.getTargetException();
                        if (exceptionInMethod instanceof Error) {
                                throw (Error) exceptionInMethod;
+                       } else if (exceptionInMethod instanceof 
ProgramParametrizationException) {
+                               throw (ProgramParametrizationException) 
exceptionInMethod;
                        } else if (exceptionInMethod instanceof 
ProgramInvocationException) {
                                throw (ProgramInvocationException) 
exceptionInMethod;
                        } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/c4783c85/flink-clients/src/main/java/org/apache/flink/client/program/ProgramMissingJobException.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/ProgramMissingJobException.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/ProgramMissingJobException.java
new file mode 100644
index 0000000..43d608b
--- /dev/null
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/ProgramMissingJobException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+/**
+ * Exception used to indicate that no job was executed during the invocation 
of a Flink program.
+ */
+public class ProgramMissingJobException extends Exception {
+       /**
+        * Serial version UID for serialization interoperability.
+        */
+       private static final long serialVersionUID = -1964276369605091101L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c4783c85/flink-clients/src/main/java/org/apache/flink/client/program/ProgramParametrizationException.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/ProgramParametrizationException.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/ProgramParametrizationException.java
new file mode 100644
index 0000000..9b5ac82
--- /dev/null
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/ProgramParametrizationException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Exception used to indicate that there is an error in the parametrization of 
a Flink program.
+ */
+public class ProgramParametrizationException extends RuntimeException {
+       /**
+        * Serial version UID for serialization interoperability.
+        */
+       private static final long serialVersionUID = 909054589029890262L;
+
+       /**
+        * Creates a <tt>ProgramParametrizationException</tt> with the given 
message.
+        *
+        * @param message
+        *        The program usage string.
+        */
+       public ProgramParametrizationException(String message) {
+               super(Preconditions.checkNotNull(message));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c4783c85/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
index 4cfbc71..2845e2d 100644
--- 
a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
+++ 
b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
@@ -19,6 +19,7 @@
 package org.apache.flink.graph.examples;
 
 import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.text.StrBuilder;
 import org.apache.commons.lang3.text.WordUtils;
 import org.apache.commons.math3.random.JDKRandomGenerator;
 import org.apache.flink.api.common.JobExecutionResult;
@@ -27,11 +28,12 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.CsvOutputFormat;
 import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.client.program.ProgramParametrizationException;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphCsvReader;
 import org.apache.flink.graph.asm.simple.undirected.Simplify;
-import 
org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
 import org.apache.flink.graph.asm.translate.TranslateGraphIds;
+import 
org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
 import org.apache.flink.graph.generator.RMatGraph;
 import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
 import org.apache.flink.graph.generator.random.RandomGenerableFactory;
@@ -62,24 +64,29 @@ public class JaccardIndex {
 
        public static final boolean DEFAULT_CLIP_AND_FLIP = true;
 
-       private static void printUsage() {
-               System.out.println(WordUtils.wrap("The Jaccard Index measures 
the similarity between vertex" +
-                       " neighborhoods and is computed as the number of shared 
neighbors divided by the number of" +
-                       " distinct neighbors. Scores range from 0.0 (no shared 
neighbors) to 1.0 (all neighbors are" +
-                       " shared).", 80));
-               System.out.println();
-               System.out.println(WordUtils.wrap("This algorithm returns 
4-tuples containing two vertex IDs, the" +
-                       " number of shared neighbors, and the number of 
distinct neighbors.", 80));
-               System.out.println();
-               System.out.println("usage: JaccardIndex --input <csv | rmat 
[options]> --output <print | hash | csv [options]>");
-               System.out.println();
-               System.out.println("options:");
-               System.out.println("  --input csv --type <integer | string> 
--input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] 
[--input_field_delimiter FIELD_DELIMITER]");
-               System.out.println("  --input rmat [--scale SCALE] 
[--edge_factor EDGE_FACTOR]");
-               System.out.println();
-               System.out.println("  --output print");
-               System.out.println("  --output hash");
-               System.out.println("  --output csv --output_filename FILENAME 
[--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter 
FIELD_DELIMITER]");
+       private static String getUsage(String message) {
+               return new StrBuilder()
+                       .appendNewLine()
+                       .appendln(WordUtils.wrap("The Jaccard Index measures 
the similarity between vertex" +
+                               " neighborhoods and is computed as the number 
of shared neighbors divided by the number of" +
+                               " distinct neighbors. Scores range from 0.0 (no 
shared neighbors) to 1.0 (all neighbors are" +
+                               " shared).", 80))
+                       .appendNewLine()
+                       .appendln(WordUtils.wrap("This algorithm returns 
4-tuples containing two vertex IDs, the" +
+                               " number of shared neighbors, and the number of 
distinct neighbors.", 80))
+                       .appendNewLine()
+                       .appendln("usage: JaccardIndex --input <csv | rmat 
[options]> --output <print | hash | csv [options]>")
+                       .appendNewLine()
+                       .appendln("options:")
+                       .appendln("  --input csv --type <integer | string> 
--input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] 
[--input_field_delimiter FIELD_DELIMITER]")
+                       .appendln("  --input rmat [--scale SCALE] 
[--edge_factor EDGE_FACTOR]")
+                       .appendNewLine()
+                       .appendln("  --output print")
+                       .appendln("  --output hash")
+                       .appendln("  --output csv --output_filename FILENAME 
[--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter 
FIELD_DELIMITER]")
+                       .appendNewLine()
+                       .appendln("Usage error: " + message)
+                       .toString();
        }
 
        public static void main(String[] args) throws Exception {
@@ -123,8 +130,7 @@ public class JaccardIndex {
                                        } break;
 
                                        default:
-                                               printUsage();
-                                               return;
+                                               throw new 
ProgramParametrizationException(getUsage("invalid CSV type"));
                                }
                                } break;
 
@@ -161,8 +167,7 @@ public class JaccardIndex {
                                } break;
 
                        default:
-                               printUsage();
-                               return;
+                               throw new 
ProgramParametrizationException(getUsage("invalid input type"));
                }
 
                switch (parameters.get("output", "")) {
@@ -192,8 +197,7 @@ public class JaccardIndex {
                                break;
 
                        default:
-                               printUsage();
-                               return;
+                               throw new 
ProgramParametrizationException(getUsage("invalid output type"));
                }
 
                JobExecutionResult result = env.getLastJobExecutionResult();

Reply via email to