Repository: flink
Updated Branches:
refs/heads/master 00d22c396 -> e83d1ec10
[FLINK-1767] [streaming] Make StreamExecutionEnvironment return
JobExecutionResult instead of void.
Conflicts:
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
This closes #516
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e83d1ec1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e83d1ec1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e83d1ec1
Branch: refs/heads/master
Commit: e83d1ec102808cb9b3ef4208335f74e6f17bbe7a
Parents: 00d22c3
Author: Gabor Gevay <[email protected]>
Authored: Sat Mar 21 10:28:23 2015 +0100
Committer: mbalassi <[email protected]>
Committed: Mon Mar 23 12:45:03 2015 +0100
----------------------------------------------------------------------
.../api/environment/LocalStreamEnvironment.java | 13 ++++++++-----
.../api/environment/RemoteStreamEnvironment.java | 14 ++++++++------
.../api/environment/StreamContextEnvironment.java | 9 +++++----
.../api/environment/StreamExecutionEnvironment.java | 10 ++++++----
.../api/environment/StreamPlanEnvironment.java | 7 ++++---
.../org/apache/flink/streaming/util/ClusterUtil.java | 10 ++++++----
.../flink/streaming/util/TestStreamEnvironment.java | 7 ++++---
7 files changed, 41 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e83d1ec1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index e1b1453..07a552b 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.api.environment;
+import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.streaming.util.ClusterUtil;
public class LocalStreamEnvironment extends StreamExecutionEnvironment {
@@ -26,10 +27,12 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
/**
* Executes the JobGraph of the on a mini cluster of CLusterUtil with a
* default name.
+ *
+ * @return The result of the job execution, containing elapsed time and accumulators.
*/
@Override
- public void execute() throws Exception {
- ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(), getParallelism());
+ public JobExecutionResult execute() throws Exception {
+ return ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(), getParallelism());
}
/**
@@ -38,10 +41,10 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
*
* @param jobName
* name of the job
+ * @return The result of the job execution, containing elapsed time and accumulators.
*/
@Override
- public void execute(String jobName) throws Exception {
- ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(jobName),
- getParallelism());
+ public JobExecutionResult execute(String jobName) throws Exception {
+ return ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(jobName), getParallelism());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e83d1ec1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 3142bdd..4faa329 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
+import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.ProgramInvocationException;
@@ -79,17 +80,17 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
}
@Override
- public void execute() {
+ public JobExecutionResult execute() {
JobGraph jobGraph = streamGraph.getJobGraph();
- executeRemotely(jobGraph);
+ return executeRemotely(jobGraph);
}
@Override
- public void execute(String jobName) {
+ public JobExecutionResult execute(String jobName) {
JobGraph jobGraph = streamGraph.getJobGraph(jobName);
- executeRemotely(jobGraph);
+ return executeRemotely(jobGraph);
}
/**
@@ -97,8 +98,9 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
*
* @param jobGraph
* jobGraph to execute
+ * @return The result of the job execution, containing elapsed time and accumulators.
*/
- private void executeRemotely(JobGraph jobGraph) {
+ private JobExecutionResult executeRemotely(JobGraph jobGraph) {
if (LOG.isInfoEnabled()) {
LOG.info("Running remotely at {}:{}", host, port);
}
@@ -112,7 +114,7 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
JobWithJars.buildUserCodeClassLoader(jarFiles,
JobWithJars.class.getClassLoader()));
try {
- client.run(jobGraph, true);
+ return client.run(jobGraph, true);
} catch (ProgramInvocationException e) {
throw new RuntimeException("Cannot execute job due to ProgramInvocationException", e);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e83d1ec1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index f7dd0bf..b03ab0e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.environment;
import java.io.File;
import java.util.List;
+import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.client.program.Client;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
@@ -50,12 +51,12 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
}
@Override
- public void execute() throws Exception {
- execute(null);
+ public JobExecutionResult execute() throws Exception {
+ return execute(null);
}
@Override
- public void execute(String jobName) throws Exception {
+ public JobExecutionResult execute(String jobName) throws Exception {
currentEnvironment = null;
JobGraph jobGraph;
@@ -70,7 +71,7 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
}
try {
- client.run(jobGraph, true);
+ return client.run(jobGraph, true);
} catch (Exception e) {
throw e;
http://git-wip-us.apache.org/repos/asf/flink/blob/e83d1ec1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 9f2ccff..6dd7947 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -26,6 +26,7 @@ import com.esotericsoftware.kryo.Serializer;
import org.apache.commons.lang3.Validate;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -759,10 +760,11 @@ public abstract class StreamExecutionEnvironment {
* <p>
* The program execution will be logged and displayed with a generated
* default name.
- *
+ *
+ * @return The result of the job execution, containing elapsed time and accumulators.
* @throws Exception
**/
- public abstract void execute() throws Exception;
+ public abstract JobExecutionResult execute() throws Exception;
/**
* Triggers the program execution. The environment will execute all parts of
@@ -773,10 +775,10 @@ public abstract class StreamExecutionEnvironment {
*
* @param jobName
* Desired name of the job
- *
+ * @return The result of the job execution, containing elapsed time and accumulators.
* @throws Exception
**/
- public abstract void execute(String jobName) throws Exception;
+ public abstract JobExecutionResult execute(String jobName) throws Exception;
/**
* Getter of the {@link StreamGraph} of the streaming job.
http://git-wip-us.apache.org/repos/asf/flink/blob/e83d1ec1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
index 592fa1a..02fccd0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.api.environment;
+import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.Client.OptimizerPlanEnvironment;
@@ -48,12 +49,12 @@ public class StreamPlanEnvironment extends StreamExecutionEnvironment {
}
@Override
- public void execute() throws Exception {
- execute("");
+ public JobExecutionResult execute() throws Exception {
+ return execute("");
}
@Override
- public void execute(String jobName) throws Exception {
+ public JobExecutionResult execute(String jobName) throws Exception {
currentEnvironment = null;
streamGraph.setJobName(jobName);
http://git-wip-us.apache.org/repos/asf/flink/blob/e83d1ec1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
index d04e7e6..77ac0c5 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
@@ -17,6 +17,7 @@
package org.apache.flink.streaming.util;
+import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobClient;
@@ -41,8 +42,9 @@ public class ClusterUtil {
* numberOfTaskTrackers
* @param memorySize
* memorySize
+ * @return The result of the job execution, containing elapsed time and accumulators.
*/
- public static void runOnMiniCluster(JobGraph jobGraph, int parallelism, long memorySize)
+ public static JobExecutionResult runOnMiniCluster(JobGraph jobGraph, int parallelism, long memorySize)
throws Exception {
Configuration configuration = jobGraph.getJobConfiguration();
@@ -59,7 +61,7 @@ public class ClusterUtil {
exec = new LocalFlinkMiniCluster(configuration, true);
ActorRef jobClient = exec.getJobClient();
- JobClient.submitJobAndWait(jobGraph, true, jobClient, exec.timeout());
+ return JobClient.submitJobAndWait(jobGraph, true, jobClient, exec.timeout());
} catch (Exception e) {
throw e;
@@ -70,7 +72,7 @@ public class ClusterUtil {
}
}
- public static void runOnMiniCluster(JobGraph jobGraph, int numOfSlots) throws Exception {
- runOnMiniCluster(jobGraph, numOfSlots, -1);
+ public static JobExecutionResult runOnMiniCluster(JobGraph jobGraph, int numOfSlots) throws Exception {
+ return runOnMiniCluster(jobGraph, numOfSlots, -1);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e83d1ec1/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index 5e785f9..a99e652 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -50,12 +50,12 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
}
@Override
- public void execute() throws Exception {
- execute(DEFAULT_JOBNAME);
+ public JobExecutionResult execute() throws Exception {
+ return execute(DEFAULT_JOBNAME);
}
@Override
- public void execute(String jobName) throws Exception {
+ public JobExecutionResult execute(String jobName) throws Exception {
JobGraph jobGraph = streamGraph.getJobGraph(jobName);
if (internalExecutor) {
@@ -70,6 +70,7 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
try {
ActorRef client = executor.getJobClient();
latestResult = JobClient.submitJobAndWait(jobGraph, false, client, executor.timeout());
+ return latestResult;
} catch(JobExecutionException e) {
if (e.getMessage().contains("GraphConversionException")) {
throw new Exception(CANNOT_EXECUTE_EMPTY_JOB, e);