Repository: flink
Updated Branches:
  refs/heads/master b263932e2 -> 82c420022


[FLINK-1434] [FLINK-1401] Streaming support added for webclient

Closes #334


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/82c42002
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/82c42002
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/82c42002

Branch: refs/heads/master
Commit: 82c420022ef3ffe3d7ad2172b4338ed12baf9e0e
Parents: b263932
Author: Gyula Fora <[email protected]>
Authored: Sun Jan 25 12:28:18 2015 +0100
Committer: Gyula Fora <[email protected]>
Committed: Tue Jan 27 23:31:25 2015 +0100

----------------------------------------------------------------------
 .../flink-streaming-core/pom.xml                |  6 ++
 .../apache/flink/streaming/api/StreamGraph.java | 96 +++++++++++++++++++-
 .../api/StreamingJobGraphGenerator.java         |  9 +-
 .../api/datastream/ConnectedDataStream.java     | 10 +-
 .../streaming/api/datastream/DataStream.java    | 23 +++--
 .../api/datastream/GroupedDataStream.java       |  4 +-
 .../api/datastream/StreamProjection.java        | 50 +++++-----
 .../api/datastream/WindowedDataStream.java      |  6 +-
 .../environment/StreamExecutionEnvironment.java | 28 ++++--
 .../api/environment/StreamPlanEnvironment.java  | 62 +++++++++++++
 .../api/invokable/StreamInvokable.java          |  4 +
 .../partitioner/FieldsPartitioner.java          |  2 +-
 .../partitioner/StreamPartitioner.java          |  2 +-
 .../api/scala/StreamExecutionEnvironment.scala  |  9 ++
 .../org/apache/flink/client/RemoteExecutor.java |  2 +-
 .../org/apache/flink/client/program/Client.java | 60 +++++++-----
 .../client/program/ContextEnvironment.java      |  2 +-
 .../flink/client/program/PackagedProgram.java   | 18 +++-
 .../flink/client/web/JobSubmissionServlet.java  | 18 ++--
 .../program/ExecutionPlanCreationTest.java      |  2 +-
 .../apache/flink/compiler/plan/FlinkPlan.java   | 28 ++++++
 .../flink/compiler/plan/OptimizedPlan.java      |  2 +-
 .../flink/compiler/plan/StreamingPlan.java      | 38 ++++++++
 23 files changed, 385 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-addons/flink-streaming/flink-streaming-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/pom.xml b/flink-addons/flink-streaming/flink-streaming-core/pom.xml
index 9231cbf..a60bd01 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/pom.xml
+++ b/flink-addons/flink-streaming/flink-streaming-core/pom.xml
@@ -48,6 +48,12 @@ under the License.
                        <version>${project.version}</version>
                        <scope>test</scope>
                </dependency>
+        
+               <dependency>
+                       <groupId>org.apache.sling</groupId>
+                       <artifactId>org.apache.sling.commons.json</artifactId>
+                       <version>2.0.6</version>
+               </dependency>
        </dependencies>
 
        <build>

http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
index c515e63..b5e43af 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -17,6 +17,10 @@
 
 package org.apache.flink.streaming.api;
 
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -27,6 +31,7 @@ import java.util.Set;
 
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.compiler.plan.StreamingPlan;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.streaming.api.collector.OutputSelector;
@@ -39,18 +44,22 @@ import org.apache.flink.streaming.api.streamvertex.StreamIterationTail;
 import org.apache.flink.streaming.api.streamvertex.StreamVertex;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.state.OperatorState;
+import org.apache.sling.commons.json.JSONArray;
+import org.apache.sling.commons.json.JSONException;
+import org.apache.sling.commons.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Object for building Apache Flink stream processing graphs
  */
