Repository: flink Updated Branches: refs/heads/release-1.2 908376ba9 -> 993a2e2fa
http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java index 3fb4513..3fc1344 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java @@ -17,19 +17,12 @@ package org.apache.flink.streaming.api; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.NoSuchElementException; - +import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.api.functions.source.FromElementsFunction; @@ -38,10 +31,20 @@ import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.util.Collector; import org.apache.flink.util.SplittableIterator; - +import org.junit.Assert; import org.junit.Test; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + public class StreamExecutionEnvironmentTest { @Test @@ -124,6 +127,102 @@ public class StreamExecutionEnvironmentTest { assertTrue(getFunctionFromDataSource(src4) instanceof FromElementsFunction); } + @Test + public void testParallelismBounds() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + SourceFunction<Integer> srcFun = new SourceFunction<Integer>() { + private static final long serialVersionUID = 1L; + + @Override + public void run(SourceContext<Integer> ctx) throws Exception { + } + + @Override + public void cancel() { + } + }; + + + SingleOutputStreamOperator<Object> operator = + env.addSource(srcFun).flatMap(new FlatMapFunction<Integer, Object>() { + + private static final long serialVersionUID = 1L; + + @Override + public void flatMap(Integer value, Collector<Object> out) throws Exception { + + } + }); + + // default value for max parallelism + Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism()); + + // bounds for parallelism 1 + try { + operator.setParallelism(0); + Assert.fail(); + } catch (IllegalArgumentException expected) { + } + + // bounds for parallelism 2 + operator.setParallelism(1); + Assert.assertEquals(1, operator.getParallelism()); + + // bounds for parallelism 3 + operator.setParallelism(1 << 15); + Assert.assertEquals(1 << 15, operator.getParallelism()); + + // default value after generating + env.getStreamGraph().getJobGraph(); + Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism()); + + // configured value after generating + env.setMaxParallelism(42); + env.getStreamGraph().getJobGraph(); + Assert.assertEquals(42, operator.getTransformation().getMaxParallelism()); + + // bounds configured parallelism 1 + try { + env.setMaxParallelism(0); + Assert.fail(); + } catch (IllegalArgumentException expected) { + } + + // bounds configured parallelism 2 + try { + env.setMaxParallelism(1 + (1 << 15)); + Assert.fail(); + } catch (IllegalArgumentException expected) { + } + + // bounds for max parallelism 1 + try { + operator.setMaxParallelism(0); + Assert.fail(); + } catch (IllegalArgumentException expected) { + } + + // bounds for max parallelism 2 + try { + operator.setMaxParallelism(1 + (1 << 15)); + Assert.fail(); + } catch (IllegalArgumentException expected) { + } + + // bounds for max parallelism 3 + operator.setMaxParallelism(1); + Assert.assertEquals(1, operator.getTransformation().getMaxParallelism()); + + // bounds for max parallelism 4 + operator.setMaxParallelism(1 << 15); + Assert.assertEquals(1 << 15, operator.getTransformation().getMaxParallelism()); + + // override config + env.getStreamGraph().getJobGraph(); + Assert.assertEquals(1 << 15 , operator.getTransformation().getMaxParallelism()); + } + ///////////////////////////////////////////////////////////// // Utilities ///////////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java index aa86304..5fdacd4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java @@ -22,13 +22,12 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.streaming.api.datastream.ConnectedStreams; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.api.functions.co.CoMapFunction; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; @@ -37,7 +36,6 @@ import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner; -import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner; import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner; import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; @@ -46,7 +44,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.util.EvenOddOutputSelector; import org.apache.flink.streaming.util.NoOpIntMap; - import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -368,13 +365,9 @@ public class StreamGraphGeneratorTest { StreamGraph graph = env.getStreamGraph(); - StreamNode keyedResult1Node = graph.getStreamNode(keyedResult1.getId()); - StreamNode keyedResult2Node = graph.getStreamNode(keyedResult2.getId()); StreamNode keyedResult3Node = graph.getStreamNode(keyedResult3.getId()); StreamNode keyedResult4Node = graph.getStreamNode(keyedResult4.getId()); - assertEquals(KeyGroupRangeAssignment.DEFAULT_MAX_PARALLELISM, keyedResult1Node.getMaxParallelism()); - assertEquals(KeyGroupRangeAssignment.DEFAULT_MAX_PARALLELISM, keyedResult2Node.getMaxParallelism()); assertEquals(maxParallelism, keyedResult3Node.getMaxParallelism()); assertEquals(maxParallelism, keyedResult4Node.getMaxParallelism()); } http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java index 8045d82..073632a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java @@ -30,6 +30,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; @@ -122,19 +123,29 @@ public class RescalingITCase extends TestLogger { @Test public void testSavepointRescalingInKeyedState() throws Exception { - testSavepointRescalingKeyedState(false); + testSavepointRescalingKeyedState(false, false); } @Test public void testSavepointRescalingOutKeyedState() throws Exception { - testSavepointRescalingKeyedState(true); + testSavepointRescalingKeyedState(true, false); + } + + @Test + public void testSavepointRescalingInKeyedStateDerivedMaxParallelism() throws Exception { + testSavepointRescalingKeyedState(false, true); + } + + @Test + public void testSavepointRescalingOutKeyedStateDerivedMaxParallelism() throws Exception { + testSavepointRescalingKeyedState(true, true); } /** * Tests that a a job with purely keyed state can be restarted from a savepoint * with a different parallelism. */ - public void testSavepointRescalingKeyedState(boolean scaleOut) throws Exception { + public void testSavepointRescalingKeyedState(boolean scaleOut, boolean deriveMaxParallelism) throws Exception { final int numberKeys = 42; final int numberElements = 1000; final int numberElements2 = 500; @@ -194,7 +205,9 @@ public class RescalingITCase extends TestLogger { jobID = null; - JobGraph scaledJobGraph = createJobGraphWithKeyedState(parallelism2, maxParallelism, numberKeys, numberElements2, true, 100); + int restoreMaxParallelism = deriveMaxParallelism ? ExecutionJobVertex.VALUE_NOT_SET : maxParallelism; + + JobGraph scaledJobGraph = createJobGraphWithKeyedState(parallelism2, restoreMaxParallelism, numberKeys, numberElements2, true, 100); scaledJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath)); @@ -642,7 +655,9 @@ public class RescalingITCase extends TestLogger { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(parallelism); - env.getConfig().setMaxParallelism(maxParallelism); + if (0 < maxParallelism) { + env.getConfig().setMaxParallelism(maxParallelism); + } env.enableCheckpointing(checkpointingInterval); env.setRestartStrategy(RestartStrategies.noRestart());