Repository: flink Updated Branches: refs/heads/master b263932e2 -> 82c420022
[FLINK-1434] [FLINK-1401] Streaming support added for webclient Closes #334 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/82c42002 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/82c42002 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/82c42002 Branch: refs/heads/master Commit: 82c420022ef3ffe3d7ad2172b4338ed12baf9e0e Parents: b263932 Author: Gyula Fora <[email protected]> Authored: Sun Jan 25 12:28:18 2015 +0100 Committer: Gyula Fora <[email protected]> Committed: Tue Jan 27 23:31:25 2015 +0100 ---------------------------------------------------------------------- .../flink-streaming-core/pom.xml | 6 ++ .../apache/flink/streaming/api/StreamGraph.java | 96 +++++++++++++++++++- .../api/StreamingJobGraphGenerator.java | 9 +- .../api/datastream/ConnectedDataStream.java | 10 +- .../streaming/api/datastream/DataStream.java | 23 +++-- .../api/datastream/GroupedDataStream.java | 4 +- .../api/datastream/StreamProjection.java | 50 +++++----- .../api/datastream/WindowedDataStream.java | 6 +- .../environment/StreamExecutionEnvironment.java | 28 ++++-- .../api/environment/StreamPlanEnvironment.java | 62 +++++++++++++ .../api/invokable/StreamInvokable.java | 4 + .../partitioner/FieldsPartitioner.java | 2 +- .../partitioner/StreamPartitioner.java | 2 +- .../api/scala/StreamExecutionEnvironment.scala | 9 ++ .../org/apache/flink/client/RemoteExecutor.java | 2 +- .../org/apache/flink/client/program/Client.java | 60 +++++++----- .../client/program/ContextEnvironment.java | 2 +- .../flink/client/program/PackagedProgram.java | 18 +++- .../flink/client/web/JobSubmissionServlet.java | 18 ++-- .../program/ExecutionPlanCreationTest.java | 2 +- .../apache/flink/compiler/plan/FlinkPlan.java | 28 ++++++ .../flink/compiler/plan/OptimizedPlan.java | 2 +- .../flink/compiler/plan/StreamingPlan.java | 38 ++++++++ 23 files changed, 385 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-addons/flink-streaming/flink-streaming-core/pom.xml ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/pom.xml b/flink-addons/flink-streaming/flink-streaming-core/pom.xml index 9231cbf..a60bd01 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/pom.xml +++ b/flink-addons/flink-streaming/flink-streaming-core/pom.xml @@ -48,6 +48,12 @@ under the License. <version>${project.version}</version> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.apache.sling</groupId> + <artifactId>org.apache.sling.commons.json</artifactId> + <version>2.0.6</version> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java index c515e63..b5e43af 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java @@ -17,6 +17,10 @@ package org.apache.flink.streaming.api; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintWriter; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -27,6 +31,7 @@ import java.util.Set; import org.apache.flink.api.common.io.InputFormat; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.compiler.plan.StreamingPlan; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.streaming.api.collector.OutputSelector; @@ -39,18 +44,22 @@ import org.apache.flink.streaming.api.streamvertex.StreamIterationTail; import org.apache.flink.streaming.api.streamvertex.StreamVertex; import org.apache.flink.streaming.partitioner.StreamPartitioner; import org.apache.flink.streaming.state.OperatorState; +import org.apache.sling.commons.json.JSONArray; +import org.apache.sling.commons.json.JSONException; +import org.apache.sling.commons.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Object for building Apache Flink stream processing graphs */ -public class StreamGraph { +public class StreamGraph extends StreamingPlan { private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class); private final static String DEAFULT_JOB_NAME = "Flink Streaming Job"; protected boolean chaining = true; + private String jobName = DEAFULT_JOB_NAME; // Graph attributes private Map<String, Integer> operatorParallelisms; @@ -440,7 +449,7 @@ public class StreamGraph { * Gets the assembled {@link JobGraph} and adds a default name for it. */ public JobGraph getJobGraph() { - return getJobGraph(DEAFULT_JOB_NAME); + return getJobGraph(jobName); } /** @@ -452,11 +461,16 @@ public class StreamGraph { */ public JobGraph getJobGraph(String jobGraphName) { + this.jobName = jobGraphName; StreamingJobGraphGenerator optimizer = new StreamingJobGraphGenerator(this); return optimizer.createJobGraph(jobGraphName); } + public void setJobName(String jobName) { + this.jobName = jobName; + } + public void setChaining(boolean chaining) { this.chaining = chaining; } @@ -525,4 +539,82 @@ public class StreamGraph { return iterationTimeouts.get(vertexName); } + public String getOperatorName(String vertexName) { + return operatorNames.get(vertexName); + } + + @Override + public String getStreamingPlanAsJSON() { + + try { + JSONObject json = new JSONObject(); + JSONArray nodes = new JSONArray(); + + json.put("nodes", nodes); + + for (String id : operatorNames.keySet()) { + JSONObject node = new JSONObject(); + nodes.put(node); + + node.put("id", Integer.valueOf(id)); + node.put("type", getOperatorName(id)); + + if (sources.contains(id)) { + node.put("pact", "Data Source"); + } else { + node.put("pact", "Data Stream"); + } + + node.put("contents", getOperatorName(id) + " at " + + getInvokable(id).getUserFunction().getClass().getSimpleName()); + node.put("parallelism", getParallelism(id)); + + int numIn = getInEdges(id).size(); + if (numIn > 0) { + + JSONArray inputs = new JSONArray(); + node.put("predecessors", inputs); + + for (int i = 0; i < numIn; i++) { + + String inID = getInEdges(id).get(i); + + JSONObject input = new JSONObject(); + inputs.put(input); + + input.put("id", Integer.valueOf(inID)); + input.put("ship_strategy", getOutPartitioner(inID, id).getStrategy()); + if (i == 0) { + input.put("side", "first"); + } else if (i == 1) { + input.put("side", "second"); + } + } + } + + } + return json.toString(); + } catch (JSONException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("JSON plan creation failed: {}", e); + } + return ""; + } + + } + + @Override + public void dumpStreamingPlanAsJSON(File file) throws IOException { + PrintWriter pw = null; + try { + pw = new PrintWriter(new FileOutputStream(file), false); + pw.write(getStreamingPlanAsJSON()); + pw.flush(); + + } finally { + if (pw != null) { + pw.close(); + } + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java index fccb1e1..d9a1b66 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java @@ -141,13 +141,13 @@ public class StreamingJobGraphGenerator { } } - private String createChainedName(String vertexName, List<String> chainedOutputs) { + private String createChainedName(String vertexID, List<String> chainedOutputs) { + String vertexName = streamGraph.getOperatorName(vertexID); if (chainedOutputs.size() > 1) { List<String> outputChainedNames = new ArrayList<String>(); for (String chainable : chainedOutputs) { outputChainedNames.add(chainedNames.get(chainable)); } - return vertexName + " -> (" + StringUtils.join(outputChainedNames, ", ") + ")"; } else if (chainedOutputs.size() == 1) { return vertexName + " -> " + chainedNames.get(chainedOutputs.get(0)); @@ -162,7 +162,10 @@ public class StreamingJobGraphGenerator { AbstractJobVertex vertex = new AbstractJobVertex(chainedNames.get(vertexName)); vertex.setInvokableClass(streamGraph.getJobVertexClass(vertexName)); - vertex.setParallelism(streamGraph.getParallelism(vertexName)); + if (streamGraph.getParallelism(vertexName) > 0) { + vertex.setParallelism(streamGraph.getParallelism(vertexName)); + } + if (LOG.isDebugEnabled()) { LOG.debug("Parallelism set: {} for {}", streamGraph.getParallelism(vertexName), vertexName); http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java index db8649b..a6eade8 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java @@ -244,7 +244,7 @@ public class ConnectedDataStream<IN1, IN2> { TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoMapFunction.class, coMapper.getClass(), 2, null, null); - return addCoFunction("coMap", outTypeInfo, new CoMapInvokable<IN1, IN2, OUT>( + return addCoFunction("Co-Map", outTypeInfo, new CoMapInvokable<IN1, IN2, OUT>( clean(coMapper))); } @@ -269,7 +269,7 @@ public class ConnectedDataStream<IN1, IN2> { TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoFlatMapFunction.class, coFlatMapper.getClass(), 2, null, null); - return addCoFunction("coFlatMap", outTypeInfo, new CoFlatMapInvokable<IN1, IN2, OUT>( + return addCoFunction("Co-Flat Map", outTypeInfo, new CoFlatMapInvokable<IN1, IN2, OUT>( clean(coFlatMapper))); } @@ -294,7 +294,7 @@ public class ConnectedDataStream<IN1, IN2> { TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoReduceFunction.class, coReducer.getClass(), 2, null, null); - return addCoFunction("coReduce", outTypeInfo, getReduceInvokable(clean(coReducer))); + return addCoFunction("Co-Reduce", outTypeInfo, getReduceInvokable(clean(coReducer))); } @@ -361,7 +361,7 @@ public class ConnectedDataStream<IN1, IN2> { TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoWindowFunction.class, coWindowFunction.getClass(), 2, null, null); - return addCoFunction("coWindowReduce", outTypeInfo, new CoWindowInvokable<IN1, IN2, OUT>( + return addCoFunction("Co-Window", outTypeInfo, new CoWindowInvokable<IN1, IN2, OUT>( clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2)); } @@ -390,7 +390,7 @@ public class ConnectedDataStream<IN1, IN2> { throw new IllegalArgumentException("Slide interval must be positive"); } - return addCoFunction("coWindowReduce", outTypeInfo, new CoWindowInvokable<IN1, IN2, OUT>( + return addCoFunction("Co-Window", outTypeInfo, new CoWindowInvokable<IN1, IN2, OUT>( clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2)); } http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index c2f5d44..8e87b27 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -95,6 +95,7 @@ public class DataStream<OUT> { protected static Integer counter = 0; protected final StreamExecutionEnvironment environment; protected final String id; + protected final String type; protected int degreeOfParallelism; protected List<String> userDefinedNames; protected StreamPartitioner<OUT> partitioner; @@ -122,7 +123,8 @@ public class DataStream<OUT> { } counter++; - this.id = operatorType + "-" + counter.toString(); + this.id = counter.toString(); + this.type = operatorType; this.environment = environment; this.degreeOfParallelism = environment.getDegreeOfParallelism(); this.streamGraph = environment.getStreamGraph(); @@ -142,6 +144,7 @@ public class DataStream<OUT> { public DataStream(DataStream<OUT> dataStream) { this.environment = dataStream.environment; this.id = dataStream.id; + this.type = dataStream.type; this.degreeOfParallelism = dataStream.degreeOfParallelism; this.userDefinedNames = new ArrayList<String>(dataStream.userDefinedNames); this.partitioner = dataStream.partitioner; @@ -465,7 +468,7 @@ public class DataStream<OUT> { TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType()); - return transform("map", outType, new MapInvokable<OUT, R>(clean(mapper))); + return transform("Map", outType, new MapInvokable<OUT, R>(clean(mapper))); } /** @@ -489,7 +492,7 @@ public class DataStream<OUT> { TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper), getType()); - return transform("flatMap", outType, new FlatMapInvokable<OUT, R>(clean(flatMapper))); + return transform("Flat Map", outType, new FlatMapInvokable<OUT, R>(clean(flatMapper))); } @@ -506,7 +509,7 @@ public class DataStream<OUT> { */ public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) { - return transform("reduce", getType(), new StreamReduceInvokable<OUT>(clean(reducer))); + return transform("Reduce", getType(), new StreamReduceInvokable<OUT>(clean(reducer))); } @@ -525,7 +528,7 @@ public class DataStream<OUT> { * @return The filtered DataStream. */ public SingleOutputStreamOperator<OUT, ?> filter(FilterFunction<OUT> filter) { - return transform("filter", getType(), new FilterInvokable<OUT>(clean(filter))); + return transform("Filter", getType(), new FilterInvokable<OUT>(clean(filter))); } @@ -834,7 +837,7 @@ public class DataStream<OUT> { public SingleOutputStreamOperator<Long, ?> count() { TypeInformation<Long> outTypeInfo = TypeExtractor.getForObject(Long.valueOf(0)); - return transform("counter", outTypeInfo, new CounterInvokable<OUT>()); + return transform("Count", outTypeInfo, new CounterInvokable<OUT>()); } /** @@ -1090,12 +1093,12 @@ public class DataStream<OUT> { StreamReduceInvokable<OUT> invokable = new StreamReduceInvokable<OUT>(aggregate); - SingleOutputStreamOperator<OUT, ?> returnStream = transform("reduce", getType(), invokable); + SingleOutputStreamOperator<OUT, ?> returnStream = transform("Aggregation", getType(), + invokable); return returnStream; } - /** * Method for passing user defined invokables along with the type * informations that will transform the DataStream. @@ -1179,8 +1182,8 @@ public class DataStream<OUT> { DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink", getType(), sinkInvokable); - streamGraph.addStreamVertex(returnStream.getId(), sinkInvokable, getType(), null, "sink", - returnStream.getParallelism()); + streamGraph.addStreamVertex(returnStream.getId(), sinkInvokable, getType(), null, + "Stream Sink", returnStream.getParallelism()); this.connectGraph(this.copy(), returnStream.getId(), 0); http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java index 4a5c0c2..c871c20 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java @@ -66,7 +66,7 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> { */ @Override public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) { - return transform("groupReduce", getType(), new GroupedReduceInvokable<OUT>(clean(reducer), + return transform("Grouped Reduce", getType(), new GroupedReduceInvokable<OUT>(clean(reducer), keySelector)); } @@ -186,7 +186,7 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> { GroupedReduceInvokable<OUT> invokable = new GroupedReduceInvokable<OUT>(clean(aggregate), keySelector); - SingleOutputStreamOperator<OUT, ?> returnStream = transform("groupReduce", getType(), + SingleOutputStreamOperator<OUT, ?> returnStream = transform("Grouped Aggregation", getType(), invokable); return returnStream; http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java index c8ad533..915ca30 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java @@ -83,7 +83,7 @@ public class StreamProjection<IN> { @SuppressWarnings("unchecked") TypeInformation<Tuple1<T0>> outType = (TypeInformation<Tuple1<T0>>) extractFieldTypes( fieldIndexes, types, inTypeInfo); - return dataStream.transform("projection", outType, new ProjectInvokable<IN, Tuple1<T0>>( + return dataStream.transform("Projection", outType, new ProjectInvokable<IN, Tuple1<T0>>( fieldIndexes, outType)); } @@ -111,7 +111,7 @@ public class StreamProjection<IN> { @SuppressWarnings("unchecked") TypeInformation<Tuple2<T0, T1>> outType = (TypeInformation<Tuple2<T0, T1>>) extractFieldTypes( fieldIndexes, types, inTypeInfo); - return dataStream.transform("projection", outType, + return dataStream.transform("Projection", outType, new ProjectInvokable<IN, Tuple2<T0, T1>>(fieldIndexes, outType)); } @@ -141,7 +141,7 @@ public class StreamProjection<IN> { @SuppressWarnings("unchecked") TypeInformation<Tuple3<T0, T1, T2>> outType = (TypeInformation<Tuple3<T0, T1, T2>>) extractFieldTypes( fieldIndexes, types, inTypeInfo); - return dataStream.transform("projection", outType, + return dataStream.transform("Projection", outType, new ProjectInvokable<IN, Tuple3<T0, T1, T2>>(fieldIndexes, outType)); } @@ -173,7 +173,7 @@ public class StreamProjection<IN> { @SuppressWarnings("unchecked") TypeInformation<Tuple4<T0, T1, T2, T3>> outType = (TypeInformation<Tuple4<T0, T1, T2, T3>>) extractFieldTypes( fieldIndexes, types, inTypeInfo); - return dataStream.transform("projection", outType, + return dataStream.transform("Projection", outType, new ProjectInvokable<IN, Tuple4<T0, T1, T2, T3>>(fieldIndexes, outType)); } @@ -206,7 +206,7 @@ public class StreamProjection<IN> { @SuppressWarnings("unchecked") TypeInformation<Tuple5<T0, T1, T2, T3, T4>> outType = (TypeInformation<Tuple5<T0, T1, T2, T3, T4>>) extractFieldTypes( fieldIndexes, types, inTypeInfo); - return dataStream.transform("projection", outType, + return dataStream.transform("Projection", outType, new ProjectInvokable<IN, Tuple5<T0, T1, T2, T3, T4>>(fieldIndexes, outType)); } @@ -243,7 +243,7 @@ public class StreamProjection<IN> { @SuppressWarnings("unchecked") TypeInformation<Tuple6<T0, T1, T2, T3, T4, T5>> outType = (TypeInformation<Tuple6<T0, T1, T2, T3, T4, T5>>) extractFieldTypes( fieldIndexes, types, inTypeInfo); - return dataStream.transform("projection", outType, + return dataStream.transform("Projection", outType, new ProjectInvokable<IN, Tuple6<T0, T1, T2, T3, T4, T5>>(fieldIndexes, outType)); } @@ -283,7 +283,7 @@ public class StreamProjection<IN> { TypeInformation<Tuple7<T0, T1, T2, T3, T4, T5, T6>> outType = (TypeInformation<Tuple7<T0, T1, T2, T3, T4, T5, T6>>) extractFieldTypes( fieldIndexes, types, inTypeInfo); return dataStream - .transform("projection", outType, + .transform("Projection", outType, new ProjectInvokable<IN, Tuple7<T0, T1, T2, T3, T4, T5, T6>>(fieldIndexes, outType)); } @@ -325,7 +325,7 @@ public class StreamProjection<IN> { @SuppressWarnings("unchecked") TypeInformation<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> outType = (TypeInformation<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>) extractFieldTypes( fieldIndexes, types, inTypeInfo); - return dataStream.transform("projection", outType, + return dataStream.transform("Projection", outType, new ProjectInvokable<IN, Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(fieldIndexes, outType)); } @@ -369,7 +369,7 @@ public class StreamProjection<IN> { @SuppressWarnings("unchecked") TypeInformation<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> outType = (TypeInformation<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>) extractFieldTypes( fieldIndexes, types, inTypeInfo); - return dataStream.transform("projection", outType, + return dataStream.transform("Projection", outType, new ProjectInvokable<IN, Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(fieldIndexes, outType)); } @@ -415,7 +415,7 @@ public class StreamProjection<IN> { @SuppressWarnings("unchecked") TypeInformation<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> outType = (TypeInformation<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>) extractFieldTypes( fieldIndexes, types, inTypeInfo); - return dataStream.transform("projection", outType, + return dataStream.transform("Projection", outType, new ProjectInvokable<IN, Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>( fieldIndexes, outType)); } @@ -465,7 +465,7 @@ public class StreamProjection<IN> { @SuppressWarnings("unchecked") TypeInformation<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> outType = (TypeInformation<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>) extractFieldTypes( fieldIndexes, types, inTypeInfo); - return dataStream.transform("projection", outType, + return dataStream.transform("Projection", outType, new ProjectInvokable<IN, Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>( fieldIndexes, outType)); } @@ -519,7 +519,7 @@ public class StreamProjection<IN> { fieldIndexes, types, inTypeInfo); return dataStream .transform( - "projection", + "Projection", outType, new ProjectInvokable<IN, Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>( fieldIndexes, outType)); @@ -576,7 +576,7 @@ public class StreamProjection<IN> { fieldIndexes, types, inTypeInfo); return dataStream .transform( - "projection", + "Projection", outType, new ProjectInvokable<IN, Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>( fieldIndexes, outType)); @@ -635,7 +635,7 @@ public class StreamProjection<IN> { fieldIndexes, types, inTypeInfo); return dataStream .transform( - "projection", + "Projection", outType, new ProjectInvokable<IN, Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>( fieldIndexes, outType)); @@ -697,7 +697,7 @@ public class StreamProjection<IN> { fieldIndexes, types, inTypeInfo); return dataStream .transform( - "projection", + "Projection", outType, new ProjectInvokable<IN, Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>( fieldIndexes, outType)); @@ -761,7 +761,7 @@ public class StreamProjection<IN> { fieldIndexes, types, inTypeInfo); return dataStream .transform( - "projection", + "Projection", outType, new ProjectInvokable<IN, Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>( fieldIndexes, outType)); @@ -827,7 +827,7 @@ public class StreamProjection<IN> { fieldIndexes, types, inTypeInfo); return dataStream .transform( - "projection", + "Projection", outType, new ProjectInvokable<IN, Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>( fieldIndexes, outType)); @@ -895,7 +895,7 @@ public class StreamProjection<IN> { fieldIndexes, types, inTypeInfo); return dataStream .transform( - "projection", + "Projection", outType, new ProjectInvokable<IN, Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>( fieldIndexes, outType)); @@ -966,7 +966,7 @@ public class StreamProjection<IN> { fieldIndexes, types, inTypeInfo); return dataStream .transform( - "projection", + "Projection", outType, new ProjectInvokable<IN, Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>( fieldIndexes, outType)); @@ -1039,7 +1039,7 @@ public class StreamProjection<IN> { fieldIndexes, types, inTypeInfo); return dataStream .transform( - "projection", + "Projection", outType, new ProjectInvokable<IN, Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>( fieldIndexes, outType)); @@ -1115,7 +1115,7 @@ public class StreamProjection<IN> { fieldIndexes, types, inTypeInfo); return dataStream .transform( - "projection", + "Projection", outType, new ProjectInvokable<IN, Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>( fieldIndexes, outType)); @@ -1193,7 +1193,7 @@ public class StreamProjection<IN> { fieldIndexes, types, inTypeInfo); return dataStream .transform( - "projection", + "Projection", outType, new ProjectInvokable<IN, Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>( fieldIndexes, outType)); @@ -1274,7 +1274,7 @@ public class StreamProjection<IN> { fieldIndexes, types, inTypeInfo); return dataStream .transform( - "projection", + "Projection", outType, new ProjectInvokable<IN, Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>( fieldIndexes, outType)); @@ -1357,7 +1357,7 @@ public class StreamProjection<IN> { fieldIndexes, types, inTypeInfo); return dataStream .transform( - "projection", + "Projection", outType, new ProjectInvokable<IN, Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>( fieldIndexes, outType)); @@ -1442,7 +1442,7 @@ public class StreamProjection<IN> { fieldIndexes, types, inTypeInfo); return dataStream .transform( - "projection", + "Projection", outType, new ProjectInvokable<IN, Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>( http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java index e81395d..4fe356b 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java @@ -231,7 +231,7 @@ public class WindowedDataStream<OUT> { * @return The transformed DataStream */ public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reduceFunction) { - return dataStream.transform("WindowReduce", getType(), + return dataStream.transform("Window-Reduce", getType(), getReduceInvokable(reduceFunction)); } @@ -279,7 +279,7 @@ public class WindowedDataStream<OUT> { public <R> SingleOutputStreamOperator<R, ?> reduceGroup( GroupReduceFunction<OUT, R> reduceFunction, TypeInformation<R> outType) { - return dataStream.transform("WindowReduce", outType, + return dataStream.transform("Window-Reduce", outType, getReduceGroupInvokable(reduceFunction)); } @@ -507,7 +507,7 @@ public class WindowedDataStream<OUT> { private SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregator) { StreamInvokable<OUT, OUT> invokable = getReduceInvokable(aggregator); - SingleOutputStreamOperator<OUT, ?> returnStream = dataStream.transform("windowReduce", + SingleOutputStreamOperator<OUT, ?> returnStream = dataStream.transform("Window-Aggregation", getType(), invokable); return returnStream; http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index e0cdd02..352e0fd 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -32,7 +32,9 @@ import org.apache.flink.api.java.io.TextInputFormat; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.client.program.Client; +import org.apache.flink.client.program.Client.OptimizerPlanEnvironment; import org.apache.flink.client.program.ContextEnvironment; +import org.apache.flink.client.program.PackagedProgram.PreviewPlanEnvironment; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.StreamGraph; import org.apache.flink.streaming.api.datastream.DataStream; @@ -267,7 +269,7 @@ public abstract class StreamExecutionEnvironment { SourceFunction<OUT> function = new FromElementsFunction<OUT>(data); - return addSource(function, outTypeInfo, "elements"); + return addSource(function, outTypeInfo, "Elements source"); } /** @@ -294,7 +296,7 @@ public abstract class StreamExecutionEnvironment { TypeInformation<OUT> outTypeInfo = TypeExtractor.getForObject(data.iterator().next()); SourceFunction<OUT> function = new FromElementsFunction<OUT>(data); - return addSource(function, outTypeInfo, "collection"); + return addSource(function, outTypeInfo, "Collection Source"); } /** @@ -313,7 +315,7 @@ public abstract class StreamExecutionEnvironment { */ public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter) { return addSource(new SocketTextStreamFunction(hostname, port, delimiter), null, - "socketStream"); + "Socket Stream"); } /** @@ -345,13 +347,13 @@ public abstract class StreamExecutionEnvironment { if (from > to) { throw new IllegalArgumentException("Start of sequence must not be greater than the end"); } - return addSource(new GenSequenceFunction(from, to), null, "sequence"); + return addSource(new GenSequenceFunction(from, to), null, "Sequence Source"); } private DataStreamSource<String> addFileSource(InputFormat<String, ?> inputFormat, TypeInformation<String> typeInfo) { FileSourceFunction function = new FileSourceFunction(inputFormat, typeInfo); - DataStreamSource<String> returnStream = addSource(function, null, "fileSource"); + DataStreamSource<String> returnStream = addSource(function, null, "File Source"); streamGraph.setInputFormat(returnStream.getId(), inputFormat); return returnStream; } @@ -397,7 +399,7 @@ public abstract class StreamExecutionEnvironment { */ public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, TypeInformation<OUT> outTypeInfo) { - return addSource(function, outTypeInfo, function.getClass().getName()); + return addSource(function, outTypeInfo, "Custom Source"); } /** @@ -462,6 +464,8 @@ public abstract class StreamExecutionEnvironment { ContextEnvironment ctx = (ContextEnvironment) env; return createContextEnvironment(ctx.getClient(), ctx.getJars(), ctx.getDegreeOfParallelism()); + } else if (env instanceof OptimizerPlanEnvironment | env instanceof PreviewPlanEnvironment) { + return new StreamPlanEnvironment(env); } else { return createLocalEnvironment(); } @@ -592,4 +596,16 @@ public abstract class StreamExecutionEnvironment { return streamGraph; } + /** + * Creates the plan with which the system will execute the program, and + * returns it as a String using a JSON representation of the execution data + * flow graph. Note that this needs to be called, before the plan is + * executed. + * + * @return The execution plan of the program, as a JSON String. + */ + public String getExecutionPlan() { + return getStreamGraph().getStreamingPlanAsJSON(); + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java new file mode 100644 index 0000000..412dcd0 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.environment; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.client.program.Client; +import org.apache.flink.client.program.Client.OptimizerPlanEnvironment; +import org.apache.flink.client.program.PackagedProgram.PreviewPlanEnvironment; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.GlobalConfiguration; + +public class StreamPlanEnvironment extends StreamExecutionEnvironment { + + private ExecutionEnvironment env; + + protected StreamPlanEnvironment(ExecutionEnvironment env) { + super(); + this.env = env; + + int dop = env.getDegreeOfParallelism(); + if (dop > 0) { + setDegreeOfParallelism(dop); + } else { + setDegreeOfParallelism(GlobalConfiguration.getInteger( + ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY, + ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE)); + } + } + + @Override + public void execute() throws Exception { + execute(""); + } + + @Override + public void execute(String jobName) throws Exception { + + streamGraph.setJobName(jobName); + + if (env instanceof OptimizerPlanEnvironment) { + ((OptimizerPlanEnvironment) env).setPlan(streamGraph); + } else if (env instanceof PreviewPlanEnvironment) { + ((PreviewPlanEnvironment) env).setPreview(streamGraph.getStreamingPlanAsJSON()); + } + throw new Client.ProgramAbortException(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java index 614b67f..6cee5f2 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java @@ -179,4 +179,8 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable { public static enum ChainingStrategy { ALWAYS, NEVER, HEAD; } + + public Function getUserFunction() { + return userFunction; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/FieldsPartitioner.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/FieldsPartitioner.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/FieldsPartitioner.java index e61555d..6bc036e 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/FieldsPartitioner.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/FieldsPartitioner.java @@ -35,7 +35,7 @@ public class FieldsPartitioner<T> extends StreamPartitioner<T> { KeySelector<T, ?> keySelector; public FieldsPartitioner(KeySelector<T, ?> keySelector) { - super(PartitioningStrategy.FIELDS); + super(PartitioningStrategy.GROUPBY); this.keySelector = keySelector; } http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/StreamPartitioner.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/StreamPartitioner.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/StreamPartitioner.java index e8a06b4..19a8dba 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/StreamPartitioner.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/StreamPartitioner.java @@ -27,7 +27,7 @@ public abstract class StreamPartitioner<T> implements public enum PartitioningStrategy { - FORWARD, DISTRIBUTE, SHUFFLE, BROADCAST, GLOBAL, FIELDS; + FORWARD, DISTRIBUTE, SHUFFLE, BROADCAST, GLOBAL, GROUPBY; } http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala index e0f50f8..08c7fa3 100644 --- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala +++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala @@ -203,6 +203,15 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { */ def execute(jobName: String) = javaEnv.execute(jobName) + /** + * Creates the plan with which the system will execute the program, and + * returns it as a String using a JSON representation of the execution data + * flow graph. Note that this needs to be called, before the plan is + * executed. + * + */ + def getExecutionPlan() = javaEnv.getStreamGraph().getStreamingPlanAsJSON(); + } object StreamExecutionEnvironment { http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java index f8a9b32..1eb6be1 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java @@ -103,7 +103,7 @@ public class RemoteExecutor extends PlanExecutor { JobWithJars p = new JobWithJars(plan, this.jarFiles); Client c = new Client(this.address, new Configuration(), p.getUserCodeClassLoader()); - OptimizedPlan op = c.getOptimizedPlan(p, -1); + OptimizedPlan op = (OptimizedPlan) c.getOptimizedPlan(p, -1); PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator(); return jsonGen.getOptimizerPlanAsJSON(op); } http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java index d8f1bf7..f1444ff 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java @@ -26,20 +26,18 @@ import java.net.InetSocketAddress; import java.util.List; import java.util.concurrent.TimeUnit; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import org.apache.flink.runtime.messages.JobManagerMessages.SubmissionFailure; -import org.apache.flink.runtime.messages.JobManagerMessages.SubmissionResponse; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.Plan; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.ExecutionEnvironmentFactory; import org.apache.flink.compiler.CompilerException; import org.apache.flink.compiler.DataStatistics; import org.apache.flink.compiler.PactCompiler; import org.apache.flink.compiler.contextcheck.ContextChecker; import org.apache.flink.compiler.costs.DefaultCostEstimator; +import org.apache.flink.compiler.plan.FlinkPlan; import org.apache.flink.compiler.plan.OptimizedPlan; +import org.apache.flink.compiler.plan.StreamingPlan; import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator; import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator; import org.apache.flink.configuration.ConfigConstants; @@ -49,13 +47,17 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.client.JobClient; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.messages.JobManagerMessages.SubmissionFailure; +import org.apache.flink.runtime.messages.JobManagerMessages.SubmissionResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; - -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.ExecutionEnvironmentFactory; import scala.Tuple2; import scala.concurrent.duration.FiniteDuration; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; + +import com.google.common.base.Preconditions; /** * Encapsulates the functionality necessary to submit a program to a remote cluster. @@ -133,10 +135,10 @@ public class Client { public String getOptimizedPlanAsJson(PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException { PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator(); - return jsonGen.getOptimizerPlanAsJSON(getOptimizedPlan(prog, parallelism)); + return jsonGen.getOptimizerPlanAsJSON((OptimizedPlan) getOptimizedPlan(prog, parallelism)); } - public OptimizedPlan getOptimizedPlan(PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException { + public FlinkPlan getOptimizedPlan(PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException { Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader()); if (prog.isUsingProgramEntryPoint()) { return getOptimizedPlan(prog.getPlanWithJars(), parallelism); @@ -189,7 +191,7 @@ public class Client { } } - public OptimizedPlan getOptimizedPlan(Plan p, int parallelism) throws CompilerException { + public FlinkPlan getOptimizedPlan(Plan p, int parallelism) throws CompilerException { if (parallelism > 0 && p.getDefaultParallelism() <= 0) { p.setDefaultParallelism(parallelism); } @@ -208,25 +210,31 @@ public class Client { * @throws CompilerException Thrown, if the compiler encounters an illegal situation. * @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file. */ - public OptimizedPlan getOptimizedPlan(JobWithJars prog, int parallelism) throws CompilerException, ProgramInvocationException { + public FlinkPlan getOptimizedPlan(JobWithJars prog, int parallelism) throws CompilerException, ProgramInvocationException { return getOptimizedPlan(prog.getPlan(), parallelism); } - public JobGraph getJobGraph(PackagedProgram prog, OptimizedPlan optPlan) throws ProgramInvocationException { + public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan) throws ProgramInvocationException { return getJobGraph(optPlan, prog.getAllLibraries()); } - private JobGraph getJobGraph(OptimizedPlan optPlan, List<File> jarFiles) { - NepheleJobGraphGenerator gen = new NepheleJobGraphGenerator(); - JobGraph job = gen.compileJobGraph(optPlan); - + private JobGraph getJobGraph(FlinkPlan optPlan, List<File> jarFiles) { + JobGraph job = null; + + if (optPlan instanceof StreamingPlan) { + job = ((StreamingPlan) optPlan).getJobGraph(); + } else { + NepheleJobGraphGenerator gen = new NepheleJobGraphGenerator(); + job = gen.compileJobGraph((OptimizedPlan) optPlan); + } + for (File jar : jarFiles) { job.addJar(new Path(jar.getAbsolutePath())); } - + return job; } - + public JobExecutionResult run(final PackagedProgram prog, int parallelism, boolean wait) throws ProgramInvocationException { Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader()); if (prog.isUsingProgramEntryPoint()) { @@ -287,7 +295,7 @@ public class Client { * on the nephele system failed. */ public JobExecutionResult run(JobWithJars prog, int parallelism, boolean wait) throws CompilerException, ProgramInvocationException { - return run(getOptimizedPlan(prog, parallelism), prog.getJarFiles(), wait); + return run((OptimizedPlan) getOptimizedPlan(prog, parallelism), prog.getJarFiles(), wait); } @@ -346,11 +354,11 @@ public class Client { // -------------------------------------------------------------------------------------------- - private static final class OptimizerPlanEnvironment extends ExecutionEnvironment { + public static final class OptimizerPlanEnvironment extends ExecutionEnvironment { private final PactCompiler compiler; - private OptimizedPlan optimizerPlan; + private FlinkPlan optimizerPlan; private OptimizerPlanEnvironment(PactCompiler compiler) { @@ -385,6 +393,10 @@ public class Client { }; initializeContextEnvironment(factory); } + + public void setPlan(FlinkPlan plan){ + this.optimizerPlan = plan; + } } public static final class ProgramAbortException extends Error { http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java index 891ec1b..702514a 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java @@ -59,7 +59,7 @@ public class ContextEnvironment extends ExecutionEnvironment { public String getExecutionPlan() throws Exception { Plan p = createProgramPlan("unnamed job"); - OptimizedPlan op = this.client.getOptimizedPlan(p, getDegreeOfParallelism()); + OptimizedPlan op = (OptimizedPlan) this.client.getOptimizedPlan(p, getDegreeOfParallelism()); PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator(); return gen.getOptimizerPlanAsJSON(op); http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java index b70a83c..f72baff 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java @@ -44,12 +44,12 @@ import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.Program; import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.util.InstantiationUtil; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.ExecutionEnvironmentFactory; import org.apache.flink.compiler.PactCompiler; import org.apache.flink.compiler.dag.DataSinkNode; import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator; +import org.apache.flink.util.InstantiationUtil; /** * This class encapsulates represents a program, packaged in a jar file. It supplies @@ -271,7 +271,9 @@ public class PackagedProgram { catch (Throwable t) { // the invocation gets aborted with the preview plan if (env.previewPlan != null) { - previewPlan = env.previewPlan; + previewPlan = env.previewPlan; + } else if (env.preview != null) { + return env.preview; } else { throw new ProgramInvocationException("The program caused an error: ", t); } @@ -290,7 +292,7 @@ public class PackagedProgram { else { throw new RuntimeException(); } - + PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator(); StringWriter string = new StringWriter(1024); PrintWriter pw = null; @@ -301,6 +303,7 @@ public class PackagedProgram { pw.close(); } return string.toString(); + } /** @@ -690,10 +693,12 @@ public class PackagedProgram { private List<DataSinkNode> previewPlan; private Plan plan; + private String preview = null; + @Override public JobExecutionResult execute(String jobName) throws Exception { this.plan = createProgramPlan(jobName); - this.previewPlan = PactCompiler.createPreOptimizedPlan(plan); + this.previewPlan = PactCompiler.createPreOptimizedPlan((Plan) plan); // do not go on with anything now! throw new Client.ProgramAbortException(); @@ -721,5 +726,10 @@ public class PackagedProgram { public Plan getPlan() { return this.plan; } + + public void setPreview(String preview) { + this.preview = preview; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java b/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java index 7eddb61..62414bf 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java +++ b/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java @@ -40,7 +40,9 @@ import org.apache.flink.client.program.Client; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.compiler.CompilerException; +import org.apache.flink.compiler.plan.FlinkPlan; import org.apache.flink.compiler.plan.OptimizedPlan; +import org.apache.flink.compiler.plan.StreamingPlan; import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -166,7 +168,7 @@ public class JobSubmissionServlet extends HttpServlet { // create the plan String[] options = params.isEmpty() ? new String[0] : (String[]) params.toArray(new String[params.size()]); PackagedProgram program; - OptimizedPlan optPlan; + FlinkPlan optPlan; Client client; try { @@ -246,14 +248,18 @@ public class JobSubmissionServlet extends HttpServlet { String planName = uid + ".json"; File jsonFile = new File(this.planDumpDirectory, planName); - PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator(); - jsonGen.setEncodeForHTML(true); - jsonGen.dumpOptimizerPlanAsJSON(optPlan, jsonFile); - + if (optPlan instanceof StreamingPlan) { + ((StreamingPlan) optPlan).dumpStreamingPlanAsJSON(jsonFile); + } else { + PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator(); + jsonGen.setEncodeForHTML(true); + jsonGen.dumpOptimizerPlanAsJSON((OptimizedPlan) optPlan, jsonFile); + } + // submit the job only, if it should not be suspended if (!suspend) { try { - client.run(program, optPlan, false); + client.run(program,(OptimizedPlan) optPlan, false); } catch (Throwable t) { LOG.error("Error submitting job to the job-manager.", t); showErrorPage(resp, t.getMessage()); http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java index c6cada8..28b7d8a 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java @@ -45,7 +45,7 @@ public class ExecutionPlanCreationTest { InetSocketAddress mockJmAddress = new InetSocketAddress(mockAddress, 12345); Client client = new Client(mockJmAddress, new Configuration(), getClass().getClassLoader()); - OptimizedPlan op = client.getOptimizedPlan(prg, -1); + OptimizedPlan op = (OptimizedPlan) client.getOptimizedPlan(prg, -1); assertNotNull(op); PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator(); http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-compiler/src/main/java/org/apache/flink/compiler/plan/FlinkPlan.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/FlinkPlan.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/FlinkPlan.java new file mode 100644 index 0000000..eaa78b1 --- /dev/null +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/FlinkPlan.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.compiler.plan; + +/** + * A common interface for compiled Flink plans for both batch and streaming + * processing programs. + * + */ +public interface FlinkPlan { + +} http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-compiler/src/main/java/org/apache/flink/compiler/plan/OptimizedPlan.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/OptimizedPlan.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/OptimizedPlan.java index d555d93..00eb287 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/OptimizedPlan.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/OptimizedPlan.java @@ -30,7 +30,7 @@ import org.apache.flink.util.Visitor; * It works on this representation during its optimization. Finally, this plan is translated to a schedule * for the nephele runtime system. */ -public class OptimizedPlan implements Visitable<PlanNode> { +public class OptimizedPlan implements FlinkPlan, Visitable<PlanNode> { /** * The data sources in the plan. http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-compiler/src/main/java/org/apache/flink/compiler/plan/StreamingPlan.java ---------------------------------------------------------------------- diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/StreamingPlan.java b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/StreamingPlan.java new file mode 100644 index 0000000..b2e2fa6 --- /dev/null +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/StreamingPlan.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.compiler.plan; + +import java.io.File; +import java.io.IOException; + +import org.apache.flink.runtime.jobgraph.JobGraph; + +/** + * Abstract class representing Flink Streaming plans + * + */ +public abstract class StreamingPlan implements FlinkPlan { + + public abstract JobGraph getJobGraph(); + + public abstract String getStreamingPlanAsJSON(); + + public abstract void dumpStreamingPlanAsJSON(File file) throws IOException; + +}
