Repository: flink Updated Branches: refs/heads/release-1.2 edd1065c7 -> 99fb80be7
[FLINK-5808] Move default parallelism to StreamingJobGraphGenerator. Before, the default parallelism was set on the ExecutionConfig for some stream execution environments and only at a later point for others. Now, we don't set the default parallelism on the ExecutionConfig but instead set it at the latest possible point, in the StreamingJobGraphGenerator. This also adds tests that verify that we don't set the default parallelism on the ExecutionConfig. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b563f0ae Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b563f0ae Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b563f0ae Branch: refs/heads/release-1.2 Commit: b563f0ae2e7b7233e29e03fbb2cf18b0d853c0ca Parents: edd1065 Author: Aljoscha Krettek <[email protected]> Authored: Fri Mar 10 13:30:21 2017 +0100 Committer: Aljoscha Krettek <[email protected]> Committed: Sat Mar 18 07:43:41 2017 +0100 ---------------------------------------------------------------------- .../api/environment/LocalStreamEnvironment.java | 26 +- .../environment/RemoteStreamEnvironment.java | 5 + .../environment/StreamContextEnvironment.java | 13 +- .../environment/StreamExecutionEnvironment.java | 65 ++-- .../api/environment/StreamPlanEnvironment.java | 15 +- .../flink/streaming/api/graph/StreamGraph.java | 6 +- .../api/graph/StreamGraphGenerator.java | 12 +- .../api/graph/StreamingJobGraphGenerator.java | 14 +- .../api/StreamExecutionEnvironmentTest.java | 289 ----------------- .../StreamExecutionEnvironmentTest.java | 317 +++++++++++++++++++ .../graph/StreamingJobGraphGeneratorTest.java | 20 +- .../operators/FoldApplyWindowFunctionTest.java | 6 +- .../api/scala/StreamExecutionEnvironment.scala | 28 +- .../streaming/api/scala/DataStreamTest.scala | 11 +- .../streaming/util/TestStreamEnvironment.java | 1 + .../accumulators/AccumulatorLiveITCase.java | 4 + 16 files changed, 432 insertions(+), 400 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b563f0ae/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java index f8c9c42..cb60552 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java @@ -45,6 +45,9 @@ import org.slf4j.LoggerFactory; @Public public class LocalStreamEnvironment extends StreamExecutionEnvironment { + /** The default parallelism used when creating a local environment */ + private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors(); + private static final Logger LOG = LoggerFactory.getLogger(LocalStreamEnvironment.class); /** The configuration to use for the local cluster */ @@ -54,24 +57,43 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment { * Creates a new local stream environment that uses the default configuration. */ public LocalStreamEnvironment() { - this(null); + this(defaultLocalParallelism); } /** + * Creates a new local stream environment that uses the default configuration. + */ + public LocalStreamEnvironment(int parallelism) { + this(null, parallelism); + } + + + /** * Creates a new local stream environment that configures its local executor with the given configuration. * * @param config The configuration used to configure the local executor. 
*/ public LocalStreamEnvironment(Configuration config) { + this(config, defaultLocalParallelism); + } + + /** + * Creates a new local stream environment that configures its local executor with the given configuration. + * + * @param config The configuration used to configure the local executor. + */ + public LocalStreamEnvironment(Configuration config, int parallelism) { + super(parallelism); if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) { throw new InvalidProgramException( "The LocalStreamEnvironment cannot be used when submitting a program through a client, " + "or running in a TestEnvironment context."); } - + this.conf = config == null ? new Configuration() : config; } + /** * Executes the JobGraph of the on a mini cluster of CLusterUtil with a user * specified name. http://git-wip-us.apache.org/repos/asf/flink/blob/b563f0ae/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java index 333f9c0..5684e28 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java @@ -37,6 +37,7 @@ import org.apache.flink.client.program.StandaloneClusterClient; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.streaming.api.graph.StreamGraph; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -129,6 +130,10 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment { * The protocol must be 
supported by the {@link java.net.URLClassLoader}. */ public RemoteStreamEnvironment(String host, int port, Configuration clientConfiguration, String[] jarFiles, URL[] globalClasspaths) { + super(GlobalConfiguration.loadConfiguration().getInteger( + ConfigConstants.DEFAULT_PARALLELISM_KEY, + ConfigConstants.DEFAULT_PARALLELISM)); + if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) { throw new InvalidProgramException( "The RemoteEnvironment cannot be used when submitting a program through a client, " + http://git-wip-us.apache.org/repos/asf/flink/blob/b563f0ae/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java index 49c5347..51078f2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java @@ -38,14 +38,13 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment { private final ContextEnvironment ctx; protected StreamContextEnvironment(ContextEnvironment ctx) { + // if the batch ContextEnvironment has a parallelism this must have come from + // the CLI Client. We should set that as our default parallelism + super(ctx.getParallelism() > 0 ? 
ctx.getParallelism() : + GlobalConfiguration.loadConfiguration().getInteger( + ConfigConstants.DEFAULT_PARALLELISM_KEY, + ConfigConstants.DEFAULT_PARALLELISM)); this.ctx = ctx; - if (ctx.getParallelism() > 0) { - setParallelism(ctx.getParallelism()); - } else { - setParallelism(GlobalConfiguration.loadConfiguration().getInteger( - ConfigConstants.DEFAULT_PARALLELISM_KEY, - ConfigConstants.DEFAULT_PARALLELISM)); - } } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/b563f0ae/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index dab0a06..6ac3622 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -111,9 +111,6 @@ public abstract class StreamExecutionEnvironment { /** The environment of the context (local by default, cluster if invoked through command line) */ private static StreamExecutionEnvironmentFactory contextEnvironmentFactory; - /** The default parallelism used when creating a local environment */ - private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors(); - // ------------------------------------------------------------------------ /** The execution configuration for this environment */ @@ -134,11 +131,23 @@ public abstract class StreamExecutionEnvironment { /** The time characteristic used by the data streams */ private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC; + /** The parallelism to use when no parallelism is set on an operation. 
*/ + private final int defaultParallelism; + // -------------------------------------------------------------------------------------------- // Constructor and Properties // -------------------------------------------------------------------------------------------- + + public StreamExecutionEnvironment() { + this(ConfigConstants.DEFAULT_PARALLELISM); + } + + public StreamExecutionEnvironment(int defaultParallelism) { + this.defaultParallelism = defaultParallelism; + } + /** * Gets the config object. */ @@ -1514,7 +1523,7 @@ public abstract class StreamExecutionEnvironment { if (transformations.size() <= 0) { throw new IllegalStateException("No operators defined in streaming topology. Cannot execute."); } - return StreamGraphGenerator.generate(this, transformations); + return StreamGraphGenerator.generate(this, transformations, defaultParallelism); } /** @@ -1602,7 +1611,7 @@ public abstract class StreamExecutionEnvironment { * @return A local execution environment. */ public static LocalStreamEnvironment createLocalEnvironment() { - return createLocalEnvironment(defaultLocalParallelism); + return new LocalStreamEnvironment(); } /** @@ -1611,14 +1620,12 @@ public abstract class StreamExecutionEnvironment { * environment was created in. It will use the parallelism specified in the * parameter. * - * @param parallelism - * The parallelism for the local environment. + * @param defaultParallelism The default parallelism for the local environment. + * * @return A local execution environment with the specified parallelism. */ - public static LocalStreamEnvironment createLocalEnvironment(int parallelism) { - LocalStreamEnvironment env = new LocalStreamEnvironment(); - env.setParallelism(parallelism); - return env; + public static LocalStreamEnvironment createLocalEnvironment(int defaultParallelism) { + return new LocalStreamEnvironment(defaultParallelism); } /** @@ -1627,16 +1634,13 @@ public abstract class StreamExecutionEnvironment { * environment was created in. 
It will use the parallelism specified in the * parameter. * - * @param parallelism - * The parallelism for the local environment. - * @param configuration - * Pass a custom configuration into the cluster + * @param defaultParallelism The parallelism for the local environment. + * @param configuration Pass a custom configuration into the cluster + * * @return A local execution environment with the specified parallelism. */ - public static LocalStreamEnvironment createLocalEnvironment(int parallelism, Configuration configuration) { - LocalStreamEnvironment currentEnvironment = new LocalStreamEnvironment(configuration); - currentEnvironment.setParallelism(parallelism); - return currentEnvironment; + public static LocalStreamEnvironment createLocalEnvironment(int defaultParallelism, Configuration configuration) { + return new LocalStreamEnvironment(configuration, defaultParallelism); } /** @@ -1661,7 +1665,6 @@ public abstract class StreamExecutionEnvironment { conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); LocalStreamEnvironment localEnv = new LocalStreamEnvironment(conf); - localEnv.setParallelism(defaultLocalParallelism); return localEnv; } @@ -1747,28 +1750,6 @@ public abstract class StreamExecutionEnvironment { return new RemoteStreamEnvironment(host, port, clientConfig, jarFiles); } - /** - * Gets the default parallelism that will be used for the local execution environment created by - * {@link #createLocalEnvironment()}. - * - * @return The default local parallelism - */ - @PublicEvolving - public static int getDefaultLocalParallelism() { - return defaultLocalParallelism; - } - - /** - * Sets the default parallelism that will be used for the local execution - * environment created by {@link #createLocalEnvironment()}. - * - * @param parallelism The parallelism to use as the default local parallelism. 
- */ - @PublicEvolving - public static void setDefaultLocalParallelism(int parallelism) { - defaultLocalParallelism = parallelism; - } - // -------------------------------------------------------------------------------------------- // Methods to control the context and local environments for execution from packaged programs // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b563f0ae/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java index b1521f5..9c676c4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java @@ -32,18 +32,11 @@ public class StreamPlanEnvironment extends StreamExecutionEnvironment { private ExecutionEnvironment env; protected StreamPlanEnvironment(ExecutionEnvironment env) { - super(); - this.env = env; + super(GlobalConfiguration.loadConfiguration().getInteger( + ConfigConstants.DEFAULT_PARALLELISM_KEY, + ConfigConstants.DEFAULT_PARALLELISM)); - int parallelism = env.getParallelism(); - if (parallelism > 0) { - setParallelism(parallelism); - } else { - // determine parallelism - setParallelism(GlobalConfiguration.loadConfiguration().getInteger( - ConfigConstants.DEFAULT_PARALLELISM_KEY, - ConfigConstants.DEFAULT_PARALLELISM)); - } + this.env = env; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/b563f0ae/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java 
---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index 2f80764..1b4acd0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -91,12 +91,14 @@ public class StreamGraph extends StreamingPlan { private AbstractStateBackend stateBackend; private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs; + private final int defaultParallelism; - public StreamGraph(StreamExecutionEnvironment environment) { + public StreamGraph(StreamExecutionEnvironment environment, int defaultParallelism) { this.environment = environment; this.executionConfig = environment.getConfig(); this.checkpointConfig = environment.getCheckpointConfig(); + this.defaultParallelism = defaultParallelism; // create an empty new stream graph. 
clear(); } @@ -596,7 +598,7 @@ public class StreamGraph extends StreamingPlan { + "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)"); } - StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this); + StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this, defaultParallelism); return jobgraphGenerator.createJobGraph(); } http://git-wip-us.apache.org/repos/asf/flink/blob/b563f0ae/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index ddd0515..333e4f9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -97,12 +97,11 @@ public class StreamGraphGenerator { // we have loops, i.e. feedback edges. private Map<StreamTransformation<?>, Collection<Integer>> alreadyTransformed; - /** * Private constructor. The generator should only be invoked using {@link #generate}. 
*/ - private StreamGraphGenerator(StreamExecutionEnvironment env) { - this.streamGraph = new StreamGraph(env); + private StreamGraphGenerator(StreamExecutionEnvironment env, int defaultParallelism) { + this.streamGraph = new StreamGraph(env, defaultParallelism); this.streamGraph.setChaining(env.isChainingEnabled()); this.streamGraph.setStateBackend(env.getStateBackend()); this.env = env; @@ -119,8 +118,11 @@ public class StreamGraphGenerator { * * @return The generated {@code StreamGraph} */ - public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) { - return new StreamGraphGenerator(env).generateInternal(transformations); + public static StreamGraph generate( + StreamExecutionEnvironment env, + List<StreamTransformation<?>> transformations, + int defaultParallelism) { + return new StreamGraphGenerator(env, defaultParallelism).generateInternal(transformations); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/b563f0ae/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 8877c80..f87e51e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.graph; import org.apache.commons.lang3.StringUtils; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; import org.apache.flink.api.common.restartstrategy.RestartStrategies; 
import org.apache.flink.api.java.tuple.Tuple2; @@ -86,10 +87,13 @@ public class StreamingJobGraphGenerator { private final StreamGraphHasher defaultStreamGraphHasher; private final List<StreamGraphHasher> legacyStreamGraphHashers; - public StreamingJobGraphGenerator(StreamGraph streamGraph) { + private final int defaultParallelism; + + public StreamingJobGraphGenerator(StreamGraph streamGraph, int defaultParallelism) { this.streamGraph = streamGraph; this.defaultStreamGraphHasher = new StreamGraphHasherV2(); this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphHasherV1(), new StreamGraphUserHashHasher()); + this.defaultParallelism = defaultParallelism; } private void init() { @@ -304,12 +308,12 @@ public class StreamingJobGraphGenerator { int parallelism = streamNode.getParallelism(); - if (parallelism > 0) { - jobVertex.setParallelism(parallelism); - } else { - parallelism = jobVertex.getParallelism(); + if (parallelism == ExecutionConfig.PARALLELISM_DEFAULT) { + parallelism = defaultParallelism; } + jobVertex.setParallelism(parallelism); + jobVertex.setMaxParallelism(streamNode.getMaxParallelism()); if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/flink/blob/b563f0ae/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java deleted file mode 100644 index 3fc1344..0000000 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java +++ /dev/null @@ -1,289 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api; - -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.DiscardingSink; -import org.apache.flink.streaming.api.functions.source.FromElementsFunction; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource; -import org.apache.flink.streaming.api.graph.StreamGraph; -import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; -import org.apache.flink.streaming.api.operators.StreamOperator; -import org.apache.flink.util.Collector; -import org.apache.flink.util.SplittableIterator; -import org.junit.Assert; -import org.junit.Test; - -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; - -import static 
org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -public class StreamExecutionEnvironmentTest { - - @Test - public void fromElementsWithBaseTypeTest1() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.fromElements(ParentClass.class, new SubClass(1, "Java"), new ParentClass(1, "hello")); - } - - @Test(expected = IllegalArgumentException.class) - public void fromElementsWithBaseTypeTest2() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.fromElements(SubClass.class, new SubClass(1, "Java"), new ParentClass(1, "hello")); - } - - @Test - @SuppressWarnings("unchecked") - public void testFromCollectionParallelism() { - try { - TypeInformation<Integer> typeInfo = BasicTypeInfo.INT_TYPE_INFO; - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - DataStreamSource<Integer> dataStream1 = env.fromCollection(new DummySplittableIterator<Integer>(), typeInfo); - - try { - dataStream1.setParallelism(4); - fail("should throw an exception"); - } - catch (IllegalArgumentException e) { - // expected - } - - dataStream1.addSink(new DiscardingSink<Integer>()); - - DataStreamSource<Integer> dataStream2 = env.fromParallelCollection(new DummySplittableIterator<Integer>(), - typeInfo).setParallelism(4); - - dataStream2.addSink(new DiscardingSink<Integer>()); - - env.getExecutionPlan(); - - assertEquals("Parallelism of collection source must be 1.", 1, env.getStreamGraph().getStreamNode(dataStream1.getId()).getParallelism()); - assertEquals("Parallelism of parallel collection source must be 4.", - 4, - env.getStreamGraph().getStreamNode(dataStream2.getId()).getParallelism()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testSources() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - 
SourceFunction<Integer> srcFun = new SourceFunction<Integer>() { - private static final long serialVersionUID = 1L; - - @Override - public void run(SourceContext<Integer> ctx) throws Exception { - } - - @Override - public void cancel() { - } - }; - DataStreamSource<Integer> src1 = env.addSource(srcFun); - src1.addSink(new DiscardingSink<Integer>()); - assertEquals(srcFun, getFunctionFromDataSource(src1)); - - List<Long> list = Arrays.asList(0L, 1L, 2L); - - DataStreamSource<Long> src2 = env.generateSequence(0, 2); - assertTrue(getFunctionFromDataSource(src2) instanceof StatefulSequenceSource); - - DataStreamSource<Long> src3 = env.fromElements(0L, 1L, 2L); - assertTrue(getFunctionFromDataSource(src3) instanceof FromElementsFunction); - - DataStreamSource<Long> src4 = env.fromCollection(list); - assertTrue(getFunctionFromDataSource(src4) instanceof FromElementsFunction); - } - - @Test - public void testParallelismBounds() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - SourceFunction<Integer> srcFun = new SourceFunction<Integer>() { - private static final long serialVersionUID = 1L; - - @Override - public void run(SourceContext<Integer> ctx) throws Exception { - } - - @Override - public void cancel() { - } - }; - - - SingleOutputStreamOperator<Object> operator = - env.addSource(srcFun).flatMap(new FlatMapFunction<Integer, Object>() { - - private static final long serialVersionUID = 1L; - - @Override - public void flatMap(Integer value, Collector<Object> out) throws Exception { - - } - }); - - // default value for max parallelism - Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism()); - - // bounds for parallelism 1 - try { - operator.setParallelism(0); - Assert.fail(); - } catch (IllegalArgumentException expected) { - } - - // bounds for parallelism 2 - operator.setParallelism(1); - Assert.assertEquals(1, operator.getParallelism()); - - // bounds for parallelism 3 - operator.setParallelism(1 << 15); 
- Assert.assertEquals(1 << 15, operator.getParallelism()); - - // default value after generating - env.getStreamGraph().getJobGraph(); - Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism()); - - // configured value after generating - env.setMaxParallelism(42); - env.getStreamGraph().getJobGraph(); - Assert.assertEquals(42, operator.getTransformation().getMaxParallelism()); - - // bounds configured parallelism 1 - try { - env.setMaxParallelism(0); - Assert.fail(); - } catch (IllegalArgumentException expected) { - } - - // bounds configured parallelism 2 - try { - env.setMaxParallelism(1 + (1 << 15)); - Assert.fail(); - } catch (IllegalArgumentException expected) { - } - - // bounds for max parallelism 1 - try { - operator.setMaxParallelism(0); - Assert.fail(); - } catch (IllegalArgumentException expected) { - } - - // bounds for max parallelism 2 - try { - operator.setMaxParallelism(1 + (1 << 15)); - Assert.fail(); - } catch (IllegalArgumentException expected) { - } - - // bounds for max parallelism 3 - operator.setMaxParallelism(1); - Assert.assertEquals(1, operator.getTransformation().getMaxParallelism()); - - // bounds for max parallelism 4 - operator.setMaxParallelism(1 << 15); - Assert.assertEquals(1 << 15, operator.getTransformation().getMaxParallelism()); - - // override config - env.getStreamGraph().getJobGraph(); - Assert.assertEquals(1 << 15 , operator.getTransformation().getMaxParallelism()); - } - - ///////////////////////////////////////////////////////////// - // Utilities - ///////////////////////////////////////////////////////////// - - - private static StreamOperator<?> getOperatorFromDataStream(DataStream<?> dataStream) { - StreamExecutionEnvironment env = dataStream.getExecutionEnvironment(); - StreamGraph streamGraph = env.getStreamGraph(); - return streamGraph.getStreamNode(dataStream.getId()).getOperator(); - } - - @SuppressWarnings("unchecked") - private static <T> SourceFunction<T> 
getFunctionFromDataSource(DataStreamSource<T> dataStreamSource) { - dataStreamSource.addSink(new DiscardingSink<T>()); - AbstractUdfStreamOperator<?, ?> operator = - (AbstractUdfStreamOperator<?, ?>) getOperatorFromDataStream(dataStreamSource); - return (SourceFunction<T>) operator.getUserFunction(); - } - - public static class DummySplittableIterator<T> extends SplittableIterator<T> { - private static final long serialVersionUID = 1312752876092210499L; - - @SuppressWarnings("unchecked") - @Override - public Iterator<T>[] split(int numPartitions) { - return (Iterator<T>[]) new Iterator<?>[0]; - } - - @Override - public int getMaximumNumberOfSplits() { - return 0; - } - - @Override - public boolean hasNext() { - return false; - } - - @Override - public T next() { - throw new NoSuchElementException(); - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - } - - public static class ParentClass { - int num; - String string; - public ParentClass(int num, String string) { - this.num = num; - this.string = string; - } - } - - public static class SubClass extends ParentClass{ - public SubClass(int num, String string) { - super(num, string); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/b563f0ae/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java new file mode 100644 index 0000000..d29c833 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.environment; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.ContextEnvironment; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.api.functions.source.FromElementsFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.util.Collector; +import org.apache.flink.util.SplittableIterator; +import org.junit.Assert; +import org.junit.Test; + +import java.net.URL; +import 
java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +public class StreamExecutionEnvironmentTest { + + @Test + public void fromElementsWithBaseTypeTest1() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.fromElements(ParentClass.class, new SubClass(1, "Java"), new ParentClass(1, "hello")); + } + + @Test(expected = IllegalArgumentException.class) + public void fromElementsWithBaseTypeTest2() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.fromElements(SubClass.class, new SubClass(1, "Java"), new ParentClass(1, "hello")); + } + + @Test + @SuppressWarnings("unchecked") + public void testFromCollectionParallelism() { + try { + TypeInformation<Integer> typeInfo = BasicTypeInfo.INT_TYPE_INFO; + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStreamSource<Integer> dataStream1 = env.fromCollection(new DummySplittableIterator<Integer>(), typeInfo); + + try { + dataStream1.setParallelism(4); + fail("should throw an exception"); + } + catch (IllegalArgumentException e) { + // expected + } + + dataStream1.addSink(new DiscardingSink<Integer>()); + + DataStreamSource<Integer> dataStream2 = env.fromParallelCollection(new DummySplittableIterator<Integer>(), + typeInfo).setParallelism(4); + + dataStream2.addSink(new DiscardingSink<Integer>()); + + env.getExecutionPlan(); + + assertEquals("Parallelism of collection source must be 1.", 1, env.getStreamGraph().getStreamNode(dataStream1.getId()).getParallelism()); + assertEquals("Parallelism of parallel collection source must be 4.", + 4, + env.getStreamGraph().getStreamNode(dataStream2.getId()).getParallelism()); + } + catch 
(Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + @Test + public void testSources() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + SourceFunction<Integer> srcFun = new SourceFunction<Integer>() { + private static final long serialVersionUID = 1L; + + @Override + public void run(SourceContext<Integer> ctx) throws Exception { + } + + @Override + public void cancel() { + } + }; + DataStreamSource<Integer> src1 = env.addSource(srcFun); + src1.addSink(new DiscardingSink<Integer>()); + assertEquals(srcFun, getFunctionFromDataSource(src1)); + + List<Long> list = Arrays.asList(0L, 1L, 2L); + + DataStreamSource<Long> src2 = env.generateSequence(0, 2); + assertTrue(getFunctionFromDataSource(src2) instanceof StatefulSequenceSource); + + DataStreamSource<Long> src3 = env.fromElements(0L, 1L, 2L); + assertTrue(getFunctionFromDataSource(src3) instanceof FromElementsFunction); + + DataStreamSource<Long> src4 = env.fromCollection(list); + assertTrue(getFunctionFromDataSource(src4) instanceof FromElementsFunction); + } + + @Test + public void testDefaultParallelismIsDefault() { + assertEquals( + ExecutionConfig.PARALLELISM_DEFAULT, + StreamExecutionEnvironment.createLocalEnvironment().getParallelism()); + + assertEquals( + ExecutionConfig.PARALLELISM_DEFAULT, + StreamExecutionEnvironment.createRemoteEnvironment("dummy", 1234).getParallelism()); + + StreamExecutionEnvironment contextEnv = new StreamContextEnvironment( + new ContextEnvironment( + mock(ClusterClient.class), + Collections.<URL>emptyList(), + Collections.<URL>emptyList(), + this.getClass().getClassLoader(), + null)); + + assertEquals( + ExecutionConfig.PARALLELISM_DEFAULT, + contextEnv.getParallelism()); + } + + @Test + public void testParallelismBounds() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + SourceFunction<Integer> srcFun = new SourceFunction<Integer>() { + private static final long 
serialVersionUID = 1L; + + @Override + public void run(SourceContext<Integer> ctx) throws Exception { + } + + @Override + public void cancel() { + } + }; + + + SingleOutputStreamOperator<Object> operator = + env.addSource(srcFun).flatMap(new FlatMapFunction<Integer, Object>() { + + private static final long serialVersionUID = 1L; + + @Override + public void flatMap(Integer value, Collector<Object> out) throws Exception { + + } + }); + + // default value for max parallelism + Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism()); + + // bounds for parallelism 1 + try { + operator.setParallelism(0); + Assert.fail(); + } catch (IllegalArgumentException expected) { + } + + // bounds for parallelism 2 + operator.setParallelism(1); + Assert.assertEquals(1, operator.getParallelism()); + + // bounds for parallelism 3 + operator.setParallelism(1 << 15); + Assert.assertEquals(1 << 15, operator.getParallelism()); + + // default value after generating + env.getStreamGraph().getJobGraph(); + Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism()); + + // configured value after generating + env.setMaxParallelism(42); + env.getStreamGraph().getJobGraph(); + Assert.assertEquals(42, operator.getTransformation().getMaxParallelism()); + + // bounds configured parallelism 1 + try { + env.setMaxParallelism(0); + Assert.fail(); + } catch (IllegalArgumentException expected) { + } + + // bounds configured parallelism 2 + try { + env.setMaxParallelism(1 + (1 << 15)); + Assert.fail(); + } catch (IllegalArgumentException expected) { + } + + // bounds for max parallelism 1 + try { + operator.setMaxParallelism(0); + Assert.fail(); + } catch (IllegalArgumentException expected) { + } + + // bounds for max parallelism 2 + try { + operator.setMaxParallelism(1 + (1 << 15)); + Assert.fail(); + } catch (IllegalArgumentException expected) { + } + + // bounds for max parallelism 3 + operator.setMaxParallelism(1); + Assert.assertEquals(1, 
operator.getTransformation().getMaxParallelism()); + + // bounds for max parallelism 4 + operator.setMaxParallelism(1 << 15); + Assert.assertEquals(1 << 15, operator.getTransformation().getMaxParallelism()); + + // override config + env.getStreamGraph().getJobGraph(); + Assert.assertEquals(1 << 15 , operator.getTransformation().getMaxParallelism()); + } + + ///////////////////////////////////////////////////////////// + // Utilities + ///////////////////////////////////////////////////////////// + + + private static StreamOperator<?> getOperatorFromDataStream(DataStream<?> dataStream) { + StreamExecutionEnvironment env = dataStream.getExecutionEnvironment(); + StreamGraph streamGraph = env.getStreamGraph(); + return streamGraph.getStreamNode(dataStream.getId()).getOperator(); + } + + @SuppressWarnings("unchecked") + private static <T> SourceFunction<T> getFunctionFromDataSource(DataStreamSource<T> dataStreamSource) { + dataStreamSource.addSink(new DiscardingSink<T>()); + AbstractUdfStreamOperator<?, ?> operator = + (AbstractUdfStreamOperator<?, ?>) getOperatorFromDataStream(dataStreamSource); + return (SourceFunction<T>) operator.getUserFunction(); + } + + public static class DummySplittableIterator<T> extends SplittableIterator<T> { + private static final long serialVersionUID = 1312752876092210499L; + + @SuppressWarnings("unchecked") + @Override + public Iterator<T>[] split(int numPartitions) { + return (Iterator<T>[]) new Iterator<?>[0]; + } + + @Override + public int getMaximumNumberOfSplits() { + return 0; + } + + @Override + public boolean hasNext() { + return false; + } + + @Override + public T next() { + throw new NoSuchElementException(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } + + public static class ParentClass { + int num; + String string; + public ParentClass(int num, String string) { + this.num = num; + this.string = string; + } + } + + public static class SubClass extends ParentClass{ + public 
SubClass(int num, String string) { + super(num, string); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b563f0ae/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index 4d462d0..5b4dbf5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -42,7 +42,7 @@ import static org.junit.Assert.assertTrue; @SuppressWarnings("serial") public class StreamingJobGraphGeneratorTest extends TestLogger { - + @Test public void testExecutionConfigSerialization() throws IOException, ClassNotFoundException { final long seed = System.currentTimeMillis(); @@ -50,12 +50,12 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - StreamGraph streamingJob = new StreamGraph(env); - StreamingJobGraphGenerator compiler = new StreamingJobGraphGenerator(streamingJob); - + StreamGraph streamingJob = new StreamGraph(env, 1 /* default parallelism */); + StreamingJobGraphGenerator compiler = new StreamingJobGraphGenerator(streamingJob, 1 /* default parallelism */); + boolean closureCleanerEnabled = r.nextBoolean(), forceAvroEnabled = r.nextBoolean(), forceKryoEnabled = r.nextBoolean(), objectReuseEnabled = r.nextBoolean(), sysoutLoggingEnabled = r.nextBoolean(); int dop = 1 + r.nextInt(10); - + ExecutionConfig config = streamingJob.getExecutionConfig(); if(closureCleanerEnabled) { config.enableClosureCleaner(); @@ -83,7 +83,7 @@ public class 
StreamingJobGraphGeneratorTest extends TestLogger { config.disableSysoutLogging(); } config.setParallelism(dop); - + JobGraph jobGraph = compiler.createJobGraph(); final String EXEC_CONFIG_KEY = "runtime.config"; @@ -108,7 +108,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { assertEquals(sysoutLoggingEnabled, executionConfig.isSysoutLoggingEnabled()); assertEquals(dop, executionConfig.getParallelism()); } - + @Test public void testParallelismOneNotChained() { @@ -164,10 +164,10 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { @Test public void testDisabledCheckpointing() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - StreamGraph streamGraph = new StreamGraph(env); + StreamGraph streamGraph = new StreamGraph(env, 1 /* default parallelism */); assertFalse("Checkpointing enabled", streamGraph.getCheckpointConfig().isCheckpointingEnabled()); - StreamingJobGraphGenerator jobGraphGenerator = new StreamingJobGraphGenerator(streamGraph); + StreamingJobGraphGenerator jobGraphGenerator = new StreamingJobGraphGenerator(streamGraph, 1 /* default parallelism */); JobGraph jobGraph = jobGraphGenerator.createJobGraph(); JobSnapshottingSettings snapshottingSettings = jobGraph.getSnapshotSettings(); @@ -189,7 +189,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { } }) .print(); - JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph()).createJobGraph(); + JobGraph jobGraph = new StreamingJobGraphGenerator(env.getStreamGraph(), 1 /* default parallelism */).createJobGraph(); JobVertex sourceVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(0); JobVertex mapPrintVertex = jobGraph.getVerticesSortedTopologicallyFromSources().get(1); http://git-wip-us.apache.org/repos/asf/flink/blob/b563f0ae/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java 
---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java index 91ec427..af413ad 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java @@ -115,7 +115,7 @@ public class FoldApplyWindowFunctionTest { transformations.add(new OneInputTransformation<>(source, "test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1)); - StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations); + StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations, 1 /* default parallelism */); List<Integer> result = new ArrayList<>(); List<Integer> input = new ArrayList<>(); @@ -138,6 +138,10 @@ public class FoldApplyWindowFunctionTest { public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment { + public DummyStreamExecutionEnvironment() { + super(1); + } + @Override public JobExecutionResult execute(String jobName) throws Exception { return null; http://git-wip-us.apache.org/repos/asf/flink/blob/b563f0ae/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala index 22f1264..60798e0 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala +++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala @@ -673,23 +673,6 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { object StreamExecutionEnvironment { - /** - * Sets the default parallelism that will be used for the local execution - * environment created by [[createLocalEnvironment()]]. - * - * @param parallelism The default parallelism to use for local execution. - */ - @PublicEvolving - def setDefaultLocalParallelism(parallelism: Int) : Unit = - JavaEnv.setDefaultLocalParallelism(parallelism) - - /** - * Gets the default parallelism that will be used for the local execution environment created by - * [[createLocalEnvironment()]]. - */ - @PublicEvolving - def getDefaultLocalParallelism: Int = JavaEnv.getDefaultLocalParallelism - // -------------------------------------------------------------------------- // context environment // -------------------------------------------------------------------------- @@ -711,13 +694,14 @@ object StreamExecutionEnvironment { /** * Creates a local execution environment. The local execution environment will run the * program in a multi-threaded fashion in the same JVM as the environment was created in. - * - * This method sets the environment's default parallelism to given parameter, which - * defaults to the value set via [[setDefaultLocalParallelism(Int)]]. 
*/ - def createLocalEnvironment(parallelism: Int = JavaEnv.getDefaultLocalParallelism): + def createLocalEnvironment(parallelism: Int = -1): StreamExecutionEnvironment = { - new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism)) + if (parallelism == -1) { + new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment()) + } else { + new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism)) + } } /** http://git-wip-us.apache.org/repos/asf/flink/blob/b563f0ae/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala index adb59f2..b498edc 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala @@ -255,9 +255,10 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase { val sink = map.addSink(x => {}) assert(1 == env.getStreamGraph.getStreamNode(src.getId).getParallelism) - assert(10 == env.getStreamGraph.getStreamNode(map.getId).getParallelism) + // default parallelism is only actualized when transforming to JobGraph + assert(-1 == env.getStreamGraph.getStreamNode(map.getId).getParallelism) assert(1 == env.getStreamGraph.getStreamNode(windowed.getId).getParallelism) - assert(10 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism) + assert(-1 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism) try { src.setParallelism(3) @@ -272,9 +273,11 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase { // the parallelism does not change since some windowing code takes the parallelism from // input 
operations and that cannot change dynamically assert(1 == env.getStreamGraph.getStreamNode(src.getId).getParallelism) - assert(10 == env.getStreamGraph.getStreamNode(map.getId).getParallelism) + // setting a parallelism on the env/in the ExecutionConfig means that operators + // pick it up when being instantiated + assert(7 == env.getStreamGraph.getStreamNode(map.getId).getParallelism) assert(1 == env.getStreamGraph.getStreamNode(windowed.getId).getParallelism) - assert(10 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism) + assert(7 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism) val parallelSource = env.generateSequence(0, 0) parallelSource.print() http://git-wip-us.apache.org/repos/asf/flink/blob/b563f0ae/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java index 64c68dc..90d8790 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java @@ -36,6 +36,7 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment { public TestStreamEnvironment(LocalFlinkMiniCluster executor, int parallelism) { + super(parallelism); this.executor = Preconditions.checkNotNull(executor); setParallelism(parallelism); } http://git-wip-us.apache.org/repos/asf/flink/blob/b563f0ae/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java ---------------------------------------------------------------------- diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java index c56fa91..883f4b4 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java @@ -381,6 +381,10 @@ public class AccumulatorLiveITCase { */ private static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment { + public DummyStreamExecutionEnvironment() { + super(1 /* default parallelism */); + } + @Override public JobExecutionResult execute() throws Exception { return execute("default");
