Repository: flink
Updated Branches:
  refs/heads/release-1.2 3c63c9e01 -> 3e41ed1b1


Revert "[FLINK-5808] Move default parallelism to StreamingJobGraphGenerator"

This reverts commit b563f0ae2e7b7233e29e03fbb2cf18b0d853c0ca.

This fix was causing more problems than it was solving.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3e41ed1b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3e41ed1b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3e41ed1b

Branch: refs/heads/release-1.2
Commit: 3e41ed1b1e9ee30f546374206d4759e009ced39b
Parents: fd98e8b
Author: Aljoscha Krettek <[email protected]>
Authored: Mon Apr 3 18:40:14 2017 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Tue Apr 4 13:42:17 2017 +0200

----------------------------------------------------------------------
 .../api/environment/LocalStreamEnvironment.java |  26 +-
 .../environment/RemoteStreamEnvironment.java    |   5 -
 .../environment/StreamContextEnvironment.java   |  13 +-
 .../environment/StreamExecutionEnvironment.java |  65 ++--
 .../api/environment/StreamPlanEnvironment.java  |  15 +-
 .../flink/streaming/api/graph/StreamGraph.java  |   6 +-
 .../api/graph/StreamGraphGenerator.java         |  12 +-
 .../api/graph/StreamingJobGraphGenerator.java   |  14 +-
 .../api/StreamExecutionEnvironmentTest.java     | 289 +++++++++++++++++
 .../StreamExecutionEnvironmentTest.java         | 317 -------------------
 .../graph/StreamingJobGraphGeneratorTest.java   |  20 +-
 .../operators/FoldApplyWindowFunctionTest.java  |   6 +-
 .../api/scala/StreamExecutionEnvironment.scala  |  28 +-
 .../streaming/api/scala/DataStreamTest.scala    |  11 +-
 .../streaming/util/TestStreamEnvironment.java   |   1 -
 .../accumulators/AccumulatorLiveITCase.java     |   4 -
 16 files changed, 400 insertions(+), 432 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3e41ed1b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index cb60552..f8c9c42 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -45,9 +45,6 @@ import org.slf4j.LoggerFactory;
 @Public
 public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 
-       /** The default parallelism used when creating a local environment */
-       private static int defaultLocalParallelism = 
Runtime.getRuntime().availableProcessors();
-
        private static final Logger LOG = 
LoggerFactory.getLogger(LocalStreamEnvironment.class);
        
        /** The configuration to use for the local cluster */
