Repository: flink
Updated Branches:
  refs/heads/master e85f787b2 -> 082d87e51


[FLINK-4199] fix misleading CLI messages during job submission

- change CLI message upon cluster retrieval
- save JobExecutionResult for interactive executions
- only print Collection size in accumulator results
- remove unused helper method

This closes #2264


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/17589d45
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/17589d45
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/17589d45

Branch: refs/heads/master
Commit: 17589d454d00efa43cdf6116ea29ff4f513b6f20
Parents: e85f787
Author: Maximilian Michels <[email protected]>
Authored: Tue Jul 19 09:51:23 2016 +0200
Committer: Maximilian Michels <[email protected]>
Committed: Tue Jul 19 18:00:04 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    |  2 +-
 .../flink/client/program/ClusterClient.java     | 12 +++++-----
 .../common/accumulators/AccumulatorHelper.java  | 24 +++++++++++---------
 3 files changed, 20 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/17589d45/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index a4691c9..a888841 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -884,7 +884,7 @@ public class CliFrontend {
                ClusterClient client;
                try {
                        client = activeCommandLine.retrieveCluster(options.getCommandLine(), config);
-                       logAndSysout("Cluster retrieved: " + client.getClusterIdentifier());
+                       logAndSysout("Cluster configuration: " + client.getClusterIdentifier());
                } catch (UnsupportedOperationException e) {
                        try {
                                String applicationName = "Flink Application: " + programName;

http://git-wip-us.apache.org/repos/asf/flink/blob/17589d45/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 6cb5abb..2e6a9cc 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -97,11 +97,11 @@ public abstract class ClusterClient {
        private boolean printStatusDuringExecution = true;
 
        /**
-        * For interactive invocations, the Job ID is only available after the ContextEnvironment has
+        * For interactive invocations, the job results are only available after the ContextEnvironment has
         * been run inside the user JAR. We pass the Client to every instance of the ContextEnvironment
-        * which lets us access the last JobID here.
+        * which lets us access the execution result here.
         */
-       private JobID lastJobID;
+       private JobExecutionResult lastJobExecutionResult;
 
        /** Switch for blocking/detached job submission of the client */
        private boolean detachedJobSubmission = false;
@@ -335,7 +335,7 @@ public abstract class ClusterClient {
                                }
                                else {
                                        // in blocking mode, we execute all Flink jobs contained in the user code and then return here
-                                       return new JobSubmissionResult(lastJobID);
+                                       return this.lastJobExecutionResult;
                                }
                        }
                        finally {
@@ -406,9 +406,9 @@ public abstract class ClusterClient {
 
                try {
                        logAndSysout("Submitting job with JobID: " + jobGraph.getJobID() + ". Waiting for job completion.");
-                       this.lastJobID = jobGraph.getJobID();
-                       return JobClient.submitJobAndWait(actorSystemLoader.get(),
+                       this.lastJobExecutionResult = JobClient.submitJobAndWait(actorSystemLoader.get(),
                                leaderRetrievalService, jobGraph, timeout, printStatusDuringExecution, classLoader);
+                       return this.lastJobExecutionResult;
                } catch (JobExecutionException e) {
                        throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/17589d45/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
index bcae504..1a87235 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
@@ -23,6 +23,7 @@ import org.apache.flink.util.SerializedValue;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -114,19 +115,20 @@ public class AccumulatorHelper {
        public static String getResultsFormated(Map<String, Object> map) {
                StringBuilder builder = new StringBuilder();
                for (Map.Entry<String, Object> entry : map.entrySet()) {
-                       builder.append("- ").append(entry.getKey()).append(" (").append(entry.getValue().getClass().getName());
-                       builder.append(")").append(": ").append(entry.getValue().toString()).append("\n");
-               }
-               return builder.toString();
-       }
-
-       public static void resetAndClearAccumulators(Map<String, Accumulator<?, ?>> accumulators) {
-               if (accumulators != null) {
-                       for (Map.Entry<String, Accumulator<?, ?>> entry : accumulators.entrySet()) {
-                               entry.getValue().resetLocal();
+                       builder
+                               .append("- ")
+                               .append(entry.getKey())
+                               .append(" (")
+                               .append(entry.getValue().getClass().getName())
+                               .append(")");
+                       if (entry.getValue() instanceof Collection) {
+                               builder.append(" [").append(((Collection) entry.getValue()).size()).append(" elements]");
+                       } else {
+                               builder.append(": ").append(entry.getValue().toString());
                        }
-                       accumulators.clear();
+                       builder.append(System.lineSeparator());
                }
+               return builder.toString();
        }
 
        public static Map<String, Accumulator<?, ?>> copy(Map<String, Accumulator<?, ?>> accumulators) {

Reply via email to