Repository: flink
Updated Branches:
  refs/heads/release-1.2 edd1065c7 -> 99fb80be7


[FLINK-5808] Move default parallelism to StreamingJobGraphGenerator

Before, it was set on the ExecutionConfig for some stream execution
environments and later for others. Now, we don't set the default
parallelism on the ExecutionConfig but instead set it at the latest
possible point, in the StreamingJobGraphGenerator.

This also adds tests that verify that we don't set the default
parallelism on the ExecutionConfig.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b563f0ae
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b563f0ae
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b563f0ae

Branch: refs/heads/release-1.2
Commit: b563f0ae2e7b7233e29e03fbb2cf18b0d853c0ca
Parents: edd1065
Author: Aljoscha Krettek <[email protected]>
Authored: Fri Mar 10 13:30:21 2017 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Sat Mar 18 07:43:41 2017 +0100

----------------------------------------------------------------------
 .../api/environment/LocalStreamEnvironment.java |  26 +-
 .../environment/RemoteStreamEnvironment.java    |   5 +
 .../environment/StreamContextEnvironment.java   |  13 +-
 .../environment/StreamExecutionEnvironment.java |  65 ++--
 .../api/environment/StreamPlanEnvironment.java  |  15 +-
 .../flink/streaming/api/graph/StreamGraph.java  |   6 +-
 .../api/graph/StreamGraphGenerator.java         |  12 +-
 .../api/graph/StreamingJobGraphGenerator.java   |  14 +-
 .../api/StreamExecutionEnvironmentTest.java     | 289 -----------------
 .../StreamExecutionEnvironmentTest.java         | 317 +++++++++++++++++++
 .../graph/StreamingJobGraphGeneratorTest.java   |  20 +-
 .../operators/FoldApplyWindowFunctionTest.java  |   6 +-
 .../api/scala/StreamExecutionEnvironment.scala  |  28 +-
 .../streaming/api/scala/DataStreamTest.scala    |  11 +-
 .../streaming/util/TestStreamEnvironment.java   |   1 +
 .../accumulators/AccumulatorLiveITCase.java     |   4 +
 16 files changed, 432 insertions(+), 400 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b563f0ae/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index f8c9c42..cb60552 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -45,6 +45,9 @@ import org.slf4j.LoggerFactory;
 @Public
 public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 
+       /** The default parallelism used when creating a local environment */
+       private static int defaultLocalParallelism = 
Runtime.getRuntime().availableProcessors();
+
        private static final Logger LOG = 
LoggerFactory.getLogger(LocalStreamEnvironment.class);
        
        /** The configuration to use for the local cluster */