@@ -57,43 +54,24 @@ public class LocalStreamEnvironment extends 
StreamExecutionEnvironment {
         * Creates a new local stream environment that uses the default 
configuration.
         */
        public LocalStreamEnvironment() {
-               this(defaultLocalParallelism);
+               this(null);
        }
 
        /**
-        * Creates a new local stream environment that uses the default 
configuration.
-        */
-       public LocalStreamEnvironment(int parallelism) {
-               this(null, parallelism);
-       }
-
-
-       /**
         * Creates a new local stream environment that configures its local 
executor with the given configuration.
         *
         * @param config The configuration used to configure the local executor.
         */
        public LocalStreamEnvironment(Configuration config) {
-               this(config, defaultLocalParallelism);
-       }
-
-       /**
-        * Creates a new local stream environment that configures its local 
executor with the given configuration.
-        *
-        * @param config The configuration used to configure the local executor.
-        */
-       public LocalStreamEnvironment(Configuration config, int parallelism) {
-               super(parallelism);
                if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
                        throw new InvalidProgramException(
                                        "The LocalStreamEnvironment cannot be 
used when submitting a program through a client, " +
                                                        "or running in a 
TestEnvironment context.");
                }
-
+               
                this.conf = config == null ? new Configuration() : config;
        }
 
-
        /**
         * Executes the JobGraph of the on a mini cluster of CLusterUtil with a 
user
         * specified name.

http://git-wip-us.apache.org/repos/asf/flink/blob/3e41ed1b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 5684e28..333f9c0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -37,7 +37,6 @@ import 
org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -130,10 +129,6 @@ public class RemoteStreamEnvironment extends 
StreamExecutionEnvironment {
         *            The protocol must be supported by the {@link 
java.net.URLClassLoader}.
         */
        public RemoteStreamEnvironment(String host, int port, Configuration 
clientConfiguration, String[] jarFiles, URL[] globalClasspaths) {
-               super(GlobalConfiguration.loadConfiguration().getInteger(
-                               ConfigConstants.DEFAULT_PARALLELISM_KEY,
-                               ConfigConstants.DEFAULT_PARALLELISM));
-               
                if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
                        throw new InvalidProgramException(
                                        "The RemoteEnvironment cannot be used 
when submitting a program through a client, " +

http://git-wip-us.apache.org/repos/asf/flink/blob/3e41ed1b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index 51078f2..49c5347 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -38,13 +38,14 @@ public class StreamContextEnvironment extends 
StreamExecutionEnvironment {
        private final ContextEnvironment ctx;
 
        protected StreamContextEnvironment(ContextEnvironment ctx) {
-               // if the batch ContextEnvironment has a parallelism this must 
have come from
-               // the CLI Client. We should set that as our default parallelism
-               super(ctx.getParallelism() > 0 ? ctx.getParallelism() :
-                               
GlobalConfiguration.loadConfiguration().getInteger(
-                                               
ConfigConstants.DEFAULT_PARALLELISM_KEY,
-                                               
ConfigConstants.DEFAULT_PARALLELISM));
                this.ctx = ctx;
+               if (ctx.getParallelism() > 0) {
+                       setParallelism(ctx.getParallelism());
+               } else {
+                       
setParallelism(GlobalConfiguration.loadConfiguration().getInteger(
+                                       ConfigConstants.DEFAULT_PARALLELISM_KEY,
+                                       ConfigConstants.DEFAULT_PARALLELISM));
+               }
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/3e41ed1b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 6ac3622..dab0a06 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -111,6 +111,9 @@ public abstract class StreamExecutionEnvironment {
        /** The environment of the context (local by default, cluster if 
invoked through command line) */
        private static StreamExecutionEnvironmentFactory 
contextEnvironmentFactory;
 
+       /** The default parallelism used when creating a local environment */
+       private static int defaultLocalParallelism = 
Runtime.getRuntime().availableProcessors();
+
        // 
------------------------------------------------------------------------
 
        /** The execution configuration for this environment */
@@ -131,23 +134,11 @@ public abstract class StreamExecutionEnvironment {
        /** The time characteristic used by the data streams */
        private TimeCharacteristic timeCharacteristic = 
DEFAULT_TIME_CHARACTERISTIC;
 
-       /** The parallelism to use when no parallelism is set on an operation. 
*/
-       private final int defaultParallelism;
-
 
        // 
--------------------------------------------------------------------------------------------
        // Constructor and Properties
        // 
--------------------------------------------------------------------------------------------
 
-
-       public StreamExecutionEnvironment() {
-               this(ConfigConstants.DEFAULT_PARALLELISM);
-       }
-
-       public StreamExecutionEnvironment(int defaultParallelism) {
-               this.defaultParallelism = defaultParallelism;
-       }
-
        /**
         * Gets the config object.
         */
@@ -1523,7 +1514,7 @@ public abstract class StreamExecutionEnvironment {
                if (transformations.size() <= 0) {
                        throw new IllegalStateException("No operators defined 
in streaming topology. Cannot execute.");
                }
-               return StreamGraphGenerator.generate(this, transformations, 
defaultParallelism);
+               return StreamGraphGenerator.generate(this, transformations);
        }
 
        /**
@@ -1611,7 +1602,7 @@ public abstract class StreamExecutionEnvironment {
         * @return A local execution environment.
         */
        public static LocalStreamEnvironment createLocalEnvironment() {
-               return new LocalStreamEnvironment();
+               return createLocalEnvironment(defaultLocalParallelism);
        }
 
        /**
@@ -1620,12 +1611,14 @@ public abstract class StreamExecutionEnvironment {
         * environment was created in. It will use the parallelism specified in 
the
         * parameter.
         *
-        * @param defaultParallelism The default parallelism for the local 
environment.
-        * 
+        * @param parallelism
+        *              The parallelism for the local environment.
         * @return A local execution environment with the specified parallelism.
         */
-       public static LocalStreamEnvironment createLocalEnvironment(int 
defaultParallelism) {
-               return new LocalStreamEnvironment(defaultParallelism);
+       public static LocalStreamEnvironment createLocalEnvironment(int 
parallelism) {
+               LocalStreamEnvironment env = new LocalStreamEnvironment();
+               env.setParallelism(parallelism);
+               return env;
        }
 
        /**
@@ -1634,13 +1627,16 @@ public abstract class StreamExecutionEnvironment {
         * environment was created in. It will use the parallelism specified in 
the
         * parameter.
         *
-        * @param defaultParallelism The parallelism for the local environment.
-        * @param configuration Pass a custom configuration into the cluster
-        *
+        * @param parallelism
+        *              The parallelism for the local environment.
+        *      @param configuration
+        *              Pass a custom configuration into the cluster
         * @return A local execution environment with the specified parallelism.
         */
-       public static LocalStreamEnvironment createLocalEnvironment(int 
defaultParallelism, Configuration configuration) {
-               return new LocalStreamEnvironment(configuration, 
defaultParallelism);
+       public static LocalStreamEnvironment createLocalEnvironment(int 
parallelism, Configuration configuration) {
+               LocalStreamEnvironment currentEnvironment = new 
LocalStreamEnvironment(configuration);
+               currentEnvironment.setParallelism(parallelism);
+               return currentEnvironment;
        }
 
        /**
@@ -1665,6 +1661,7 @@ public abstract class StreamExecutionEnvironment {
                conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
 
                LocalStreamEnvironment localEnv = new 
LocalStreamEnvironment(conf);
+               localEnv.setParallelism(defaultLocalParallelism);
 
                return localEnv;
        }
@@ -1750,6 +1747,28 @@ public abstract class StreamExecutionEnvironment {
                return new RemoteStreamEnvironment(host, port, clientConfig, 
jarFiles);
        }
 
+       /**
+        * Gets the default parallelism that will be used for the local 
execution environment created by
+        * {@link #createLocalEnvironment()}.
+        *
+        * @return The default local parallelism
+        */
+       @PublicEvolving
+       public static int getDefaultLocalParallelism() {
+               return defaultLocalParallelism;
+       }
+
+       /**
+        * Sets the default parallelism that will be used for the local 
execution
+        * environment created by {@link #createLocalEnvironment()}.
+        *
+        * @param parallelism The parallelism to use as the default local 
parallelism.
+        */
+       @PublicEvolving
+       public static void setDefaultLocalParallelism(int parallelism) {
+               defaultLocalParallelism = parallelism;
+       }
+
        // 
--------------------------------------------------------------------------------------------
        //  Methods to control the context and local environments for execution 
from packaged programs
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/3e41ed1b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
index 9c676c4..b1521f5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
@@ -32,11 +32,18 @@ public class StreamPlanEnvironment extends 
StreamExecutionEnvironment {
        private ExecutionEnvironment env;
 
        protected StreamPlanEnvironment(ExecutionEnvironment env) {
-               super(GlobalConfiguration.loadConfiguration().getInteger(
-                               ConfigConstants.DEFAULT_PARALLELISM_KEY,
-                               ConfigConstants.DEFAULT_PARALLELISM));
-
+               super();
                this.env = env;
+
+               int parallelism = env.getParallelism();
+               if (parallelism > 0) {
+                       setParallelism(parallelism);
+               } else {
+                       // determine parallelism
+                       
setParallelism(GlobalConfiguration.loadConfiguration().getInteger(
+                                       ConfigConstants.DEFAULT_PARALLELISM_KEY,
+                                       ConfigConstants.DEFAULT_PARALLELISM));
+               }
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/3e41ed1b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 1b4acd0..2f80764 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -91,14 +91,12 @@ public class StreamGraph extends StreamingPlan {
        private AbstractStateBackend stateBackend;
        private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;
 
-       private final int defaultParallelism;
 
-       public StreamGraph(StreamExecutionEnvironment environment, int 
defaultParallelism) {
+       public StreamGraph(StreamExecutionEnvironment environment) {
                this.environment = environment;
                this.executionConfig = environment.getConfig();
                this.checkpointConfig = environment.getCheckpointConfig();
 
-               this.defaultParallelism = defaultParallelism;
                // create an empty new stream graph.
                clear();
        }
@@ -598,7 +596,7 @@ public class StreamGraph extends StreamingPlan {
                                                        + "\nThe user can force 
enable state checkpoints with the reduced guarantees by calling: 
env.enableCheckpointing(interval,true)");
                }
 
-               StreamingJobGraphGenerator jobgraphGenerator = new 
StreamingJobGraphGenerator(this, defaultParallelism);
+               StreamingJobGraphGenerator jobgraphGenerator = new 
StreamingJobGraphGenerator(this);
 
                return jobgraphGenerator.createJobGraph();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/3e41ed1b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 333e4f9..ddd0515 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -97,11 +97,12 @@ public class StreamGraphGenerator {
        // we have loops, i.e. feedback edges.
        private Map<StreamTransformation<?>, Collection<Integer>> 
alreadyTransformed;
 
+
        /**
         * Private constructor. The generator should only be invoked using 
{@link #generate}.
         */
-       private StreamGraphGenerator(StreamExecutionEnvironment env, int 
defaultParallelism) {
-               this.streamGraph = new StreamGraph(env, defaultParallelism);
+       private StreamGraphGenerator(StreamExecutionEnvironment env) {
+               this.streamGraph = new StreamGraph(env);
                this.streamGraph.setChaining(env.isChainingEnabled());
                this.streamGraph.setStateBackend(env.getStateBackend());
                this.env = env;
@@ -118,11 +119,8 @@ public class StreamGraphGenerator {
         *
         * @return The generated {@code StreamGraph}
         */
-       public static StreamGraph generate(
-                       StreamExecutionEnvironment env,
-                       List<StreamTransformation<?>> transformations,
-                       int defaultParallelism) {
-               return new StreamGraphGenerator(env, 
defaultParallelism).generateInternal(transformations);
+       public static StreamGraph generate(StreamExecutionEnvironment env, 
List<StreamTransformation<?>> transformations) {
+               return new 
StreamGraphGenerator(env).generateInternal(transformations);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/3e41ed1b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index f87e51e..8877c80 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.graph;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -87,13 +86,10 @@ public class StreamingJobGraphGenerator {
        private final StreamGraphHasher defaultStreamGraphHasher;
        private final List<StreamGraphHasher> legacyStreamGraphHashers;
 
-       private final int defaultParallelism;
-
-       public StreamingJobGraphGenerator(StreamGraph streamGraph, int 
defaultParallelism) {
+       public StreamingJobGraphGenerator(StreamGraph streamGraph) {
                this.streamGraph = streamGraph;
                this.defaultStreamGraphHasher = new StreamGraphHasherV2();
                this.legacyStreamGraphHashers = Arrays.asList(new 
StreamGraphHasherV1(), new StreamGraphUserHashHasher());
-               this.defaultParallelism = defaultParallelism;
        }
 
        private void init() {
@@ -308,12 +304,12 @@ public class StreamingJobGraphGenerator {
 
                int parallelism = streamNode.getParallelism();
 
-               if (parallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
-                       parallelism = defaultParallelism;
+               if (parallelism > 0) {
+                       jobVertex.setParallelism(parallelism);
+               } else {
+                       parallelism = jobVertex.getParallelism();
                }
 
-               jobVertex.setParallelism(parallelism);
-
                jobVertex.setMaxParallelism(streamNode.getMaxParallelism());
 
                if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3e41ed1b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
new file mode 100644
index 0000000..3fc1344
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.SplittableIterator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class StreamExecutionEnvironmentTest {
+
+       @Test
+       public void fromElementsWithBaseTypeTest1() {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.fromElements(ParentClass.class, new SubClass(1, "Java"), 
new ParentClass(1, "hello"));
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void fromElementsWithBaseTypeTest2() {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.fromElements(SubClass.class, new SubClass(1, "Java"), new 
ParentClass(1, "hello"));
+       }
+
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testFromCollectionParallelism() {
+               try {
+                       TypeInformation<Integer> typeInfo = 
BasicTypeInfo.INT_TYPE_INFO;
+                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+                       DataStreamSource<Integer> dataStream1 = 
env.fromCollection(new DummySplittableIterator<Integer>(), typeInfo);
+
+                       try {
+                               dataStream1.setParallelism(4);
+                               fail("should throw an exception");
+                       }
+                       catch (IllegalArgumentException e) {
+                               // expected
+                       }
+
+                       dataStream1.addSink(new DiscardingSink<Integer>());
+       
+                       DataStreamSource<Integer> dataStream2 = 
env.fromParallelCollection(new DummySplittableIterator<Integer>(),
+                                       typeInfo).setParallelism(4);
+
+                       dataStream2.addSink(new DiscardingSink<Integer>());
+
+                       env.getExecutionPlan();
+
+                       assertEquals("Parallelism of collection source must be 
1.", 1, 
env.getStreamGraph().getStreamNode(dataStream1.getId()).getParallelism());
+                       assertEquals("Parallelism of parallel collection source 
must be 4.",
+                                       4, 
+                                       
env.getStreamGraph().getStreamNode(dataStream2.getId()).getParallelism());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testSources() {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               SourceFunction<Integer> srcFun = new SourceFunction<Integer>() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public void run(SourceContext<Integer> ctx) throws 
Exception {
+                       }
+
+                       @Override
+                       public void cancel() {
+                       }
+               };
+               DataStreamSource<Integer> src1 = env.addSource(srcFun);
+               src1.addSink(new DiscardingSink<Integer>());
+               assertEquals(srcFun, getFunctionFromDataSource(src1));
+
+               List<Long> list = Arrays.asList(0L, 1L, 2L);
+
+               DataStreamSource<Long> src2 = env.generateSequence(0, 2);
+               assertTrue(getFunctionFromDataSource(src2) instanceof 
StatefulSequenceSource);
+
+               DataStreamSource<Long> src3 = env.fromElements(0L, 1L, 2L);
+               assertTrue(getFunctionFromDataSource(src3) instanceof 
FromElementsFunction);
+
+               DataStreamSource<Long> src4 = env.fromCollection(list);
+               assertTrue(getFunctionFromDataSource(src4) instanceof 
FromElementsFunction);
+       }
+
+       @Test
+       public void testParallelismBounds() {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               SourceFunction<Integer> srcFun = new SourceFunction<Integer>() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public void run(SourceContext<Integer> ctx) throws 
Exception {
+                       }
+
+                       @Override
+                       public void cancel() {
+                       }
+               };
+
+
+               SingleOutputStreamOperator<Object> operator =
+                               env.addSource(srcFun).flatMap(new 
FlatMapFunction<Integer, Object>() {
+
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public void flatMap(Integer value, Collector<Object> 
out) throws Exception {
+
+                       }
+               });
+
+               // default value for max parallelism
+               Assert.assertEquals(-1, 
operator.getTransformation().getMaxParallelism());
+
+               // bounds for parallelism 1
+               try {
+                       operator.setParallelism(0);
+                       Assert.fail();
+               } catch (IllegalArgumentException expected) {
+               }
+
+               // bounds for parallelism 2
+               operator.setParallelism(1);
+               Assert.assertEquals(1, operator.getParallelism());
+
+               // bounds for parallelism 3
+               operator.setParallelism(1 << 15);
+               Assert.assertEquals(1 << 15, operator.getParallelism());
+
+               // default value after generating
+               env.getStreamGraph().getJobGraph();
+               Assert.assertEquals(-1, 
operator.getTransformation().getMaxParallelism());
+
+               // configured value after generating
+               env.setMaxParallelism(42);
+               env.getStreamGraph().getJobGraph();
+               Assert.assertEquals(42, 
operator.getTransformation().getMaxParallelism());
+
+               // bounds configured parallelism 1
+               try {
+                       env.setMaxParallelism(0);
+                       Assert.fail();
+               } catch (IllegalArgumentException expected) {
+               }
+
+               // bounds configured parallelism 2
+               try {
+                       env.setMaxParallelism(1 + (1 << 15));
+                       Assert.fail();
+               } catch (IllegalArgumentException expected) {
+               }
+
+               // bounds for max parallelism 1
+               try {
+                       operator.setMaxParallelism(0);
+                       Assert.fail();
+               } catch (IllegalArgumentException expected) {
+               }
+
+               // bounds for max parallelism 2
+               try {
+                       operator.setMaxParallelism(1 + (1 << 15));
+                       Assert.fail();
+               } catch (IllegalArgumentException expected) {
+               }
+
+               // bounds for max parallelism 3
+               operator.setMaxParallelism(1);
+               Assert.assertEquals(1, 
operator.getTransformation().getMaxParallelism());
+
+               // bounds for max parallelism 4
+               operator.setMaxParallelism(1 << 15);
+               Assert.assertEquals(1 << 15, 
operator.getTransformation().getMaxParallelism());
+
+               // override config
+               env.getStreamGraph().getJobGraph();
+               Assert.assertEquals(1 << 15 , 
operator.getTransformation().getMaxParallelism());
+       }
+
+       /////////////////////////////////////////////////////////////
+       // Utilities
+       /////////////////////////////////////////////////////////////
+
+
+       private static StreamOperator<?> 
getOperatorFromDataStream(DataStream<?> dataStream) {
+               StreamExecutionEnvironment env = 
dataStream.getExecutionEnvironment();
+               StreamGraph streamGraph = env.getStreamGraph();
+               return 
streamGraph.getStreamNode(dataStream.getId()).getOperator();
+       }
+
+       @SuppressWarnings("unchecked")
+       private static <T> SourceFunction<T> 
getFunctionFromDataSource(DataStreamSource<T> dataStreamSource) {
+               dataStreamSource.addSink(new DiscardingSink<T>());
+               AbstractUdfStreamOperator<?, ?> operator =
+                               (AbstractUdfStreamOperator<?, ?>) 
getOperatorFromDataStream(dataStreamSource);
+               return (SourceFunction<T>) operator.getUserFunction();
+       }
+
+       public static class DummySplittableIterator<T> extends 
SplittableIterator<T> {
+               private static final long serialVersionUID = 
1312752876092210499L;
+
+               @SuppressWarnings("unchecked")
+               @Override
+               public Iterator<T>[] split(int numPartitions) {
+                       return (Iterator<T>[]) new Iterator<?>[0];
+               }
+
+               @Override
+               public int getMaximumNumberOfSplits() {
+                       return 0;
+               }
+
+               @Override
+               public boolean hasNext() {
+                       return false;
+               }
+
+               @Override
+               public T next() {
+                       throw new NoSuchElementException();
+               }
+
+               @Override
+               public void remove() {
+                       throw new UnsupportedOperationException();
+               }
+       }
+
+       public static class ParentClass {
+               int num;
+               String string;
+               public ParentClass(int num, String string) {
+                       this.num = num;
+                       this.string = string;
+               }
+       }
+
+       public static class SubClass extends ParentClass{
+               public SubClass(int num, String string) {
+                       super(num, string);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3e41ed1b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
deleted file mode 100644
index d29c833..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
+++ /dev/null
@@ -1,317 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.environment;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.ContextEnvironment;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.SplittableIterator;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.net.URL;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-
-public class StreamExecutionEnvironmentTest {
-
-       @Test
-       public void fromElementsWithBaseTypeTest1() {
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.fromElements(ParentClass.class, new SubClass(1, "Java"), 
new ParentClass(1, "hello"));
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void fromElementsWithBaseTypeTest2() {
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.fromElements(SubClass.class, new SubClass(1, "Java"), new 
ParentClass(1, "hello"));
-       }
-
-       @Test
-       @SuppressWarnings("unchecked")
-       public void testFromCollectionParallelism() {
-               try {
-                       TypeInformation<Integer> typeInfo = 
BasicTypeInfo.INT_TYPE_INFO;
-                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-                       DataStreamSource<Integer> dataStream1 = 
env.fromCollection(new DummySplittableIterator<Integer>(), typeInfo);
-
-                       try {
-                               dataStream1.setParallelism(4);
-                               fail("should throw an exception");
-                       }
-                       catch (IllegalArgumentException e) {
-                               // expected
-                       }
-
-                       dataStream1.addSink(new DiscardingSink<Integer>());
-       
-                       DataStreamSource<Integer> dataStream2 = 
env.fromParallelCollection(new DummySplittableIterator<Integer>(),
-                                       typeInfo).setParallelism(4);
-
-                       dataStream2.addSink(new DiscardingSink<Integer>());
-
-                       env.getExecutionPlan();
-
-                       assertEquals("Parallelism of collection source must be 
1.", 1, 
env.getStreamGraph().getStreamNode(dataStream1.getId()).getParallelism());
-                       assertEquals("Parallelism of parallel collection source 
must be 4.",
-                                       4, 
-                                       
env.getStreamGraph().getStreamNode(dataStream2.getId()).getParallelism());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test
-       public void testSources() {
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-               SourceFunction<Integer> srcFun = new SourceFunction<Integer>() {
-                       private static final long serialVersionUID = 1L;
-
-                       @Override
-                       public void run(SourceContext<Integer> ctx) throws 
Exception {
-                       }
-
-                       @Override
-                       public void cancel() {
-                       }
-               };
-               DataStreamSource<Integer> src1 = env.addSource(srcFun);
-               src1.addSink(new DiscardingSink<Integer>());
-               assertEquals(srcFun, getFunctionFromDataSource(src1));
-
-               List<Long> list = Arrays.asList(0L, 1L, 2L);
-
-               DataStreamSource<Long> src2 = env.generateSequence(0, 2);
-               assertTrue(getFunctionFromDataSource(src2) instanceof 
StatefulSequenceSource);
-
-               DataStreamSource<Long> src3 = env.fromElements(0L, 1L, 2L);
-               assertTrue(getFunctionFromDataSource(src3) instanceof 
FromElementsFunction);
-
-               DataStreamSource<Long> src4 = env.fromCollection(list);
-               assertTrue(getFunctionFromDataSource(src4) instanceof 
FromElementsFunction);
-       }
-
-       @Test
-       public void testDefaultParallelismIsDefault() {
-               assertEquals(
-                               ExecutionConfig.PARALLELISM_DEFAULT,
-                               
StreamExecutionEnvironment.createLocalEnvironment().getParallelism());
-
-               assertEquals(
-                               ExecutionConfig.PARALLELISM_DEFAULT,
-                               
StreamExecutionEnvironment.createRemoteEnvironment("dummy", 
1234).getParallelism());
-
-               StreamExecutionEnvironment contextEnv = new 
StreamContextEnvironment(
-                               new ContextEnvironment(
-                                               mock(ClusterClient.class),
-                                               Collections.<URL>emptyList(),
-                                               Collections.<URL>emptyList(),
-                                               
this.getClass().getClassLoader(),
-                                               null));
-
-               assertEquals(
-                               ExecutionConfig.PARALLELISM_DEFAULT,
-                               contextEnv.getParallelism());
-       }
-
-       @Test
-       public void testParallelismBounds() {
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-               SourceFunction<Integer> srcFun = new SourceFunction<Integer>() {
-                       private static final long serialVersionUID = 1L;
-
-                       @Override
-                       public void run(SourceContext<Integer> ctx) throws 
Exception {
-                       }
-
-                       @Override
-                       public void cancel() {
-                       }
-               };
-
-
-               SingleOutputStreamOperator<Object> operator =
-                               env.addSource(srcFun).flatMap(new 
FlatMapFunction<Integer, Object>() {
-
-                       private static final long serialVersionUID = 1L;
-
-                       @Override
-                       public void flatMap(Integer value, Collector<Object> 
out) throws Exception {
-
-                       }
-               });
-
-               // default value for max parallelism
-               Assert.assertEquals(-1, 
operator.getTransformation().getMaxParallelism());
-
-               // bounds for parallelism 1
-               try {
-                       operator.setParallelism(0);
-                       Assert.fail();
-               } catch (IllegalArgumentException expected) {
-               }
-
-               // bounds for parallelism 2
-               operator.setParallelism(1);
-               Assert.assertEquals(1, operator.getParallelism());
-
-               // bounds for parallelism 3
-               operator.setParallelism(1 << 15);
-               Assert.assertEquals(1 << 15, operator.getParallelism());
-
-               // default value after generating
-               env.getStreamGraph().getJobGraph();
-               Assert.assertEquals(-1, 
operator.getTransformation().getMaxParallelism());
-
-               // configured value after generating
-               env.setMaxParallelism(42);
-               env.getStreamGraph().getJobGraph();
-               Assert.assertEquals(42, 
operator.getTransformation().getMaxParallelism());
-
-               // bounds configured parallelism 1
-               try {
-                       env.setMaxParallelism(0);
-                       Assert.fail();
-               } catch (IllegalArgumentException expected) {
-               }
-
-               // bounds configured parallelism 2
-               try {
-                       env.setMaxParallelism(1 + (1 << 15));
-                       Assert.fail();
-               } catch (IllegalArgumentException expected) {
-               }
-
-               // bounds for max parallelism 1
-               try {
-                       operator.setMaxParallelism(0);
-                       Assert.fail();
-               } catch (IllegalArgumentException expected) {
-               }
-
-               // bounds for max parallelism 2
-               try {
-                       operator.setMaxParallelism(1 + (1 << 15));
-                       Assert.fail();
-               } catch (IllegalArgumentException expected) {
-               }
-
-               // bounds for max parallelism 3
-               operator.setMaxParallelism(1);
-               Assert.assertEquals(1, 
operator.getTransformation().getMaxParallelism());
-
-               // bounds for max parallelism 4
-               operator.setMaxParallelism(1 << 15);
-               Assert.assertEquals(1 << 15, 
operator.getTransformation().getMaxParallelism());
-
-               // override config
-               env.getStreamGraph().getJobGraph();
-               Assert.assertEquals(1 << 15 , 
operator.getTransformation().getMaxParallelism());
-       }
-
-       /////////////////////////////////////////////////////////////
-       // Utilities
-       /////////////////////////////////////////////////////////////
-
-
-       private static StreamOperator<?> 
getOperatorFromDataStream(DataStream<?> dataStream) {
-               StreamExecutionEnvironment env = 
dataStream.getExecutionEnvironment();
-               StreamGraph streamGraph = env.getStreamGraph();
-               return 
streamGraph.getStreamNode(dataStream.getId()).getOperator();
-       }
-
-       @SuppressWarnings("unchecked")
-       private static <T> SourceFunction<T> 
getFunctionFromDataSource(DataStreamSource<T> dataStreamSource) {
-               dataStreamSource.addSink(new DiscardingSink<T>());
-               AbstractUdfStreamOperator<?, ?> operator =
-                               (AbstractUdfStreamOperator<?, ?>) 
getOperatorFromDataStream(dataStreamSource);
-               return (SourceFunction<T>) operator.getUserFunction();
-       }
-
-       public static class DummySplittableIterator<T> extends 
SplittableIterator<T> {
-               private static final long serialVersionUID = 
1312752876092210499L;
-
-               @SuppressWarnings("unchecked")
-               @Override
-               public Iterator<T>[] split(int numPartitions) {
-                       return (Iterator<T>[]) new Iterator<?>[0];
-               }
-
-               @Override
-               public int getMaximumNumberOfSplits() {
-                       return 0;
-               }
-
-               @Override
-               public boolean hasNext() {
-                       return false;
-               }
-
-               @Override
-               public T next() {
-                       throw new NoSuchElementException();
-               }
-
-               @Override
-               public void remove() {
-                       throw new UnsupportedOperationException();
-               }
-       }
-
-       public static class ParentClass {
-               int num;
-               String string;
-               public ParentClass(int num, String string) {
-                       this.num = num;
-                       this.string = string;
-               }
-       }
-
-       public static class SubClass extends ParentClass{
-               public SubClass(int num, String string) {
-                       super(num, string);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3e41ed1b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 5b4dbf5..4d462d0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -42,7 +42,7 @@ import static org.junit.Assert.assertTrue;
 
 @SuppressWarnings("serial")
 public class StreamingJobGraphGeneratorTest extends TestLogger {
-
+       
        @Test
        public void testExecutionConfigSerialization() throws IOException, 
ClassNotFoundException {
                final long seed = System.currentTimeMillis();
@@ -50,12 +50,12 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
 
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
-               StreamGraph streamingJob = new StreamGraph(env, 1 /* default 
parallelism */);
-               StreamingJobGraphGenerator compiler = new 
StreamingJobGraphGenerator(streamingJob, 1 /* default parallelism */);
-
+               StreamGraph streamingJob = new StreamGraph(env);
+               StreamingJobGraphGenerator compiler = new 
StreamingJobGraphGenerator(streamingJob);
+               
                boolean closureCleanerEnabled = r.nextBoolean(), 
forceAvroEnabled = r.nextBoolean(), forceKryoEnabled = r.nextBoolean(), 
objectReuseEnabled = r.nextBoolean(), sysoutLoggingEnabled = r.nextBoolean();
                int dop = 1 + r.nextInt(10);
-
+               
                ExecutionConfig config = streamingJob.getExecutionConfig();
                if(closureCleanerEnabled) {
                        config.enableClosureCleaner();
@@ -83,7 +83,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
                        config.disableSysoutLogging();
                }
                config.setParallelism(dop);
-
+               
                JobGraph jobGraph = compiler.createJobGraph();
 
                final String EXEC_CONFIG_KEY = "runtime.config";
@@ -108,7 +108,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
                assertEquals(sysoutLoggingEnabled, 
executionConfig.isSysoutLoggingEnabled());
                assertEquals(dop, executionConfig.getParallelism());
        }
-
+       
        @Test
        public void testParallelismOneNotChained() {
 
@@ -164,10 +164,10 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
        @Test
        public void testDisabledCheckpointing() throws Exception {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               StreamGraph streamGraph = new StreamGraph(env, 1 /* default 
parallelism */);
+               StreamGraph streamGraph = new StreamGraph(env);
                assertFalse("Checkpointing enabled", 
streamGraph.getCheckpointConfig().isCheckpointingEnabled());
 
-               StreamingJobGraphGenerator jobGraphGenerator = new 
StreamingJobGraphGenerator(streamGraph, 1 /* default parallelism */);
+               StreamingJobGraphGenerator jobGraphGenerator = new 
StreamingJobGraphGenerator(streamGraph);
                JobGraph jobGraph = jobGraphGenerator.createJobGraph();
 
                JobSnapshottingSettings snapshottingSettings = 
jobGraph.getSnapshotSettings();
@@ -189,7 +189,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
                                }
                        })
                        .print();
-               JobGraph jobGraph = new 
StreamingJobGraphGenerator(env.getStreamGraph(), 1 /* default parallelism 
*/).createJobGraph();
+               JobGraph jobGraph = new 
StreamingJobGraphGenerator(env.getStreamGraph()).createJobGraph();
 
                JobVertex sourceVertex = 
jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
                JobVertex mapPrintVertex = 
jobGraph.getVerticesSortedTopologicallyFromSources().get(1);

http://git-wip-us.apache.org/repos/asf/flink/blob/3e41ed1b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
index af413ad..91ec427 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
@@ -115,7 +115,7 @@ public class FoldApplyWindowFunctionTest {
 
                transformations.add(new OneInputTransformation<>(source, 
"test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1));
 
-               StreamGraph streamGraph = StreamGraphGenerator.generate(env, 
transformations, 1 /* default parallelism */);
+               StreamGraph streamGraph = StreamGraphGenerator.generate(env, 
transformations);
 
                List<Integer> result = new ArrayList<>();
                List<Integer> input = new ArrayList<>();
@@ -138,10 +138,6 @@ public class FoldApplyWindowFunctionTest {
 
        public static class DummyStreamExecutionEnvironment extends 
StreamExecutionEnvironment {
 
-               public DummyStreamExecutionEnvironment() {
-                       super(1);
-               }
-
                @Override
                public JobExecutionResult execute(String jobName) throws 
Exception {
                        return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/3e41ed1b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 60798e0..22f1264 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -673,6 +673,23 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 
 object StreamExecutionEnvironment {
 
+  /**
+   * Sets the default parallelism that will be used for the local execution
+   * environment created by [[createLocalEnvironment()]].
+   *
+   * @param parallelism The default parallelism to use for local execution.
+   */
+  @PublicEvolving
+  def setDefaultLocalParallelism(parallelism: Int) : Unit =
+    JavaEnv.setDefaultLocalParallelism(parallelism)
+
+  /**
+   * Gets the default parallelism that will be used for the local execution 
environment created by
+   * [[createLocalEnvironment()]].
+   */
+  @PublicEvolving
+  def getDefaultLocalParallelism: Int = JavaEnv.getDefaultLocalParallelism
+  
   // --------------------------------------------------------------------------
   //  context environment
   // --------------------------------------------------------------------------
@@ -694,14 +711,13 @@ object StreamExecutionEnvironment {
   /**
    * Creates a local execution environment. The local execution environment 
will run the
    * program in a multi-threaded fashion in the same JVM as the environment 
was created in.
+   *
+   * This method sets the environment's default parallelism to given 
parameter, which
+   * defaults to the value set via [[setDefaultLocalParallelism(Int)]].
    */
-  def createLocalEnvironment(parallelism: Int = -1):
+  def createLocalEnvironment(parallelism: Int = 
JavaEnv.getDefaultLocalParallelism):
       StreamExecutionEnvironment = {
-    if (parallelism == -1) {
-      new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment())
-    } else {
-      new 
StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism))
-    }
+    new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/3e41ed1b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index b498edc..adb59f2 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -255,10 +255,9 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
     val sink = map.addSink(x => {})
 
     assert(1 == env.getStreamGraph.getStreamNode(src.getId).getParallelism)
-    // default parallelism is only actualized when transforming to JobGraph
-    assert(-1 == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
+    assert(10 == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
     assert(1 == 
env.getStreamGraph.getStreamNode(windowed.getId).getParallelism)
-    assert(-1 == 
env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism)
+    assert(10 == 
env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism)
 
     try {
       src.setParallelism(3)
@@ -273,11 +272,9 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
     // the parallelism does not change since some windowing code takes the 
parallelism from
     // input operations and that cannot change dynamically
     assert(1 == env.getStreamGraph.getStreamNode(src.getId).getParallelism)
-    // setting a parallelism on the env/in the ExecutionConfig means that 
operators
-    // pick it up when being instantiated
-    assert(7 == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
+    assert(10 == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
     assert(1 == 
env.getStreamGraph.getStreamNode(windowed.getId).getParallelism)
-    assert(7 == 
env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism)
+    assert(10 == 
env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism)
 
     val parallelSource = env.generateSequence(0, 0)
     parallelSource.print()

http://git-wip-us.apache.org/repos/asf/flink/blob/3e41ed1b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index 90d8790..64c68dc 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -36,7 +36,6 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
        
 
        public TestStreamEnvironment(LocalFlinkMiniCluster executor, int 
parallelism) {
-               super(parallelism);
                this.executor = Preconditions.checkNotNull(executor);
                setParallelism(parallelism);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/3e41ed1b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index 883f4b4..c56fa91 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -381,10 +381,6 @@ public class AccumulatorLiveITCase {
         */
        private static class DummyStreamExecutionEnvironment extends 
StreamExecutionEnvironment {
 
-               public DummyStreamExecutionEnvironment() {
-                       super(1 /* default parallelism */);
-               }
-
                @Override
                public JobExecutionResult execute() throws Exception {
                        return execute("default");

Reply via email to