Repository: flink
Updated Branches:
  refs/heads/master 00d22c396 -> e83d1ec10


[FLINK-1767] [streaming] Make StreamExecutionEnvironment return JobExecutionResult instead of void.

Conflicts:
        flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
        flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java

This closes #516


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e83d1ec1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e83d1ec1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e83d1ec1

Branch: refs/heads/master
Commit: e83d1ec102808cb9b3ef4208335f74e6f17bbe7a
Parents: 00d22c3
Author: Gabor Gevay <[email protected]>
Authored: Sat Mar 21 10:28:23 2015 +0100
Committer: mbalassi <[email protected]>
Committed: Mon Mar 23 12:45:03 2015 +0100

----------------------------------------------------------------------
 .../api/environment/LocalStreamEnvironment.java       | 13 ++++++++-----
 .../api/environment/RemoteStreamEnvironment.java      | 14 ++++++++------
 .../api/environment/StreamContextEnvironment.java     |  9 +++++----
 .../api/environment/StreamExecutionEnvironment.java   | 10 ++++++----
 .../api/environment/StreamPlanEnvironment.java        |  7 ++++---
 .../org/apache/flink/streaming/util/ClusterUtil.java  | 10 ++++++----
 .../flink/streaming/util/TestStreamEnvironment.java   |  7 ++++---
 7 files changed, 41 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e83d1ec1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index e1b1453..07a552b 100755
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.api.environment;
 
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.streaming.util.ClusterUtil;
 
 public class LocalStreamEnvironment extends StreamExecutionEnvironment {
@@ -26,10 +27,12 @@ public class LocalStreamEnvironment extends 
StreamExecutionEnvironment {
        /**
         * Executes the JobGraph of the on a mini cluster of CLusterUtil with a
         * default name.
+        *
+        * @return The result of the job execution, containing elapsed time and 
accumulators.
         */
        @Override
-       public void execute() throws Exception {
-               ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(), 
getParallelism());
+       public JobExecutionResult execute() throws Exception {
+               return 
ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(), getParallelism());
        }
 
        /**
@@ -38,10 +41,10 @@ public class LocalStreamEnvironment extends 
StreamExecutionEnvironment {
         * 
         * @param jobName
         *            name of the job
+        * @return The result of the job execution, containing elapsed time and 
accumulators.
         */
        @Override
-       public void execute(String jobName) throws Exception {
-               
ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(jobName),
-                               getParallelism());
+       public JobExecutionResult execute(String jobName) throws Exception {
+               return 
ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(jobName), 
getParallelism());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e83d1ec1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 3142bdd..4faa329 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.JobWithJars;
 import org.apache.flink.client.program.ProgramInvocationException;
@@ -79,17 +80,17 @@ public class RemoteStreamEnvironment extends 
StreamExecutionEnvironment {
        }
 
        @Override
-       public void execute() {
+       public JobExecutionResult execute() {
 
                JobGraph jobGraph = streamGraph.getJobGraph();
-               executeRemotely(jobGraph);
+               return executeRemotely(jobGraph);
        }
 
        @Override
-       public void execute(String jobName) {
+       public JobExecutionResult execute(String jobName) {
 
                JobGraph jobGraph = streamGraph.getJobGraph(jobName);
-               executeRemotely(jobGraph);
+               return executeRemotely(jobGraph);
        }
 
        /**
@@ -97,8 +98,9 @@ public class RemoteStreamEnvironment extends 
StreamExecutionEnvironment {
         * 
         * @param jobGraph
         *            jobGraph to execute
+        * @return The result of the job execution, containing elapsed time and 
accumulators.
         */
-       private void executeRemotely(JobGraph jobGraph) {
+       private JobExecutionResult executeRemotely(JobGraph jobGraph) {
                if (LOG.isInfoEnabled()) {
                        LOG.info("Running remotely at {}:{}", host, port);
                }
@@ -112,7 +114,7 @@ public class RemoteStreamEnvironment extends 
StreamExecutionEnvironment {
                                JobWithJars.buildUserCodeClassLoader(jarFiles, 
JobWithJars.class.getClassLoader()));
 
                try {
-                       client.run(jobGraph, true);
+                       return client.run(jobGraph, true);
                } catch (ProgramInvocationException e) {
                        throw new RuntimeException("Cannot execute job due to 
ProgramInvocationException", e);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/e83d1ec1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index f7dd0bf..b03ab0e 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.environment;
 import java.io.File;
 import java.util.List;
 
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -50,12 +51,12 @@ public class StreamContextEnvironment extends 
StreamExecutionEnvironment {
        }
 
        @Override
-       public void execute() throws Exception {
-               execute(null);
+       public JobExecutionResult execute() throws Exception {
+               return execute(null);
        }
 
        @Override
-       public void execute(String jobName) throws Exception {
+       public JobExecutionResult execute(String jobName) throws Exception {
                currentEnvironment = null;
 
                JobGraph jobGraph;
@@ -70,7 +71,7 @@ public class StreamContextEnvironment extends 
StreamExecutionEnvironment {
                }
 
                try {
-                       client.run(jobGraph, true);
+                       return client.run(jobGraph, true);
 
                } catch (Exception e) {
                        throw e;

http://git-wip-us.apache.org/repos/asf/flink/blob/e83d1ec1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 9f2ccff..6dd7947 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -26,6 +26,7 @@ import com.esotericsoftware.kryo.Serializer;
 
 import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -759,10 +760,11 @@ public abstract class StreamExecutionEnvironment {
         * <p>
         * The program execution will be logged and displayed with a generated
         * default name.
-        * 
+        *
+        * @return The result of the job execution, containing elapsed time and 
accumulators.
         * @throws Exception
         **/
-       public abstract void execute() throws Exception;
+       public abstract JobExecutionResult execute() throws Exception;
 
        /**
         * Triggers the program execution. The environment will execute all 
parts of
@@ -773,10 +775,10 @@ public abstract class StreamExecutionEnvironment {
         * 
         * @param jobName
         *            Desired name of the job
-        * 
+        * @return The result of the job execution, containing elapsed time and 
accumulators.
         * @throws Exception
         **/
-       public abstract void execute(String jobName) throws Exception;
+       public abstract JobExecutionResult execute(String jobName) throws 
Exception;
 
        /**
         * Getter of the {@link StreamGraph} of the streaming job.

http://git-wip-us.apache.org/repos/asf/flink/blob/e83d1ec1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
index 592fa1a..02fccd0 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.api.environment;
 
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.Client.OptimizerPlanEnvironment;
@@ -48,12 +49,12 @@ public class StreamPlanEnvironment extends 
StreamExecutionEnvironment {
        }
 
        @Override
-       public void execute() throws Exception {
-               execute("");
+       public JobExecutionResult execute() throws Exception {
+               return execute("");
        }
 
        @Override
-       public void execute(String jobName) throws Exception {
+       public JobExecutionResult execute(String jobName) throws Exception {
                currentEnvironment = null;
 
                streamGraph.setJobName(jobName);

http://git-wip-us.apache.org/repos/asf/flink/blob/e83d1ec1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
index d04e7e6..77ac0c5 100755
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.util;
 
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobClient;
@@ -41,8 +42,9 @@ public class ClusterUtil {
         *            numberOfTaskTrackers
         * @param memorySize
         *            memorySize
+        * @return The result of the job execution, containing elapsed time and 
accumulators.
         */
-       public static void runOnMiniCluster(JobGraph jobGraph, int parallelism, 
long memorySize)
+       public static JobExecutionResult runOnMiniCluster(JobGraph jobGraph, 
int parallelism, long memorySize)
                        throws Exception {
 
                Configuration configuration = jobGraph.getJobConfiguration();
@@ -59,7 +61,7 @@ public class ClusterUtil {
                        exec = new LocalFlinkMiniCluster(configuration, true);
                        ActorRef jobClient = exec.getJobClient();
 
-                       JobClient.submitJobAndWait(jobGraph, true, jobClient, 
exec.timeout());
+                       return JobClient.submitJobAndWait(jobGraph, true, 
jobClient, exec.timeout());
 
                } catch (Exception e) {
                        throw e;
@@ -70,7 +72,7 @@ public class ClusterUtil {
                }
        }
 
-       public static void runOnMiniCluster(JobGraph jobGraph, int numOfSlots) 
throws Exception {
-               runOnMiniCluster(jobGraph, numOfSlots, -1);
+       public static JobExecutionResult runOnMiniCluster(JobGraph jobGraph, 
int numOfSlots) throws Exception {
+               return runOnMiniCluster(jobGraph, numOfSlots, -1);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e83d1ec1/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index 5e785f9..a99e652 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -50,12 +50,12 @@ public class TestStreamEnvironment extends 
StreamExecutionEnvironment {
        }
 
        @Override
-       public void execute() throws Exception {
-               execute(DEFAULT_JOBNAME);
+       public JobExecutionResult execute() throws Exception {
+               return execute(DEFAULT_JOBNAME);
        }
 
        @Override
-       public void execute(String jobName) throws Exception {
+       public JobExecutionResult execute(String jobName) throws Exception {
                JobGraph jobGraph = streamGraph.getJobGraph(jobName);
 
                if (internalExecutor) {
@@ -70,6 +70,7 @@ public class TestStreamEnvironment extends 
StreamExecutionEnvironment {
                try {
                        ActorRef client = executor.getJobClient();
                        latestResult = JobClient.submitJobAndWait(jobGraph, 
false, client, executor.timeout());
+                       return latestResult;
                } catch(JobExecutionException e) {
                        if 
(e.getMessage().contains("GraphConversionException")) {
                                throw new Exception(CANNOT_EXECUTE_EMPTY_JOB, 
e);

Reply via email to