@@ -54,24 +57,43 @@ public class LocalStreamEnvironment extends 
StreamExecutionEnvironment {
         * Creates a new local stream environment that uses the default 
configuration.
         */
        public LocalStreamEnvironment() {
-               this(null);
+               this(defaultLocalParallelism);
        }
 
        /**
+        * Creates a new local stream environment that uses the default 
configuration.
+        */
+       public LocalStreamEnvironment(int parallelism) {
+               this(null, parallelism);
+       }
+
+
+       /**
         * Creates a new local stream environment that configures its local 
executor with the given configuration.
         *
         * @param config The configuration used to configure the local executor.
         */
        public LocalStreamEnvironment(Configuration config) {
+               this(config, defaultLocalParallelism);
+       }
+
+       /**
+        * Creates a new local stream environment that configures its local 
executor with the given configuration.
+        *
+        * @param config The configuration used to configure the local executor.
+        */
+       public LocalStreamEnvironment(Configuration config, int parallelism) {
+               super(parallelism);
                if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
                        throw new InvalidProgramException(
                                        "The LocalStreamEnvironment cannot be 
used when submitting a program through a client, " +
                                                        "or running in a 
TestEnvironment context.");
                }
-               
+
                this.conf = config == null ? new Configuration() : config;
        }
 
+
        /**
         * Executes the JobGraph of the on a mini cluster of CLusterUtil with a 
user
         * specified name.

http://git-wip-us.apache.org/repos/asf/flink/blob/b563f0ae/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 333f9c0..5684e28 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -37,6 +37,7 @@ import 
org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -129,6 +130,10 @@ public class RemoteStreamEnvironment extends 
StreamExecutionEnvironment {
         *            The protocol must be supported by the {@link 
java.net.URLClassLoader}.
         */
        public RemoteStreamEnvironment(String host, int port, Configuration 
clientConfiguration, String[] jarFiles, URL[] globalClasspaths) {
+               super(GlobalConfiguration.loadConfiguration().getInteger(
+                               ConfigConstants.DEFAULT_PARALLELISM_KEY,
+                               ConfigConstants.DEFAULT_PARALLELISM));
+               
                if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
                        throw new InvalidProgramException(
                                        "The RemoteEnvironment cannot be used 
when submitting a program through a client, " +

http://git-wip-us.apache.org/repos/asf/flink/blob/b563f0ae/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index 49c5347..51078f2 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -38,14 +38,13 @@ public class StreamContextEnvironment extends 
StreamExecutionEnvironment {
        private final ContextEnvironment ctx;
 
        protected StreamContextEnvironment(ContextEnvironment ctx) {
+               // if the batch ContextEnvironment has a parallelism this must 
have come from
+               // the CLI Client. We should set that as our default parallelism
+               super(ctx.getParallelism() > 0 ? ctx.getParallelism() :
+                               
GlobalConfiguration.loadConfiguration().getInteger(
+                                               
ConfigConstants.DEFAULT_PARALLELISM_KEY,
+                                               
ConfigConstants.DEFAULT_PARALLELISM));
                this.ctx = ctx;
-               if (ctx.getParallelism() > 0) {
-                       setParallelism(ctx.getParallelism());
-               } else {
-                       
setParallelism(GlobalConfiguration.loadConfiguration().getInteger(
-                                       ConfigConstants.DEFAULT_PARALLELISM_KEY,
-                                       ConfigConstants.DEFAULT_PARALLELISM));
-               }
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b563f0ae/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index dab0a06..6ac3622 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -111,9 +111,6 @@ public abstract class StreamExecutionEnvironment {
        /** The environment of the context (local by default, cluster if 
invoked through command line) */
        private static StreamExecutionEnvironmentFactory 
contextEnvironmentFactory;
 
-       /** The default parallelism used when creating a local environment */
-       private static int defaultLocalParallelism = 
Runtime.getRuntime().availableProcessors();
-
        // 
------------------------------------------------------------------------
 
        /** The execution configuration for this environment */
@@ -134,11 +131,23 @@ public abstract class StreamExecutionEnvironment {
        /** The time characteristic used by the data streams */
        private TimeCharacteristic timeCharacteristic = 
DEFAULT_TIME_CHARACTERISTIC;
 
+       /** The parallelism to use when no parallelism is set on an operation. 
*/
+       private final int defaultParallelism;
+
 
        // 
--------------------------------------------------------------------------------------------
        // Constructor and Properties
        // 
--------------------------------------------------------------------------------------------
 
+
+       public StreamExecutionEnvironment() {
+               this(ConfigConstants.DEFAULT_PARALLELISM);
+       }
+
+       public StreamExecutionEnvironment(int defaultParallelism) {
+               this.defaultParallelism = defaultParallelism;
+       }
+
        /**
         * Gets the config object.
         */
@@ -1514,7 +1523,7 @@ public abstract class StreamExecutionEnvironment {
                if (transformations.size() <= 0) {
                        throw new IllegalStateException("No operators defined 
in streaming topology. Cannot execute.");
                }
-               return StreamGraphGenerator.generate(this, transformations);
+               return StreamGraphGenerator.generate(this, transformations, 
defaultParallelism);
        }
 
        /**
@@ -1602,7 +1611,7 @@ public abstract class StreamExecutionEnvironment {
         * @return A local execution environment.
         */
        public static LocalStreamEnvironment createLocalEnvironment() {
-               return createLocalEnvironment(defaultLocalParallelism);
+               return new LocalStreamEnvironment();
        }
 
        /**
@@ -1611,14 +1620,12 @@ public abstract class StreamExecutionEnvironment {
         * environment was created in. It will use the parallelism specified in 
the
         * parameter.
         *
-        * @param parallelism
-        *              The parallelism for the local environment.
+        * @param defaultParallelism The default parallelism for the local 
environment.
+        * 
         * @return A local execution environment with the specified parallelism.
         */
-       public static LocalStreamEnvironment createLocalEnvironment(int 
parallelism) {
-               LocalStreamEnvironment env = new LocalStreamEnvironment();
-               env.setParallelism(parallelism);
-               return env;
+       public static LocalStreamEnvironment createLocalEnvironment(int 
defaultParallelism) {
+               return new LocalStreamEnvironment(defaultParallelism);
        }
 
        /**
@@ -1627,16 +1634,13 @@ public abstract class StreamExecutionEnvironment {
         * environment was created in. It will use the parallelism specified in 
the
         * parameter.
         *
-        * @param parallelism
-        *              The parallelism for the local environment.
-        *      @param configuration
-        *              Pass a custom configuration into the cluster
+        * @param defaultParallelism The parallelism for the local environment.
+        * @param configuration Pass a custom configuration into the cluster
+        *
         * @return A local execution environment with the specified parallelism.
         */
-       public static LocalStreamEnvironment createLocalEnvironment(int 
parallelism, Configuration configuration) {
-               LocalStreamEnvironment currentEnvironment = new 
LocalStreamEnvironment(configuration);
-               currentEnvironment.setParallelism(parallelism);
-               return currentEnvironment;
+       public static LocalStreamEnvironment createLocalEnvironment(int 
defaultParallelism, Configuration configuration) {
+               return new LocalStreamEnvironment(configuration, 
defaultParallelism);
        }
 
        /**
@@ -1661,7 +1665,6 @@ public abstract class StreamExecutionEnvironment {
                conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
 
                LocalStreamEnvironment localEnv = new 
LocalStreamEnvironment(conf);
-               localEnv.setParallelism(defaultLocalParallelism);
 
                return localEnv;
        }
@@ -1747,28 +1750,6 @@ public abstract class StreamExecutionEnvironment {
                return new RemoteStreamEnvironment(host, port, clientConfig, 
jarFiles);
        }
 
-       /**
-        * Gets the default parallelism that will be used for the local 
execution environment created by
-        * {@link #createLocalEnvironment()}.
-        *
-        * @return The default local parallelism
-        */
-       @PublicEvolving
-       public static int getDefaultLocalParallelism() {
-               return defaultLocalParallelism;
-       }
-
-       /**
-        * Sets the default parallelism that will be used for the local 
execution
-        * environment created by {@link #createLocalEnvironment()}.
-        *
-        * @param parallelism The parallelism to use as the default local 
parallelism.
-        */
-       @PublicEvolving
-       public static void setDefaultLocalParallelism(int parallelism) {
-               defaultLocalParallelism = parallelism;
-       }
-
        // 
--------------------------------------------------------------------------------------------
        //  Methods to control the context and local environments for execution 
from packaged programs
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/b563f0ae/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
index b1521f5..9c676c4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
@@ -32,18 +32,11 @@ public class StreamPlanEnvironment extends 
StreamExecutionEnvironment {
        private ExecutionEnvironment env;
 
        protected StreamPlanEnvironment(ExecutionEnvironment env) {
-               super();
-               this.env = env;
+               super(GlobalConfiguration.loadConfiguration().getInteger(
+                               ConfigConstants.DEFAULT_PARALLELISM_KEY,
+                               ConfigConstants.DEFAULT_PARALLELISM));
 
-               int parallelism = env.getParallelism();
-               if (parallelism > 0) {
-                       setParallelism(parallelism);
-               } else {
-                       // determine parallelism
-                       
setParallelism(GlobalConfiguration.loadConfiguration().getInteger(
-                                       ConfigConstants.DEFAULT_PARALLELISM_KEY,
-                                       ConfigConstants.DEFAULT_PARALLELISM));
-               }
+               this.env = env;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b563f0ae/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 2f80764..1b4acd0 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -91,12 +91,14 @@ public class StreamGraph extends StreamingPlan {
        private AbstractStateBackend stateBackend;
        private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;
 
+       private final int defaultParallelism;
 
-       public StreamGraph(StreamExecutionEnvironment environment) {
+       public StreamGraph(StreamExecutionEnvironment environment, int 
defaultParallelism) {
                this.environment = environment;
                this.executionConfig = environment.getConfig();
                this.checkpointConfig = environment.getCheckpointConfig();
 
+               this.defaultParallelism = defaultParallelism;
                // create an empty new stream graph.
                clear();
        }
@@ -596,7 +598,7 @@ public class StreamGraph extends StreamingPlan {
                                                        + "\nThe user can force 
enable state checkpoints with the reduced guarantees by calling: 
env.enableCheckpointing(interval,true)");
                }
 
-               StreamingJobGraphGenerator jobgraphGenerator = new 
StreamingJobGraphGenerator(this);
+               StreamingJobGraphGenerator jobgraphGenerator = new 
StreamingJobGraphGenerator(this, defaultParallelism);
 
                return jobgraphGenerator.createJobGraph();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/b563f0ae/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index ddd0515..333e4f9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -97,12 +97,11 @@ public class StreamGraphGenerator {
        // we have loops, i.e. feedback edges.
        private Map<StreamTransformation<?>, Collection<Integer>> 
alreadyTransformed;
 
-
        /**
         * Private constructor. The generator should only be invoked using 
{@link #generate}.
         */
-       private StreamGraphGenerator(StreamExecutionEnvironment env) {
-               this.streamGraph = new StreamGraph(env);
+       private StreamGraphGenerator(StreamExecutionEnvironment env, int 
defaultParallelism) {
+               this.streamGraph = new StreamGraph(env, defaultParallelism);
                this.streamGraph.setChaining(env.isChainingEnabled());
                this.streamGraph.setStateBackend(env.getStateBackend());
                this.env = env;
@@ -119,8 +118,11 @@ public class StreamGraphGenerator {
         *
         * @return The generated {@code StreamGraph}
         */
-       public static StreamGraph generate(StreamExecutionEnvironment env, 
List<StreamTransformation<?>> transformations) {
-               return new 
StreamGraphGenerator(env).generateInternal(transformations);
+       public static StreamGraph generate(
+                       StreamExecutionEnvironment env,
+                       List<StreamTransformation<?>> transformations,
+                       int defaultParallelism) {
+               return new StreamGraphGenerator(env, 
defaultParallelism).generateInternal(transformations);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/b563f0ae/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 8877c80..f87e51e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.graph;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -86,10 +87,13 @@ public class StreamingJobGraphGenerator {
        private final StreamGraphHasher defaultStreamGraphHasher;
        private final List<StreamGraphHasher> legacyStreamGraphHashers;
 
-       public StreamingJobGraphGenerator(StreamGraph streamGraph) {
+       private final int defaultParallelism;
+
+       public StreamingJobGraphGenerator(StreamGraph streamGraph, int 
defaultParallelism) {
                this.streamGraph = streamGraph;
                this.defaultStreamGraphHasher = new StreamGraphHasherV2();
                this.legacyStreamGraphHashers = Arrays.asList(new 
StreamGraphHasherV1(), new StreamGraphUserHashHasher());
+               this.defaultParallelism = defaultParallelism;
        }
 
        private void init() {
@@ -304,12 +308,12 @@ public class StreamingJobGraphGenerator {
 
                int parallelism = streamNode.getParallelism();
 
-               if (parallelism > 0) {
-                       jobVertex.setParallelism(parallelism);
-               } else {
-                       parallelism = jobVertex.getParallelism();
+               if (parallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
+                       parallelism = defaultParallelism;
                }
 
+               jobVertex.setParallelism(parallelism);
+
                jobVertex.setMaxParallelism(streamNode.getMaxParallelism());
 
                if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b563f0ae/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
deleted file mode 100644
index 3fc1344..0000000
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
+++ /dev/null
@@ -1,289 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
-import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
-import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.SplittableIterator;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class StreamExecutionEnvironmentTest {
-
-       @Test
-       public void fromElementsWithBaseTypeTest1() {
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.fromElements(ParentClass.class, new SubClass(1, "Java"), 
new ParentClass(1, "hello"));
-       }
-
-       @Test(expected = IllegalArgumentException.class)
-       public void fromElementsWithBaseTypeTest2() {
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.fromElements(SubClass.class, new SubClass(1, "Java"), new 
ParentClass(1, "hello"));
-       }
-
-       @Test
-       @SuppressWarnings("unchecked")
-       public void testFromCollectionParallelism() {
-               try {
-                       TypeInformation<Integer> typeInfo = 
BasicTypeInfo.INT_TYPE_INFO;
-                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-                       DataStreamSource<Integer> dataStream1 = 
env.fromCollection(new DummySplittableIterator<Integer>(), typeInfo);
-
-                       try {
-                               dataStream1.setParallelism(4);
-                               fail("should throw an exception");
-                       }
-                       catch (IllegalArgumentException e) {
-                               // expected
-                       }
-
-                       dataStream1.addSink(new DiscardingSink<Integer>());
-       
-                       DataStreamSource<Integer> dataStream2 = 
env.fromParallelCollection(new DummySplittableIterator<Integer>(),
-                                       typeInfo).setParallelism(4);
-
-                       dataStream2.addSink(new DiscardingSink<Integer>());
-
-                       env.getExecutionPlan();
-
-                       assertEquals("Parallelism of collection source must be 
1.", 1, 
env.getStreamGraph().getStreamNode(dataStream1.getId()).getParallelism());
-                       assertEquals("Parallelism of parallel collection source 
must be 4.",
-                                       4, 
-                                       
env.getStreamGraph().getStreamNode(dataStream2.getId()).getParallelism());
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test
-       public void testSources() {
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-               SourceFunction<Integer> srcFun = new SourceFunction<Integer>() {
-                       private static final long serialVersionUID = 1L;
-
-                       @Override
-                       public void run(SourceContext<Integer> ctx) throws 
Exception {
-                       }
-
-                       @Override
-                       public void cancel() {
-                       }
-               };
-               DataStreamSource<Integer> src1 = env.addSource(srcFun);
-               src1.addSink(new DiscardingSink<Integer>());
-               assertEquals(srcFun, getFunctionFromDataSource(src1));
-
-               List<Long> list = Arrays.asList(0L, 1L, 2L);
-
-               DataStreamSource<Long> src2 = env.generateSequence(0, 2);
-               assertTrue(getFunctionFromDataSource(src2) instanceof 
StatefulSequenceSource);
-
-               DataStreamSource<Long> src3 = env.fromElements(0L, 1L, 2L);
-               assertTrue(getFunctionFromDataSource(src3) instanceof 
FromElementsFunction);
-
-               DataStreamSource<Long> src4 = env.fromCollection(list);
-               assertTrue(getFunctionFromDataSource(src4) instanceof 
FromElementsFunction);
-       }
-
-       @Test
-       public void testParallelismBounds() {
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-               SourceFunction<Integer> srcFun = new SourceFunction<Integer>() {
-                       private static final long serialVersionUID = 1L;
-
-                       @Override
-                       public void run(SourceContext<Integer> ctx) throws 
Exception {
-                       }
-
-                       @Override
-                       public void cancel() {
-                       }
-               };
-
-
-               SingleOutputStreamOperator<Object> operator =
-                               env.addSource(srcFun).flatMap(new 
FlatMapFunction<Integer, Object>() {
-
-                       private static final long serialVersionUID = 1L;
-
-                       @Override
-                       public void flatMap(Integer value, Collector<Object> 
out) throws Exception {
-
-                       }
-               });
-
-               // default value for max parallelism
-               Assert.assertEquals(-1, 
operator.getTransformation().getMaxParallelism());
-
-               // bounds for parallelism 1
-               try {
-                       operator.setParallelism(0);
-                       Assert.fail();
-               } catch (IllegalArgumentException expected) {
-               }
-
-               // bounds for parallelism 2
-               operator.setParallelism(1);
-               Assert.assertEquals(1, operator.getParallelism());
-
-               // bounds for parallelism 3
-               operator.setParallelism(1 << 15);
-               Assert.assertEquals(1 << 15, operator.getParallelism());
-
-               // default value after generating
-               env.getStreamGraph().getJobGraph();
-               Assert.assertEquals(-1, 
operator.getTransformation().getMaxParallelism());
-
-               // configured value after generating
-               env.setMaxParallelism(42);
-               env.getStreamGraph().getJobGraph();
-               Assert.assertEquals(42, 
operator.getTransformation().getMaxParallelism());
-
-               // bounds configured parallelism 1
-               try {
-                       env.setMaxParallelism(0);
-                       Assert.fail();
-               } catch (IllegalArgumentException expected) {
-               }
-
-               // bounds configured parallelism 2
-               try {
-                       env.setMaxParallelism(1 + (1 << 15));
-                       Assert.fail();
-               } catch (IllegalArgumentException expected) {
-               }
-
-               // bounds for max parallelism 1
-               try {
-                       operator.setMaxParallelism(0);
-                       Assert.fail();
-               } catch (IllegalArgumentException expected) {
-               }
-
-               // bounds for max parallelism 2
-               try {
-                       operator.setMaxParallelism(1 + (1 << 15));
-                       Assert.fail();
-               } catch (IllegalArgumentException expected) {
-               }
-
-               // bounds for max parallelism 3
-               operator.setMaxParallelism(1);
-               Assert.assertEquals(1, 
operator.getTransformation().getMaxParallelism());
-
-               // bounds for max parallelism 4
-               operator.setMaxParallelism(1 << 15);
-               Assert.assertEquals(1 << 15, 
operator.getTransformation().getMaxParallelism());
-
-               // override config
-               env.getStreamGraph().getJobGraph();
-               Assert.assertEquals(1 << 15 , 
operator.getTransformation().getMaxParallelism());
-       }
-
-       /////////////////////////////////////////////////////////////
-       // Utilities
-       /////////////////////////////////////////////////////////////
-
-
-       private static StreamOperator<?> 
getOperatorFromDataStream(DataStream<?> dataStream) {
-               StreamExecutionEnvironment env = 
dataStream.getExecutionEnvironment();
-               StreamGraph streamGraph = env.getStreamGraph();
-               return 
streamGraph.getStreamNode(dataStream.getId()).getOperator();
-       }
-
-       @SuppressWarnings("unchecked")
-       private static <T> SourceFunction<T> 
getFunctionFromDataSource(DataStreamSource<T> dataStreamSource) {
-               dataStreamSource.addSink(new DiscardingSink<T>());
-               AbstractUdfStreamOperator<?, ?> operator =
-                               (AbstractUdfStreamOperator<?, ?>) 
getOperatorFromDataStream(dataStreamSource);
-               return (SourceFunction<T>) operator.getUserFunction();
-       }
-
-       public static class DummySplittableIterator<T> extends 
SplittableIterator<T> {
-               private static final long serialVersionUID = 
1312752876092210499L;
-
-               @SuppressWarnings("unchecked")
-               @Override
-               public Iterator<T>[] split(int numPartitions) {
-                       return (Iterator<T>[]) new Iterator<?>[0];
-               }
-
-               @Override
-               public int getMaximumNumberOfSplits() {
-                       return 0;
-               }
-
-               @Override
-               public boolean hasNext() {
-                       return false;
-               }
-
-               @Override
-               public T next() {
-                       throw new NoSuchElementException();
-               }
-
-               @Override
-               public void remove() {
-                       throw new UnsupportedOperationException();
-               }
-       }
-
-       public static class ParentClass {
-               int num;
-               String string;
-               public ParentClass(int num, String string) {
-                       this.num = num;
-                       this.string = string;
-               }
-       }
-
-       public static class SubClass extends ParentClass{
-               public SubClass(int num, String string) {
-                       super(num, string);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b563f0ae/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
new file mode 100644
index 0000000..d29c833
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.environment;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ContextEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.SplittableIterator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+public class StreamExecutionEnvironmentTest {
+
+       @Test
+       public void fromElementsWithBaseTypeTest1() {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.fromElements(ParentClass.class, new SubClass(1, "Java"), 
new ParentClass(1, "hello"));
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void fromElementsWithBaseTypeTest2() {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.fromElements(SubClass.class, new SubClass(1, "Java"), new 
ParentClass(1, "hello"));
+       }
+
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testFromCollectionParallelism() {
+               try {
+                       TypeInformation<Integer> typeInfo = 
BasicTypeInfo.INT_TYPE_INFO;
+                       StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+                       DataStreamSource<Integer> dataStream1 = 
env.fromCollection(new DummySplittableIterator<Integer>(), typeInfo);
+
+                       try {
+                               dataStream1.setParallelism(4);
+                               fail("should throw an exception");
+                       }
+                       catch (IllegalArgumentException e) {
+                               // expected
+                       }
+
+                       dataStream1.addSink(new DiscardingSink<Integer>());
+       
+                       DataStreamSource<Integer> dataStream2 = 
env.fromParallelCollection(new DummySplittableIterator<Integer>(),
+                                       typeInfo).setParallelism(4);
+
+                       dataStream2.addSink(new DiscardingSink<Integer>());
+
+                       env.getExecutionPlan();
+
+                       assertEquals("Parallelism of collection source must be 
1.", 1, 
env.getStreamGraph().getStreamNode(dataStream1.getId()).getParallelism());
+                       assertEquals("Parallelism of parallel collection source 
must be 4.",
+                                       4, 
+                                       
env.getStreamGraph().getStreamNode(dataStream2.getId()).getParallelism());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+
+       @Test
+       public void testSources() {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               SourceFunction<Integer> srcFun = new SourceFunction<Integer>() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public void run(SourceContext<Integer> ctx) throws 
Exception {
+                       }
+
+                       @Override
+                       public void cancel() {
+                       }
+               };
+               DataStreamSource<Integer> src1 = env.addSource(srcFun);
+               src1.addSink(new DiscardingSink<Integer>());
+               assertEquals(srcFun, getFunctionFromDataSource(src1));
+
+               List<Long> list = Arrays.asList(0L, 1L, 2L);
+
+               DataStreamSource<Long> src2 = env.generateSequence(0, 2);
+               assertTrue(getFunctionFromDataSource(src2) instanceof 
StatefulSequenceSource);
+
+               DataStreamSource<Long> src3 = env.fromElements(0L, 1L, 2L);
+               assertTrue(getFunctionFromDataSource(src3) instanceof 
FromElementsFunction);
+
+               DataStreamSource<Long> src4 = env.fromCollection(list);
+               assertTrue(getFunctionFromDataSource(src4) instanceof 
FromElementsFunction);
+       }
+
+       @Test
+       public void testDefaultParallelismIsDefault() {
+               assertEquals(
+                               ExecutionConfig.PARALLELISM_DEFAULT,
+                               
StreamExecutionEnvironment.createLocalEnvironment().getParallelism());
+
+               assertEquals(
+                               ExecutionConfig.PARALLELISM_DEFAULT,
+                               
StreamExecutionEnvironment.createRemoteEnvironment("dummy", 
1234).getParallelism());
+
+               StreamExecutionEnvironment contextEnv = new 
StreamContextEnvironment(
+                               new ContextEnvironment(
+                                               mock(ClusterClient.class),
+                                               Collections.<URL>emptyList(),
+                                               Collections.<URL>emptyList(),
+                                               
this.getClass().getClassLoader(),
+                                               null));
+
+               assertEquals(
+                               ExecutionConfig.PARALLELISM_DEFAULT,
+                               contextEnv.getParallelism());
+       }
+
+       @Test
+       public void testParallelismBounds() {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               SourceFunction<Integer> srcFun = new SourceFunction<Integer>() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public void run(SourceContext<Integer> ctx) throws 
Exception {
+                       }
+
+                       @Override
+                       public void cancel() {
+                       }
+               };
+
+
+               SingleOutputStreamOperator<Object> operator =
+                               env.addSource(srcFun).flatMap(new 
FlatMapFunction<Integer, Object>() {
+
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public void flatMap(Integer value, Collector<Object> 
out) throws Exception {
+
+                       }
+               });
+
+               // default value for max parallelism
+               Assert.assertEquals(-1, 
operator.getTransformation().getMaxParallelism());
+
+               // bounds for parallelism 1
+               try {
+                       operator.setParallelism(0);
+                       Assert.fail();
+               } catch (IllegalArgumentException expected) {
+               }
+
+               // bounds for parallelism 2
+               operator.setParallelism(1);
+               Assert.assertEquals(1, operator.getParallelism());
+
+               // bounds for parallelism 3
+               operator.setParallelism(1 << 15);
+               Assert.assertEquals(1 << 15, operator.getParallelism());
+
+               // default value after generating
+               env.getStreamGraph().getJobGraph();
+               Assert.assertEquals(-1, 
operator.getTransformation().getMaxParallelism());
+
+               // configured value after generating
+               env.setMaxParallelism(42);
+               env.getStreamGraph().getJobGraph();
+               Assert.assertEquals(42, 
operator.getTransformation().getMaxParallelism());
+
+               // bounds configured parallelism 1
+               try {
+                       env.setMaxParallelism(0);
+                       Assert.fail();
+               } catch (IllegalArgumentException expected) {
+               }
+
+               // bounds configured parallelism 2
+               try {
+                       env.setMaxParallelism(1 + (1 << 15));
+                       Assert.fail();
+               } catch (IllegalArgumentException expected) {
+               }
+
+               // bounds for max parallelism 1
+               try {
+                       operator.setMaxParallelism(0);
+                       Assert.fail();
+               } catch (IllegalArgumentException expected) {
+               }
+
+               // bounds for max parallelism 2
+               try {
+                       operator.setMaxParallelism(1 + (1 << 15));
+                       Assert.fail();
+               } catch (IllegalArgumentException expected) {
+               }
+
+               // bounds for max parallelism 3
+               operator.setMaxParallelism(1);
+               Assert.assertEquals(1, 
operator.getTransformation().getMaxParallelism());
+
+               // bounds for max parallelism 4
+               operator.setMaxParallelism(1 << 15);
+               Assert.assertEquals(1 << 15, 
operator.getTransformation().getMaxParallelism());
+
+               // override config
+               env.getStreamGraph().getJobGraph();
+               Assert.assertEquals(1 << 15 , 
operator.getTransformation().getMaxParallelism());
+       }
+
+       /////////////////////////////////////////////////////////////
+       // Utilities
+       /////////////////////////////////////////////////////////////
+
+
+       private static StreamOperator<?> 
getOperatorFromDataStream(DataStream<?> dataStream) {
+               StreamExecutionEnvironment env = 
dataStream.getExecutionEnvironment();
+               StreamGraph streamGraph = env.getStreamGraph();
+               return 
streamGraph.getStreamNode(dataStream.getId()).getOperator();
+       }
+
+       @SuppressWarnings("unchecked")
+       private static <T> SourceFunction<T> 
getFunctionFromDataSource(DataStreamSource<T> dataStreamSource) {
+               dataStreamSource.addSink(new DiscardingSink<T>());
+               AbstractUdfStreamOperator<?, ?> operator =
+                               (AbstractUdfStreamOperator<?, ?>) 
getOperatorFromDataStream(dataStreamSource);
+               return (SourceFunction<T>) operator.getUserFunction();
+       }
+
+       public static class DummySplittableIterator<T> extends 
SplittableIterator<T> {
+               private static final long serialVersionUID = 
1312752876092210499L;
+
+               @SuppressWarnings("unchecked")
+               @Override
+               public Iterator<T>[] split(int numPartitions) {
+                       return (Iterator<T>[]) new Iterator<?>[0];
+               }
+
+               @Override
+               public int getMaximumNumberOfSplits() {
+                       return 0;
+               }
+
+               @Override
+               public boolean hasNext() {
+                       return false;
+               }
+
+               @Override
+               public T next() {
+                       throw new NoSuchElementException();
+               }
+
+               @Override
+               public void remove() {
+                       throw new UnsupportedOperationException();
+               }
+       }
+
+       public static class ParentClass {
+               int num;
+               String string;
+               public ParentClass(int num, String string) {
+                       this.num = num;
+                       this.string = string;
+               }
+       }
+
+       public static class SubClass extends ParentClass{
+               public SubClass(int num, String string) {
+                       super(num, string);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b563f0ae/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 4d462d0..5b4dbf5 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -42,7 +42,7 @@ import static org.junit.Assert.assertTrue;
 
 @SuppressWarnings("serial")
 public class StreamingJobGraphGeneratorTest extends TestLogger {
-       
+
        @Test
        public void testExecutionConfigSerialization() throws IOException, 
ClassNotFoundException {
                final long seed = System.currentTimeMillis();
@@ -50,12 +50,12 @@ public class StreamingJobGraphGeneratorTest extends 
TestLogger {
 
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
-               StreamGraph streamingJob = new StreamGraph(env);
-               StreamingJobGraphGenerator compiler = new 
StreamingJobGraphGenerator(streamingJob);
-               
+               StreamGraph streamingJob = new StreamGraph(env, 1 /* default 
parallelism */);
+               StreamingJobGraphGenerator compiler = new 
StreamingJobGraphGenerator(streamingJob, 1 /* default parallelism */);
+
                boolean closureCleanerEnabled = r.nextBoolean(), 
forceAvroEnabled = r.nextBoolean(), forceKryoEnabled = r.nextBoolean(), 
objectReuseEnabled = r.nextBoolean(), sysoutLoggingEnabled = r.nextBoolean();
                int dop = 1 + r.nextInt(10);
-               
+
                ExecutionConfig config = streamingJob.getExecutionConfig();
                if(closureCleanerEnabled) {
                        config.enableClosureCleaner();
@@ -83,7 +83,7 @@ public class StreamingJobGraphGeneratorTest extends 
TestLogger {
                        config.disableSysoutLogging();
                }
                config.setParallelism(dop);
-               
+
                JobGraph jobGraph = compiler.createJobGraph();
 
                final String EXEC_CONFIG_KEY = "runtime.config";
@@ -108,7 +108,7 @@ public class StreamingJobGraphGeneratorTest extends 
TestLogger {
                assertEquals(sysoutLoggingEnabled, 
executionConfig.isSysoutLoggingEnabled());
                assertEquals(dop, executionConfig.getParallelism());
        }
-       
+
        @Test
        public void testParallelismOneNotChained() {
 
@@ -164,10 +164,10 @@ public class StreamingJobGraphGeneratorTest extends 
TestLogger {
        @Test
        public void testDisabledCheckpointing() throws Exception {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               StreamGraph streamGraph = new StreamGraph(env);
+               StreamGraph streamGraph = new StreamGraph(env, 1 /* default 
parallelism */);
                assertFalse("Checkpointing enabled", 
streamGraph.getCheckpointConfig().isCheckpointingEnabled());
 
-               StreamingJobGraphGenerator jobGraphGenerator = new 
StreamingJobGraphGenerator(streamGraph);
+               StreamingJobGraphGenerator jobGraphGenerator = new 
StreamingJobGraphGenerator(streamGraph, 1 /* default parallelism */);
                JobGraph jobGraph = jobGraphGenerator.createJobGraph();
 
                JobSnapshottingSettings snapshottingSettings = 
jobGraph.getSnapshotSettings();
@@ -189,7 +189,7 @@ public class StreamingJobGraphGeneratorTest extends 
TestLogger {
                                }
                        })
                        .print();
-               JobGraph jobGraph = new 
StreamingJobGraphGenerator(env.getStreamGraph()).createJobGraph();
+               JobGraph jobGraph = new 
StreamingJobGraphGenerator(env.getStreamGraph(), 1 /* default parallelism 
*/).createJobGraph();
 
                JobVertex sourceVertex = 
jobGraph.getVerticesSortedTopologicallyFromSources().get(0);
                JobVertex mapPrintVertex = 
jobGraph.getVerticesSortedTopologicallyFromSources().get(1);

http://git-wip-us.apache.org/repos/asf/flink/blob/b563f0ae/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
index 91ec427..af413ad 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java
@@ -115,7 +115,7 @@ public class FoldApplyWindowFunctionTest {
 
                transformations.add(new OneInputTransformation<>(source, 
"test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1));
 
-               StreamGraph streamGraph = StreamGraphGenerator.generate(env, 
transformations);
+               StreamGraph streamGraph = StreamGraphGenerator.generate(env, 
transformations, 1 /* default parallelism */);
 
                List<Integer> result = new ArrayList<>();
                List<Integer> input = new ArrayList<>();
@@ -138,6 +138,10 @@ public class FoldApplyWindowFunctionTest {
 
        public static class DummyStreamExecutionEnvironment extends 
StreamExecutionEnvironment {
 
+               public DummyStreamExecutionEnvironment() {
+                       super(1);
+               }
+
                @Override
                public JobExecutionResult execute(String jobName) throws 
Exception {
                        return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/b563f0ae/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 22f1264..60798e0 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -673,23 +673,6 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 
 object StreamExecutionEnvironment {
 
-  /**
-   * Sets the default parallelism that will be used for the local execution
-   * environment created by [[createLocalEnvironment()]].
-   *
-   * @param parallelism The default parallelism to use for local execution.
-   */
-  @PublicEvolving
-  def setDefaultLocalParallelism(parallelism: Int) : Unit =
-    JavaEnv.setDefaultLocalParallelism(parallelism)
-
-  /**
-   * Gets the default parallelism that will be used for the local execution 
environment created by
-   * [[createLocalEnvironment()]].
-   */
-  @PublicEvolving
-  def getDefaultLocalParallelism: Int = JavaEnv.getDefaultLocalParallelism
-  
   // --------------------------------------------------------------------------
   //  context environment
   // --------------------------------------------------------------------------
@@ -711,13 +694,14 @@ object StreamExecutionEnvironment {
   /**
    * Creates a local execution environment. The local execution environment 
will run the
    * program in a multi-threaded fashion in the same JVM as the environment 
was created in.
-   *
-   * This method sets the environment's default parallelism to given 
parameter, which
-   * defaults to the value set via [[setDefaultLocalParallelism(Int)]].
    */
-  def createLocalEnvironment(parallelism: Int = 
JavaEnv.getDefaultLocalParallelism):
+  def createLocalEnvironment(parallelism: Int = -1):
       StreamExecutionEnvironment = {
-    new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism))
+    if (parallelism == -1) {
+      new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment())
+    } else {
+      new 
StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism))
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/b563f0ae/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index adb59f2..b498edc 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -255,9 +255,10 @@ class DataStreamTest extends 
StreamingMultipleProgramsTestBase {
     val sink = map.addSink(x => {})
 
     assert(1 == env.getStreamGraph.getStreamNode(src.getId).getParallelism)
-    assert(10 == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
+    // default parallelism is only actualized when transforming to JobGraph
+    assert(-1 == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
     assert(1 == 
env.getStreamGraph.getStreamNode(windowed.getId).getParallelism)
-    assert(10 == 
env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism)
+    assert(-1 == 
env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism)
 
     try {
       src.setParallelism(3)
@@ -272,9 +273,11 @@ class DataStreamTest extends 
StreamingMultipleProgramsTestBase {
     // the parallelism does not change since some windowing code takes the 
parallelism from
     // input operations and that cannot change dynamically
     assert(1 == env.getStreamGraph.getStreamNode(src.getId).getParallelism)
-    assert(10 == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
+    // setting a parallelism on the env/in the ExecutionConfig means that 
operators
+    // pick it up when being instantiated
+    assert(7 == env.getStreamGraph.getStreamNode(map.getId).getParallelism)
     assert(1 == 
env.getStreamGraph.getStreamNode(windowed.getId).getParallelism)
-    assert(10 == 
env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism)
+    assert(7 == 
env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism)
 
     val parallelSource = env.generateSequence(0, 0)
     parallelSource.print()

http://git-wip-us.apache.org/repos/asf/flink/blob/b563f0ae/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index 64c68dc..90d8790 100644
--- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -36,6 +36,7 @@ public class TestStreamEnvironment extends 
StreamExecutionEnvironment {
        
 
        public TestStreamEnvironment(LocalFlinkMiniCluster executor, int 
parallelism) {
+               super(parallelism);
                this.executor = Preconditions.checkNotNull(executor);
                setParallelism(parallelism);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/b563f0ae/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index c56fa91..883f4b4 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -381,6 +381,10 @@ public class AccumulatorLiveITCase {
         */
        private static class DummyStreamExecutionEnvironment extends 
StreamExecutionEnvironment {
 
+               public DummyStreamExecutionEnvironment() {
+                       super(1 /* default parallelism */);
+               }
+
                @Override
                public JobExecutionResult execute() throws Exception {
                        return execute("default");

Reply via email to