-public class StreamGraph {
+public class StreamGraph extends StreamingPlan {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(StreamGraph.class);
        private final static String DEAFULT_JOB_NAME = "Flink Streaming Job";
 
        protected boolean chaining = true;
+       private String jobName = DEAFULT_JOB_NAME;
 
        // Graph attributes
        private Map<String, Integer> operatorParallelisms;
@@ -440,7 +449,7 @@ public class StreamGraph {
         * Gets the assembled {@link JobGraph} and adds a default name for it.
         */
        public JobGraph getJobGraph() {
-               return getJobGraph(DEAFULT_JOB_NAME);
+               return getJobGraph(jobName);
        }
 
        /**
@@ -452,11 +461,16 @@ public class StreamGraph {
         */
        public JobGraph getJobGraph(String jobGraphName) {
 
+               this.jobName = jobGraphName;
                StreamingJobGraphGenerator optimizer = new 
StreamingJobGraphGenerator(this);
 
                return optimizer.createJobGraph(jobGraphName);
        }
 
+       public void setJobName(String jobName) {
+               this.jobName = jobName;
+       }
+
        public void setChaining(boolean chaining) {
                this.chaining = chaining;
        }
@@ -525,4 +539,82 @@ public class StreamGraph {
                return iterationTimeouts.get(vertexName);
        }
 
+       public String getOperatorName(String vertexName) {
+               return operatorNames.get(vertexName);
+       }
+
+       @Override
+       public String getStreamingPlanAsJSON() {
+
+               try {
+                       JSONObject json = new JSONObject();
+                       JSONArray nodes = new JSONArray();
+
+                       json.put("nodes", nodes);
+
+                       for (String id : operatorNames.keySet()) {
+                               JSONObject node = new JSONObject();
+                               nodes.put(node);
+
+                               node.put("id", Integer.valueOf(id));
+                               node.put("type", getOperatorName(id));
+
+                               if (sources.contains(id)) {
+                                       node.put("pact", "Data Source");
+                               } else {
+                                       node.put("pact", "Data Stream");
+                               }
+
+                               node.put("contents", getOperatorName(id) + " at 
"
+                                               + 
getInvokable(id).getUserFunction().getClass().getSimpleName());
+                               node.put("parallelism", getParallelism(id));
+
+                               int numIn = getInEdges(id).size();
+                               if (numIn > 0) {
+
+                                       JSONArray inputs = new JSONArray();
+                                       node.put("predecessors", inputs);
+
+                                       for (int i = 0; i < numIn; i++) {
+
+                                               String inID = 
getInEdges(id).get(i);
+
+                                               JSONObject input = new 
JSONObject();
+                                               inputs.put(input);
+
+                                               input.put("id", 
Integer.valueOf(inID));
+                                               input.put("ship_strategy", 
getOutPartitioner(inID, id).getStrategy());
+                                               if (i == 0) {
+                                                       input.put("side", 
"first");
+                                               } else if (i == 1) {
+                                                       input.put("side", 
"second");
+                                               }
+                                       }
+                               }
+
+                       }
+                       return json.toString();
+               } catch (JSONException e) {
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("JSON plan creation failed: {}", e);
+                       }
+                       return "";
+               }
+
+       }
+
+       @Override
+       public void dumpStreamingPlanAsJSON(File file) throws IOException {
+               PrintWriter pw = null;
+               try {
+                       pw = new PrintWriter(new FileOutputStream(file), false);
+                       pw.write(getStreamingPlanAsJSON());
+                       pw.flush();
+
+               } finally {
+                       if (pw != null) {
+                               pw.close();
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
index fccb1e1..d9a1b66 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
@@ -141,13 +141,13 @@ public class StreamingJobGraphGenerator {
                }
        }
 
-       private String createChainedName(String vertexName, List<String> 
chainedOutputs) {
+       private String createChainedName(String vertexID, List<String> 
chainedOutputs) {
+               String vertexName = streamGraph.getOperatorName(vertexID);
                if (chainedOutputs.size() > 1) {
                        List<String> outputChainedNames = new 
ArrayList<String>();
                        for (String chainable : chainedOutputs) {
                                
outputChainedNames.add(chainedNames.get(chainable));
                        }
-
                        return vertexName + " -> (" + 
StringUtils.join(outputChainedNames, ", ") + ")";
                } else if (chainedOutputs.size() == 1) {
                        return vertexName + " -> " + 
chainedNames.get(chainedOutputs.get(0));
@@ -162,7 +162,10 @@ public class StreamingJobGraphGenerator {
                AbstractJobVertex vertex = new 
AbstractJobVertex(chainedNames.get(vertexName));
 
                
vertex.setInvokableClass(streamGraph.getJobVertexClass(vertexName));
-               vertex.setParallelism(streamGraph.getParallelism(vertexName));
+               if (streamGraph.getParallelism(vertexName) > 0) {
+                       
vertex.setParallelism(streamGraph.getParallelism(vertexName));
+               }
+
                if (LOG.isDebugEnabled()) {
                        LOG.debug("Parallelism set: {} for {}", 
streamGraph.getParallelism(vertexName),
                                        vertexName);

http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index db8649b..a6eade8 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -244,7 +244,7 @@ public class ConnectedDataStream<IN1, IN2> {
                TypeInformation<OUT> outTypeInfo = 
TypeExtractor.createTypeInfo(CoMapFunction.class,
                                coMapper.getClass(), 2, null, null);
 
-               return addCoFunction("coMap", outTypeInfo, new 
CoMapInvokable<IN1, IN2, OUT>(
+               return addCoFunction("Co-Map", outTypeInfo, new 
CoMapInvokable<IN1, IN2, OUT>(
                                clean(coMapper)));
 
        }
@@ -269,7 +269,7 @@ public class ConnectedDataStream<IN1, IN2> {
                TypeInformation<OUT> outTypeInfo = 
TypeExtractor.createTypeInfo(CoFlatMapFunction.class,
                                coFlatMapper.getClass(), 2, null, null);
 
-               return addCoFunction("coFlatMap", outTypeInfo, new 
CoFlatMapInvokable<IN1, IN2, OUT>(
+               return addCoFunction("Co-Flat Map", outTypeInfo, new 
CoFlatMapInvokable<IN1, IN2, OUT>(
                                clean(coFlatMapper)));
        }
 
@@ -294,7 +294,7 @@ public class ConnectedDataStream<IN1, IN2> {
                TypeInformation<OUT> outTypeInfo = 
TypeExtractor.createTypeInfo(CoReduceFunction.class,
                                coReducer.getClass(), 2, null, null);
 
-               return addCoFunction("coReduce", outTypeInfo, 
getReduceInvokable(clean(coReducer)));
+               return addCoFunction("Co-Reduce", outTypeInfo, 
getReduceInvokable(clean(coReducer)));
 
        }
 
@@ -361,7 +361,7 @@ public class ConnectedDataStream<IN1, IN2> {
                TypeInformation<OUT> outTypeInfo = 
TypeExtractor.createTypeInfo(CoWindowFunction.class,
                                coWindowFunction.getClass(), 2, null, null);
 
-               return addCoFunction("coWindowReduce", outTypeInfo, new 
CoWindowInvokable<IN1, IN2, OUT>(
+               return addCoFunction("Co-Window", outTypeInfo, new 
CoWindowInvokable<IN1, IN2, OUT>(
                                clean(coWindowFunction), windowSize, 
slideInterval, timestamp1, timestamp2));
 
        }
@@ -390,7 +390,7 @@ public class ConnectedDataStream<IN1, IN2> {
                        throw new IllegalArgumentException("Slide interval must 
be positive");
                }
 
-               return addCoFunction("coWindowReduce", outTypeInfo, new 
CoWindowInvokable<IN1, IN2, OUT>(
+               return addCoFunction("Co-Window", outTypeInfo, new 
CoWindowInvokable<IN1, IN2, OUT>(
                                clean(coWindowFunction), windowSize, 
slideInterval, timestamp1, timestamp2));
 
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index c2f5d44..8e87b27 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -95,6 +95,7 @@ public class DataStream<OUT> {
        protected static Integer counter = 0;
        protected final StreamExecutionEnvironment environment;
        protected final String id;
+       protected final String type;
        protected int degreeOfParallelism;
        protected List<String> userDefinedNames;
        protected StreamPartitioner<OUT> partitioner;
@@ -122,7 +123,8 @@ public class DataStream<OUT> {
                }
 
                counter++;
-               this.id = operatorType + "-" + counter.toString();
+               this.id = counter.toString();
+               this.type = operatorType;
                this.environment = environment;
                this.degreeOfParallelism = environment.getDegreeOfParallelism();
                this.streamGraph = environment.getStreamGraph();
@@ -142,6 +144,7 @@ public class DataStream<OUT> {
        public DataStream(DataStream<OUT> dataStream) {
                this.environment = dataStream.environment;
                this.id = dataStream.id;
+               this.type = dataStream.type;
                this.degreeOfParallelism = dataStream.degreeOfParallelism;
                this.userDefinedNames = new 
ArrayList<String>(dataStream.userDefinedNames);
                this.partitioner = dataStream.partitioner;
@@ -465,7 +468,7 @@ public class DataStream<OUT> {
 
                TypeInformation<R> outType = 
TypeExtractor.getMapReturnTypes(clean(mapper), getType());
 
-               return transform("map", outType, new MapInvokable<OUT, 
R>(clean(mapper)));
+               return transform("Map", outType, new MapInvokable<OUT, 
R>(clean(mapper)));
        }
 
        /**
@@ -489,7 +492,7 @@ public class DataStream<OUT> {
                TypeInformation<R> outType = 
TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
                                getType());
 
-               return transform("flatMap", outType, new FlatMapInvokable<OUT, 
R>(clean(flatMapper)));
+               return transform("Flat Map", outType, new FlatMapInvokable<OUT, 
R>(clean(flatMapper)));
 
        }
 
@@ -506,7 +509,7 @@ public class DataStream<OUT> {
         */
        public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> 
reducer) {
 
-               return transform("reduce", getType(), new 
StreamReduceInvokable<OUT>(clean(reducer)));
+               return transform("Reduce", getType(), new 
StreamReduceInvokable<OUT>(clean(reducer)));
 
        }
 
@@ -525,7 +528,7 @@ public class DataStream<OUT> {
         * @return The filtered DataStream.
         */
        public SingleOutputStreamOperator<OUT, ?> filter(FilterFunction<OUT> 
filter) {
-               return transform("filter", getType(), new 
FilterInvokable<OUT>(clean(filter)));
+               return transform("Filter", getType(), new 
FilterInvokable<OUT>(clean(filter)));
 
        }
 
@@ -834,7 +837,7 @@ public class DataStream<OUT> {
        public SingleOutputStreamOperator<Long, ?> count() {
                TypeInformation<Long> outTypeInfo = 
TypeExtractor.getForObject(Long.valueOf(0));
 
-               return transform("counter", outTypeInfo, new 
CounterInvokable<OUT>());
+               return transform("Count", outTypeInfo, new 
CounterInvokable<OUT>());
        }
 
        /**
@@ -1090,12 +1093,12 @@ public class DataStream<OUT> {
 
                StreamReduceInvokable<OUT> invokable = new 
StreamReduceInvokable<OUT>(aggregate);
 
-               SingleOutputStreamOperator<OUT, ?> returnStream = 
transform("reduce", getType(), invokable);
+               SingleOutputStreamOperator<OUT, ?> returnStream = 
transform("Aggregation", getType(),
+                               invokable);
 
                return returnStream;
        }
 
-
        /**
         * Method for passing user defined invokables along with the type
         * informations that will transform the DataStream.
@@ -1179,8 +1182,8 @@ public class DataStream<OUT> {
                DataStreamSink<OUT> returnStream = new 
DataStreamSink<OUT>(environment, "sink", getType(),
                                sinkInvokable);
 
-               streamGraph.addStreamVertex(returnStream.getId(), 
sinkInvokable, getType(), null, "sink",
-                               returnStream.getParallelism());
+               streamGraph.addStreamVertex(returnStream.getId(), 
sinkInvokable, getType(), null,
+                               "Stream Sink", returnStream.getParallelism());
 
                this.connectGraph(this.copy(), returnStream.getId(), 0);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index 4a5c0c2..c871c20 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -66,7 +66,7 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
         */
        @Override
        public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> 
reducer) {
-               return transform("groupReduce", getType(), new 
GroupedReduceInvokable<OUT>(clean(reducer),
+               return transform("Grouped Reduce", getType(), new 
GroupedReduceInvokable<OUT>(clean(reducer),
                                keySelector));
        }
 
@@ -186,7 +186,7 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
                GroupedReduceInvokable<OUT> invokable = new 
GroupedReduceInvokable<OUT>(clean(aggregate),
                                keySelector);
 
-               SingleOutputStreamOperator<OUT, ?> returnStream = 
transform("groupReduce", getType(),
+               SingleOutputStreamOperator<OUT, ?> returnStream = 
transform("Grouped Aggregation", getType(),
                                invokable);
 
                return returnStream;

http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
index c8ad533..915ca30 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
@@ -83,7 +83,7 @@ public class StreamProjection<IN> {
                @SuppressWarnings("unchecked")
                TypeInformation<Tuple1<T0>> outType = 
(TypeInformation<Tuple1<T0>>) extractFieldTypes(
                                fieldIndexes, types, inTypeInfo);
-               return dataStream.transform("projection", outType, new 
ProjectInvokable<IN, Tuple1<T0>>(
+               return dataStream.transform("Projection", outType, new 
ProjectInvokable<IN, Tuple1<T0>>(
                                fieldIndexes, outType));
        }
 
@@ -111,7 +111,7 @@ public class StreamProjection<IN> {
                @SuppressWarnings("unchecked")
                TypeInformation<Tuple2<T0, T1>> outType = 
(TypeInformation<Tuple2<T0, T1>>) extractFieldTypes(
                                fieldIndexes, types, inTypeInfo);
-               return dataStream.transform("projection", outType,
+               return dataStream.transform("Projection", outType,
                                new ProjectInvokable<IN, Tuple2<T0, 
T1>>(fieldIndexes, outType));
        }
 
@@ -141,7 +141,7 @@ public class StreamProjection<IN> {
                @SuppressWarnings("unchecked")
                TypeInformation<Tuple3<T0, T1, T2>> outType = 
(TypeInformation<Tuple3<T0, T1, T2>>) extractFieldTypes(
                                fieldIndexes, types, inTypeInfo);
-               return dataStream.transform("projection", outType,
+               return dataStream.transform("Projection", outType,
                                new ProjectInvokable<IN, Tuple3<T0, T1, 
T2>>(fieldIndexes, outType));
        }
 
@@ -173,7 +173,7 @@ public class StreamProjection<IN> {
                @SuppressWarnings("unchecked")
                TypeInformation<Tuple4<T0, T1, T2, T3>> outType = 
(TypeInformation<Tuple4<T0, T1, T2, T3>>) extractFieldTypes(
                                fieldIndexes, types, inTypeInfo);
-               return dataStream.transform("projection", outType,
+               return dataStream.transform("Projection", outType,
                                new ProjectInvokable<IN, Tuple4<T0, T1, T2, 
T3>>(fieldIndexes, outType));
        }
 
@@ -206,7 +206,7 @@ public class StreamProjection<IN> {
                @SuppressWarnings("unchecked")
                TypeInformation<Tuple5<T0, T1, T2, T3, T4>> outType = 
(TypeInformation<Tuple5<T0, T1, T2, T3, T4>>) extractFieldTypes(
                                fieldIndexes, types, inTypeInfo);
-               return dataStream.transform("projection", outType,
+               return dataStream.transform("Projection", outType,
                                new ProjectInvokable<IN, Tuple5<T0, T1, T2, T3, 
T4>>(fieldIndexes, outType));
        }
 
@@ -243,7 +243,7 @@ public class StreamProjection<IN> {
                @SuppressWarnings("unchecked")
                TypeInformation<Tuple6<T0, T1, T2, T3, T4, T5>> outType = 
(TypeInformation<Tuple6<T0, T1, T2, T3, T4, T5>>) extractFieldTypes(
                                fieldIndexes, types, inTypeInfo);
-               return dataStream.transform("projection", outType,
+               return dataStream.transform("Projection", outType,
                                new ProjectInvokable<IN, Tuple6<T0, T1, T2, T3, 
T4, T5>>(fieldIndexes, outType));
        }
 
@@ -283,7 +283,7 @@ public class StreamProjection<IN> {
                TypeInformation<Tuple7<T0, T1, T2, T3, T4, T5, T6>> outType = 
(TypeInformation<Tuple7<T0, T1, T2, T3, T4, T5, T6>>) extractFieldTypes(
                                fieldIndexes, types, inTypeInfo);
                return dataStream
-                               .transform("projection", outType,
+                               .transform("Projection", outType,
                                                new ProjectInvokable<IN, 
Tuple7<T0, T1, T2, T3, T4, T5, T6>>(fieldIndexes,
                                                                outType));
        }
@@ -325,7 +325,7 @@ public class StreamProjection<IN> {
                @SuppressWarnings("unchecked")
                TypeInformation<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> outType 
= (TypeInformation<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>) extractFieldTypes(
                                fieldIndexes, types, inTypeInfo);
-               return dataStream.transform("projection", outType,
+               return dataStream.transform("Projection", outType,
                                new ProjectInvokable<IN, Tuple8<T0, T1, T2, T3, 
T4, T5, T6, T7>>(fieldIndexes,
                                                outType));
        }
@@ -369,7 +369,7 @@ public class StreamProjection<IN> {
                @SuppressWarnings("unchecked")
                TypeInformation<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> 
outType = (TypeInformation<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>) 
extractFieldTypes(
                                fieldIndexes, types, inTypeInfo);
-               return dataStream.transform("projection", outType,
+               return dataStream.transform("Projection", outType,
                                new ProjectInvokable<IN, Tuple9<T0, T1, T2, T3, 
T4, T5, T6, T7, T8>>(fieldIndexes,
                                                outType));
        }
@@ -415,7 +415,7 @@ public class StreamProjection<IN> {
                @SuppressWarnings("unchecked")
                TypeInformation<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, 
T9>> outType = (TypeInformation<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, 
T9>>) extractFieldTypes(
                                fieldIndexes, types, inTypeInfo);
-               return dataStream.transform("projection", outType,
+               return dataStream.transform("Projection", outType,
                                new ProjectInvokable<IN, Tuple10<T0, T1, T2, 
T3, T4, T5, T6, T7, T8, T9>>(
                                                fieldIndexes, outType));
        }
@@ -465,7 +465,7 @@ public class StreamProjection<IN> {
                @SuppressWarnings("unchecked")
                TypeInformation<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, 
T10>> outType = (TypeInformation<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, 
T9, T10>>) extractFieldTypes(
                                fieldIndexes, types, inTypeInfo);
-               return dataStream.transform("projection", outType,
+               return dataStream.transform("Projection", outType,
                                new ProjectInvokable<IN, Tuple11<T0, T1, T2, 
T3, T4, T5, T6, T7, T8, T9, T10>>(
                                                fieldIndexes, outType));
        }
@@ -519,7 +519,7 @@ public class StreamProjection<IN> {
                                fieldIndexes, types, inTypeInfo);
                return dataStream
                                .transform(
-                                               "projection",
+                                               "Projection",
                                                outType,
                                                new ProjectInvokable<IN, 
Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(
                                                                fieldIndexes, 
outType));
@@ -576,7 +576,7 @@ public class StreamProjection<IN> {
                                fieldIndexes, types, inTypeInfo);
                return dataStream
                                .transform(
-                                               "projection",
+                                               "Projection",
                                                outType,
                                                new ProjectInvokable<IN, 
Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(
                                                                fieldIndexes, 
outType));
@@ -635,7 +635,7 @@ public class StreamProjection<IN> {
                                fieldIndexes, types, inTypeInfo);
                return dataStream
                                .transform(
-                                               "projection",
+                                               "Projection",
                                                outType,
                                                new ProjectInvokable<IN, 
Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(
                                                                fieldIndexes, 
outType));
@@ -697,7 +697,7 @@ public class StreamProjection<IN> {
                                fieldIndexes, types, inTypeInfo);
                return dataStream
                                .transform(
-                                               "projection",
+                                               "Projection",
                                                outType,
                                                new ProjectInvokable<IN, 
Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(
                                                                fieldIndexes, 
outType));
@@ -761,7 +761,7 @@ public class StreamProjection<IN> {
                                fieldIndexes, types, inTypeInfo);
                return dataStream
                                .transform(
-                                               "projection",
+                                               "Projection",
                                                outType,
                                                new ProjectInvokable<IN, 
Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(
                                                                fieldIndexes, 
outType));
@@ -827,7 +827,7 @@ public class StreamProjection<IN> {
                                fieldIndexes, types, inTypeInfo);
                return dataStream
                                .transform(
-                                               "projection",
+                                               "Projection",
                                                outType,
                                                new ProjectInvokable<IN, 
Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, 
T16>>(
                                                                fieldIndexes, 
outType));
@@ -895,7 +895,7 @@ public class StreamProjection<IN> {
                                fieldIndexes, types, inTypeInfo);
                return dataStream
                                .transform(
-                                               "projection",
+                                               "Projection",
                                                outType,
                                                new ProjectInvokable<IN, 
Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, 
T16, T17>>(
                                                                fieldIndexes, 
outType));
@@ -966,7 +966,7 @@ public class StreamProjection<IN> {
                                fieldIndexes, types, inTypeInfo);
                return dataStream
                                .transform(
-                                               "projection",
+                                               "Projection",
                                                outType,
                                                new ProjectInvokable<IN, 
Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, 
T16, T17, T18>>(
                                                                fieldIndexes, 
outType));
@@ -1039,7 +1039,7 @@ public class StreamProjection<IN> {
                                fieldIndexes, types, inTypeInfo);
                return dataStream
                                .transform(
-                                               "projection",
+                                               "Projection",
                                                outType,
                                                new ProjectInvokable<IN, 
Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, 
T16, T17, T18, T19>>(
                                                                fieldIndexes, 
outType));
@@ -1115,7 +1115,7 @@ public class StreamProjection<IN> {
                                fieldIndexes, types, inTypeInfo);
                return dataStream
                                .transform(
-                                               "projection",
+                                               "Projection",
                                                outType,
                                                new ProjectInvokable<IN, 
Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, 
T16, T17, T18, T19, T20>>(
                                                                fieldIndexes, 
outType));
@@ -1193,7 +1193,7 @@ public class StreamProjection<IN> {
                                fieldIndexes, types, inTypeInfo);
                return dataStream
                                .transform(
-                                               "projection",
+                                               "Projection",
                                                outType,
                                                new ProjectInvokable<IN, 
Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, 
T16, T17, T18, T19, T20, T21>>(
                                                                fieldIndexes, 
outType));
@@ -1274,7 +1274,7 @@ public class StreamProjection<IN> {
                                fieldIndexes, types, inTypeInfo);
                return dataStream
                                .transform(
-                                               "projection",
+                                               "Projection",
                                                outType,
                                                new ProjectInvokable<IN, 
Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, 
T16, T17, T18, T19, T20, T21, T22>>(
                                                                fieldIndexes, 
outType));
@@ -1357,7 +1357,7 @@ public class StreamProjection<IN> {
                                fieldIndexes, types, inTypeInfo);
                return dataStream
                                .transform(
-                                               "projection",
+                                               "Projection",
                                                outType,
                                                new ProjectInvokable<IN, 
Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, 
T16, T17, T18, T19, T20, T21, T22, T23>>(
                                                                fieldIndexes, 
outType));
@@ -1442,7 +1442,7 @@ public class StreamProjection<IN> {
                                fieldIndexes, types, inTypeInfo);
                return dataStream
                                .transform(
-                                               "projection",
+                                               "Projection",
 
                                                outType,
                                                new ProjectInvokable<IN, 
Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, 
T16, T17, T18, T19, T20, T21, T22, T23, T24>>(

http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index e81395d..4fe356b 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -231,7 +231,7 @@ public class WindowedDataStream<OUT> {
         * @return The transformed DataStream
         */
        public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> 
reduceFunction) {
-               return dataStream.transform("WindowReduce", getType(),
+               return dataStream.transform("Window-Reduce", getType(),
                                getReduceInvokable(reduceFunction));
        }
 
@@ -279,7 +279,7 @@ public class WindowedDataStream<OUT> {
        public <R> SingleOutputStreamOperator<R, ?> reduceGroup(
                        GroupReduceFunction<OUT, R> reduceFunction, 
TypeInformation<R> outType) {
 
-               return dataStream.transform("WindowReduce", outType,
+               return dataStream.transform("Window-Reduce", outType,
                                getReduceGroupInvokable(reduceFunction));
        }
 
@@ -507,7 +507,7 @@ public class WindowedDataStream<OUT> {
        private SingleOutputStreamOperator<OUT, ?> 
aggregate(AggregationFunction<OUT> aggregator) {
                StreamInvokable<OUT, OUT> invokable = 
getReduceInvokable(aggregator);
 
-               SingleOutputStreamOperator<OUT, ?> returnStream = 
dataStream.transform("windowReduce",
+               SingleOutputStreamOperator<OUT, ?> returnStream = 
dataStream.transform("Window-Aggregation",
                                getType(), invokable);
 
                return returnStream;

http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index e0cdd02..352e0fd 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -32,7 +32,9 @@ import org.apache.flink.api.java.io.TextInputFormat;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.Client.OptimizerPlanEnvironment;
 import org.apache.flink.client.program.ContextEnvironment;
+import org.apache.flink.client.program.PackagedProgram.PreviewPlanEnvironment;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.StreamGraph;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -267,7 +269,7 @@ public abstract class StreamExecutionEnvironment {
 
                SourceFunction<OUT> function = new 
FromElementsFunction<OUT>(data);
 
-               return addSource(function, outTypeInfo, "elements");
+               return addSource(function, outTypeInfo, "Elements source");
        }
 
        /**
@@ -294,7 +296,7 @@ public abstract class StreamExecutionEnvironment {
                TypeInformation<OUT> outTypeInfo = 
TypeExtractor.getForObject(data.iterator().next());
                SourceFunction<OUT> function = new 
FromElementsFunction<OUT>(data);
 
-               return addSource(function, outTypeInfo, "collection");
+               return addSource(function, outTypeInfo, "Collection Source");
        }
 
        /**
@@ -313,7 +315,7 @@ public abstract class StreamExecutionEnvironment {
         */
        public DataStreamSource<String> socketTextStream(String hostname, int 
port, char delimiter) {
                return addSource(new SocketTextStreamFunction(hostname, port, 
delimiter), null,
-                               "socketStream");
+                               "Socket Stream");
        }
 
        /**
@@ -345,13 +347,13 @@ public abstract class StreamExecutionEnvironment {
                if (from > to) {
                        throw new IllegalArgumentException("Start of sequence 
must not be greater than the end");
                }
-               return addSource(new GenSequenceFunction(from, to), null, 
"sequence");
+               return addSource(new GenSequenceFunction(from, to), null, 
"Sequence Source");
        }
 
        private DataStreamSource<String> addFileSource(InputFormat<String, ?> 
inputFormat,
                        TypeInformation<String> typeInfo) {
                FileSourceFunction function = new 
FileSourceFunction(inputFormat, typeInfo);
-               DataStreamSource<String> returnStream = addSource(function, 
null, "fileSource");
+               DataStreamSource<String> returnStream = addSource(function, 
null, "File Source");
                streamGraph.setInputFormat(returnStream.getId(), inputFormat);
                return returnStream;
        }
@@ -397,7 +399,7 @@ public abstract class StreamExecutionEnvironment {
         */
        public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> 
function,
                        TypeInformation<OUT> outTypeInfo) {
-               return addSource(function, outTypeInfo, 
function.getClass().getName());
+               return addSource(function, outTypeInfo, "Custom Source");
        }
 
        /**
@@ -462,6 +464,8 @@ public abstract class StreamExecutionEnvironment {
                        ContextEnvironment ctx = (ContextEnvironment) env;
                        return createContextEnvironment(ctx.getClient(), 
ctx.getJars(),
                                        ctx.getDegreeOfParallelism());
+               } else if (env instanceof OptimizerPlanEnvironment | env 
instanceof PreviewPlanEnvironment) {
+                       return new StreamPlanEnvironment(env);
                } else {
                        return createLocalEnvironment();
                }
@@ -592,4 +596,16 @@ public abstract class StreamExecutionEnvironment {
                return streamGraph;
        }
 
+       /**
+        * Creates the plan with which the system will execute the program, and
+        * returns it as a String using a JSON representation of the execution 
data
+        * flow graph. Note that this must be called before the plan is
+        * executed.
+        * 
+        * @return The execution plan of the program, as a JSON String.
+        */
+       public String getExecutionPlan() {
+               return getStreamGraph().getStreamingPlanAsJSON();
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
new file mode 100644
index 0000000..412dcd0
--- /dev/null
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.environment;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.Client.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PackagedProgram.PreviewPlanEnvironment;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
+
+public class StreamPlanEnvironment extends StreamExecutionEnvironment {
+
+       private ExecutionEnvironment env;
+
+       protected StreamPlanEnvironment(ExecutionEnvironment env) {
+               super();
+               this.env = env;
+
+               int dop = env.getDegreeOfParallelism();
+               if (dop > 0) {
+                       setDegreeOfParallelism(dop);
+               } else {
+                       setDegreeOfParallelism(GlobalConfiguration.getInteger(
+                                       
ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY,
+                                       
ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE));
+               }
+       }
+
+       @Override
+       public void execute() throws Exception {
+               execute("");
+       }
+
+       @Override
+       public void execute(String jobName) throws Exception {
+
+               streamGraph.setJobName(jobName);
+
+               if (env instanceof OptimizerPlanEnvironment) {
+                       ((OptimizerPlanEnvironment) env).setPlan(streamGraph);
+               } else if (env instanceof PreviewPlanEnvironment) {
+                       ((PreviewPlanEnvironment) 
env).setPreview(streamGraph.getStreamingPlanAsJSON());
+               }
+               throw new Client.ProgramAbortException();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
index 614b67f..6cee5f2 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
@@ -179,4 +179,8 @@ public abstract class StreamInvokable<IN, OUT> implements 
Serializable {
        public static enum ChainingStrategy {
                ALWAYS, NEVER, HEAD;
        }
+
+       public Function getUserFunction() {
+               return userFunction;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/FieldsPartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/FieldsPartitioner.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/FieldsPartitioner.java
index e61555d..6bc036e 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/FieldsPartitioner.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/FieldsPartitioner.java
@@ -35,7 +35,7 @@ public class FieldsPartitioner<T> extends 
StreamPartitioner<T> {
        KeySelector<T, ?> keySelector;
 
        public FieldsPartitioner(KeySelector<T, ?> keySelector) {
-               super(PartitioningStrategy.FIELDS);
+               super(PartitioningStrategy.GROUPBY);
                this.keySelector = keySelector;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/StreamPartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/StreamPartitioner.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/StreamPartitioner.java
index e8a06b4..19a8dba 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/StreamPartitioner.java
+++ 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/partitioner/StreamPartitioner.java
@@ -27,7 +27,7 @@ public abstract class StreamPartitioner<T> implements
 
        public enum PartitioningStrategy {
 
-               FORWARD, DISTRIBUTE, SHUFFLE, BROADCAST, GLOBAL, FIELDS;
+               FORWARD, DISTRIBUTE, SHUFFLE, BROADCAST, GLOBAL, GROUPBY;
 
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
 
b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index e0f50f8..08c7fa3 100644
--- 
a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ 
b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -203,6 +203,15 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    */
   def execute(jobName: String) = javaEnv.execute(jobName)
 
+  /**
+   * Creates the plan with which the system will execute the program, and
+   * returns it as a String using a JSON representation of the execution data
+   * flow graph. Note that this must be called before the plan is
+   * executed.
+   *
+   */
+  def getExecutionPlan() = javaEnv.getStreamGraph().getStreamingPlanAsJSON();
+
 }
 
 object StreamExecutionEnvironment {

http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java 
b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index f8a9b32..1eb6be1 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -103,7 +103,7 @@ public class RemoteExecutor extends PlanExecutor {
                JobWithJars p = new JobWithJars(plan, this.jarFiles);
                Client c = new Client(this.address, new Configuration(), 
p.getUserCodeClassLoader());
                
-               OptimizedPlan op = c.getOptimizedPlan(p, -1);
+               OptimizedPlan op = (OptimizedPlan) c.getOptimizedPlan(p, -1);
                PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
                return jsonGen.getOptimizerPlanAsJSON(op);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java 
b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index d8f1bf7..f1444ff 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -26,20 +26,18 @@ import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import org.apache.flink.runtime.messages.JobManagerMessages.SubmissionFailure;
-import org.apache.flink.runtime.messages.JobManagerMessages.SubmissionResponse;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironmentFactory;
 import org.apache.flink.compiler.CompilerException;
 import org.apache.flink.compiler.DataStatistics;
 import org.apache.flink.compiler.PactCompiler;
 import org.apache.flink.compiler.contextcheck.ContextChecker;
 import org.apache.flink.compiler.costs.DefaultCostEstimator;
+import org.apache.flink.compiler.plan.FlinkPlan;
 import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.StreamingPlan;
 import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
 import org.apache.flink.configuration.ConfigConstants;
@@ -49,13 +47,17 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.messages.JobManagerMessages.SubmissionFailure;
+import org.apache.flink.runtime.messages.JobManagerMessages.SubmissionResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
-
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironmentFactory;
 import scala.Tuple2;
 import scala.concurrent.duration.FiniteDuration;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+
+import com.google.common.base.Preconditions;
 
 /**
  * Encapsulates the functionality necessary to submit a program to a remote 
cluster.
@@ -133,10 +135,10 @@ public class Client {
        
        public String getOptimizedPlanAsJson(PackagedProgram prog, int 
parallelism) throws CompilerException, ProgramInvocationException {
                PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
-               return jsonGen.getOptimizerPlanAsJSON(getOptimizedPlan(prog, 
parallelism));
+               return jsonGen.getOptimizerPlanAsJSON((OptimizedPlan) 
getOptimizedPlan(prog, parallelism));
        }
        
-       public OptimizedPlan getOptimizedPlan(PackagedProgram prog, int 
parallelism) throws CompilerException, ProgramInvocationException {
+       public FlinkPlan getOptimizedPlan(PackagedProgram prog, int 
parallelism) throws CompilerException, ProgramInvocationException {
                
Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
                if (prog.isUsingProgramEntryPoint()) {
                        return getOptimizedPlan(prog.getPlanWithJars(), 
parallelism);
@@ -189,7 +191,7 @@ public class Client {
                }
        }
        
-       public OptimizedPlan getOptimizedPlan(Plan p, int parallelism) throws 
CompilerException {
+       public FlinkPlan getOptimizedPlan(Plan p, int parallelism) throws 
CompilerException {
                if (parallelism > 0 && p.getDefaultParallelism() <= 0) {
                        p.setDefaultParallelism(parallelism);
                }
@@ -208,25 +210,31 @@ public class Client {
         * @throws CompilerException Thrown, if the compiler encounters an 
illegal situation.
         * @throws ProgramInvocationException Thrown, if the program could not 
be instantiated from its jar file.
         */
-       public OptimizedPlan getOptimizedPlan(JobWithJars prog, int 
parallelism) throws CompilerException, ProgramInvocationException {
+       public FlinkPlan getOptimizedPlan(JobWithJars prog, int parallelism) 
throws CompilerException, ProgramInvocationException {
                return getOptimizedPlan(prog.getPlan(), parallelism);
        }
        
-       public JobGraph getJobGraph(PackagedProgram prog, OptimizedPlan 
optPlan) throws ProgramInvocationException {
+       public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan) 
throws ProgramInvocationException {
                return getJobGraph(optPlan, prog.getAllLibraries());
        }
        
-       private JobGraph getJobGraph(OptimizedPlan optPlan, List<File> 
jarFiles) {
-               NepheleJobGraphGenerator gen = new NepheleJobGraphGenerator();
-               JobGraph job = gen.compileJobGraph(optPlan);
-               
+       private JobGraph getJobGraph(FlinkPlan optPlan, List<File> jarFiles) {
+               JobGraph job = null;
+
+               if (optPlan instanceof StreamingPlan) {
+                       job = ((StreamingPlan) optPlan).getJobGraph();
+               } else {
+                       NepheleJobGraphGenerator gen = new 
NepheleJobGraphGenerator();
+                       job = gen.compileJobGraph((OptimizedPlan) optPlan);
+               }
+
                for (File jar : jarFiles) {
                        job.addJar(new Path(jar.getAbsolutePath()));
                }
-               
+
                return job;
        }
-       
+
        public JobExecutionResult run(final PackagedProgram prog, int 
parallelism, boolean wait) throws ProgramInvocationException {
                
Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
                if (prog.isUsingProgramEntryPoint()) {
@@ -287,7 +295,7 @@ public class Client {
         *                                    on the nephele system failed.
         */
        public JobExecutionResult run(JobWithJars prog, int parallelism, 
boolean wait) throws CompilerException, ProgramInvocationException {
-               return run(getOptimizedPlan(prog, parallelism), 
prog.getJarFiles(), wait);
+               return run((OptimizedPlan) getOptimizedPlan(prog, parallelism), 
prog.getJarFiles(), wait);
        }
        
 
@@ -346,11 +354,11 @@ public class Client {
        
        // 
--------------------------------------------------------------------------------------------
        
-       private static final class OptimizerPlanEnvironment extends 
ExecutionEnvironment {
+       public static final class OptimizerPlanEnvironment extends 
ExecutionEnvironment {
                
                private final PactCompiler compiler;
                
-               private OptimizedPlan optimizerPlan;
+               private FlinkPlan optimizerPlan;
                
                
                private OptimizerPlanEnvironment(PactCompiler compiler) {
@@ -385,6 +393,10 @@ public class Client {
                        };
                        initializeContextEnvironment(factory);
                }
+               
+               public void setPlan(FlinkPlan plan){
+                       this.optimizerPlan = plan;
+               }
        }
        
        public static final class ProgramAbortException extends Error {

http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 891ec1b..702514a 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -59,7 +59,7 @@ public class ContextEnvironment extends ExecutionEnvironment {
        public String getExecutionPlan() throws Exception {
                Plan p = createProgramPlan("unnamed job");
                
-               OptimizedPlan op = this.client.getOptimizedPlan(p, 
getDegreeOfParallelism());
+               OptimizedPlan op = (OptimizedPlan) 
this.client.getOptimizedPlan(p, getDegreeOfParallelism());
                
                PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
                return gen.getOptimizerPlanAsJSON(op);

http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index b70a83c..f72baff 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -44,12 +44,12 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.Program;
 import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
 import org.apache.flink.compiler.PactCompiler;
 import org.apache.flink.compiler.dag.DataSinkNode;
 import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator;
+import org.apache.flink.util.InstantiationUtil;
 
 /**
  * This class encapsulates represents a program, packaged in a jar file. It 
supplies
@@ -271,7 +271,9 @@ public class PackagedProgram {
                        catch (Throwable t) {
                                // the invocation gets aborted with the preview 
plan
                                if (env.previewPlan != null) {
-                                       previewPlan =  env.previewPlan;
+                                       previewPlan = env.previewPlan;
+                               } else if (env.preview != null) {
+                                       return env.preview;
                                } else {
                                        throw new 
ProgramInvocationException("The program caused an error: ", t);
                                }
@@ -290,7 +292,7 @@ public class PackagedProgram {
                else {
                        throw new RuntimeException();
                }
-               
+
                PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
                StringWriter string = new StringWriter(1024);
                PrintWriter pw = null;
@@ -301,6 +303,7 @@ public class PackagedProgram {
                        pw.close();
                }
                return string.toString();
+
        }
 
        /**
@@ -690,10 +693,12 @@ public class PackagedProgram {
                private List<DataSinkNode> previewPlan;
                private Plan plan;
                
+               private String preview = null;
+               
                @Override
                public JobExecutionResult execute(String jobName) throws 
Exception {
                        this.plan = createProgramPlan(jobName);
-                       this.previewPlan = 
PactCompiler.createPreOptimizedPlan(plan);
+                       this.previewPlan = 
PactCompiler.createPreOptimizedPlan((Plan) plan);
                        
                        // do not go on with anything now!
                        throw new Client.ProgramAbortException();
@@ -721,5 +726,10 @@ public class PackagedProgram {
                public Plan getPlan() {
                        return this.plan;
                }
+               
+               public void setPreview(String preview) {
+                       this.preview = preview;
+               }
+
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
 
b/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
index 7eddb61..62414bf 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
@@ -40,7 +40,9 @@ import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.compiler.CompilerException;
+import org.apache.flink.compiler.plan.FlinkPlan;
 import org.apache.flink.compiler.plan.OptimizedPlan;
+import org.apache.flink.compiler.plan.StreamingPlan;
 import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -166,7 +168,7 @@ public class JobSubmissionServlet extends HttpServlet {
                        // create the plan
                        String[] options = params.isEmpty() ? new String[0] : 
(String[]) params.toArray(new String[params.size()]);
                        PackagedProgram program;
-                       OptimizedPlan optPlan;
+                       FlinkPlan optPlan;
                        Client client;
                        
                        try {
@@ -246,14 +248,18 @@ public class JobSubmissionServlet extends HttpServlet {
                                String planName = uid + ".json";
                                File jsonFile = new 
File(this.planDumpDirectory, planName);
                                
-                               PlanJSONDumpGenerator jsonGen = new 
PlanJSONDumpGenerator();
-                               jsonGen.setEncodeForHTML(true);
-                               jsonGen.dumpOptimizerPlanAsJSON(optPlan, 
jsonFile);
-
+                               if (optPlan instanceof StreamingPlan) {
+                                       ((StreamingPlan) 
optPlan).dumpStreamingPlanAsJSON(jsonFile);
+                               } else {
+                                       PlanJSONDumpGenerator jsonGen = new 
PlanJSONDumpGenerator();
+                                       jsonGen.setEncodeForHTML(true);
+                                       
jsonGen.dumpOptimizerPlanAsJSON((OptimizedPlan) optPlan, jsonFile);
+                               }
+                               
                                // submit the job only, if it should not be 
suspended
                                if (!suspend) {
                                        try {
-                                               client.run(program, optPlan, 
false);
+                                               
client.run(program,(OptimizedPlan) optPlan, false);
                                        } catch (Throwable t) {
                                                LOG.error("Error submitting job 
to the job-manager.", t);
                                                showErrorPage(resp, 
t.getMessage());

http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
index c6cada8..28b7d8a 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java
@@ -45,7 +45,7 @@ public class ExecutionPlanCreationTest {
                        InetSocketAddress mockJmAddress = new 
InetSocketAddress(mockAddress, 12345);
                        
                        Client client = new Client(mockJmAddress, new 
Configuration(), getClass().getClassLoader());
-                       OptimizedPlan op = client.getOptimizedPlan(prg, -1);
+                       OptimizedPlan op = (OptimizedPlan) 
client.getOptimizedPlan(prg, -1);
                        assertNotNull(op);
                        
                        PlanJSONDumpGenerator dumper = new 
PlanJSONDumpGenerator();

http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-compiler/src/main/java/org/apache/flink/compiler/plan/FlinkPlan.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/FlinkPlan.java 
b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/FlinkPlan.java
new file mode 100644
index 0000000..eaa78b1
--- /dev/null
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/FlinkPlan.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.plan;
+
+/**
+ * A common interface for compiled Flink plans for both batch and streaming
+ * processing programs.
+ * 
+ */
+public interface FlinkPlan {
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-compiler/src/main/java/org/apache/flink/compiler/plan/OptimizedPlan.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/OptimizedPlan.java
 
b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/OptimizedPlan.java
index d555d93..00eb287 100644
--- 
a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/OptimizedPlan.java
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/OptimizedPlan.java
@@ -30,7 +30,7 @@ import org.apache.flink.util.Visitor;
  * It works on this representation during its optimization. Finally, this plan 
is translated to a schedule
  * for the nephele runtime system.
  */
-public class OptimizedPlan implements Visitable<PlanNode> {
+public class OptimizedPlan implements FlinkPlan, Visitable<PlanNode>  {
        
        /**
         * The data sources in the plan.

http://git-wip-us.apache.org/repos/asf/flink/blob/82c42002/flink-compiler/src/main/java/org/apache/flink/compiler/plan/StreamingPlan.java
----------------------------------------------------------------------
diff --git 
a/flink-compiler/src/main/java/org/apache/flink/compiler/plan/StreamingPlan.java
 
b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/StreamingPlan.java
new file mode 100644
index 0000000..b2e2fa6
--- /dev/null
+++ 
b/flink-compiler/src/main/java/org/apache/flink/compiler/plan/StreamingPlan.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.compiler.plan;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+/**
+ * Abstract class representing Flink Streaming plans
+ * 
+ */
+public abstract class StreamingPlan implements FlinkPlan {
+
+       public abstract JobGraph getJobGraph();
+
+       public abstract String getStreamingPlanAsJSON();
+
+       public abstract void dumpStreamingPlanAsJSON(File file) throws 
IOException;
+
+}

Reply via email to