Repository: flink Updated Branches: refs/heads/master e85f787b2 -> 082d87e51
[FLINK-4199] fix misleading CLI messages during job submission - change CLI message upon cluster retrieval - save JobExecutionResult for interactive executions - only print Collection size in accumulator results - remove unused helper method This closes #2264 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/17589d45 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/17589d45 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/17589d45 Branch: refs/heads/master Commit: 17589d454d00efa43cdf6116ea29ff4f513b6f20 Parents: e85f787 Author: Maximilian Michels <[email protected]> Authored: Tue Jul 19 09:51:23 2016 +0200 Committer: Maximilian Michels <[email protected]> Committed: Tue Jul 19 18:00:04 2016 +0200 ---------------------------------------------------------------------- .../org/apache/flink/client/CliFrontend.java | 2 +- .../flink/client/program/ClusterClient.java | 12 +++++----- .../common/accumulators/AccumulatorHelper.java | 24 +++++++++++--------- 3 files changed, 20 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/17589d45/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index a4691c9..a888841 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -884,7 +884,7 @@ public class CliFrontend { ClusterClient client; try { client = activeCommandLine.retrieveCluster(options.getCommandLine(), config); - logAndSysout("Cluster retrieved: " + client.getClusterIdentifier()); + logAndSysout("Cluster configuration: " + client.getClusterIdentifier()); } catch (UnsupportedOperationException e) { try { String applicationName = "Flink Application: " + programName; http://git-wip-us.apache.org/repos/asf/flink/blob/17589d45/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java index 6cb5abb..2e6a9cc 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java @@ -97,11 +97,11 @@ public abstract class ClusterClient { private boolean printStatusDuringExecution = true; /** - * For interactive invocations, the Job ID is only available after the ContextEnvironment has + * For interactive invocations, the job results are only available after the ContextEnvironment has * been run inside the user JAR. We pass the Client to every instance of the ContextEnvironment - * which lets us access the last JobID here. + * which lets us access the execution result here. */ - private JobID lastJobID; + private JobExecutionResult lastJobExecutionResult; /** Switch for blocking/detached job submission of the client */ private boolean detachedJobSubmission = false; @@ -335,7 +335,7 @@ public abstract class ClusterClient { } else { // in blocking mode, we execute all Flink jobs contained in the user code and then return here - return new JobSubmissionResult(lastJobID); + return this.lastJobExecutionResult; } } finally { @@ -406,9 +406,9 @@ public abstract class ClusterClient { try { logAndSysout("Submitting job with JobID: " + jobGraph.getJobID() + ". Waiting for job completion."); - this.lastJobID = jobGraph.getJobID(); - return JobClient.submitJobAndWait(actorSystemLoader.get(), + this.lastJobExecutionResult = JobClient.submitJobAndWait(actorSystemLoader.get(), leaderRetrievalService, jobGraph, timeout, printStatusDuringExecution, classLoader); + return this.lastJobExecutionResult; } catch (JobExecutionException e) { throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e); } http://git-wip-us.apache.org/repos/asf/flink/blob/17589d45/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java index bcae504..1a87235 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java @@ -23,6 +23,7 @@ import org.apache.flink.util.SerializedValue; import java.io.IOException; import java.io.Serializable; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -114,19 +115,20 @@ public class AccumulatorHelper { public static String getResultsFormated(Map<String, Object> map) { StringBuilder builder = new StringBuilder(); for (Map.Entry<String, Object> entry : map.entrySet()) { - builder.append("- ").append(entry.getKey()).append(" (").append(entry.getValue().getClass().getName()); - builder.append(")").append(": ").append(entry.getValue().toString()).append("\n"); - } - return builder.toString(); - } - - public static void resetAndClearAccumulators(Map<String, Accumulator<?, ?>> accumulators) { - if (accumulators != null) { - for (Map.Entry<String, Accumulator<?, ?>> entry : accumulators.entrySet()) { - entry.getValue().resetLocal(); + builder + .append("- ") + .append(entry.getKey()) + .append(" (") + .append(entry.getValue().getClass().getName()) + .append(")"); + if (entry.getValue() instanceof Collection) { + builder.append(" [").append(((Collection) entry.getValue()).size()).append(" elements]"); + } else { + builder.append(": ").append(entry.getValue().toString()); } - accumulators.clear(); + builder.append(System.lineSeparator()); } + return builder.toString(); } public static Map<String, Accumulator<?, ?>> copy(Map<String, Accumulator<?, ?>> accumulators) {
