Repository: flink
Updated Branches:
  refs/heads/release-1.2 908376ba9 -> 993a2e2fa


http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
index 3fb4513..3fc1344 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
@@ -17,19 +17,12 @@
 
 package org.apache.flink.streaming.api;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
@@ -38,10 +31,20 @@ import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.util.Collector;
 import org.apache.flink.util.SplittableIterator;
-
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 public class StreamExecutionEnvironmentTest {
 
        @Test
@@ -124,6 +127,102 @@ public class StreamExecutionEnvironmentTest {
                assertTrue(getFunctionFromDataSource(src4) instanceof FromElementsFunction);
        }
 
+       @Test
+       public void testParallelismBounds() {
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+               SourceFunction<Integer> srcFun = new SourceFunction<Integer>() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public void run(SourceContext<Integer> ctx) throws Exception {
+                       }
+
+                       @Override
+                       public void cancel() {
+                       }
+               };
+
+
+               SingleOutputStreamOperator<Object> operator =
+                               env.addSource(srcFun).flatMap(new FlatMapFunction<Integer, Object>() {
+
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public void flatMap(Integer value, Collector<Object> out) throws Exception {
+
+                       }
+               });
+
+               // default value for max parallelism
+               Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism());
+
+               // bounds for parallelism 1
+               try {
+                       operator.setParallelism(0);
+                       Assert.fail();
+               } catch (IllegalArgumentException expected) {
+               }
+
+               // bounds for parallelism 2
+               operator.setParallelism(1);
+               Assert.assertEquals(1, operator.getParallelism());
+
+               // bounds for parallelism 3
+               operator.setParallelism(1 << 15);
+               Assert.assertEquals(1 << 15, operator.getParallelism());
+
+               // default value after generating
+               env.getStreamGraph().getJobGraph();
+               Assert.assertEquals(-1, operator.getTransformation().getMaxParallelism());
+
+               // configured value after generating
+               env.setMaxParallelism(42);
+               env.getStreamGraph().getJobGraph();
+               Assert.assertEquals(42, operator.getTransformation().getMaxParallelism());
+
+               // bounds configured parallelism 1
+               try {
+                       env.setMaxParallelism(0);
+                       Assert.fail();
+               } catch (IllegalArgumentException expected) {
+               }
+
+               // bounds configured parallelism 2
+               try {
+                       env.setMaxParallelism(1 + (1 << 15));
+                       Assert.fail();
+               } catch (IllegalArgumentException expected) {
+               }
+
+               // bounds for max parallelism 1
+               try {
+                       operator.setMaxParallelism(0);
+                       Assert.fail();
+               } catch (IllegalArgumentException expected) {
+               }
+
+               // bounds for max parallelism 2
+               try {
+                       operator.setMaxParallelism(1 + (1 << 15));
+                       Assert.fail();
+               } catch (IllegalArgumentException expected) {
+               }
+
+               // bounds for max parallelism 3
+               operator.setMaxParallelism(1);
+               Assert.assertEquals(1, operator.getTransformation().getMaxParallelism());
+
+               // bounds for max parallelism 4
+               operator.setMaxParallelism(1 << 15);
+               Assert.assertEquals(1 << 15, operator.getTransformation().getMaxParallelism());
+
+               // override config
+               env.getStreamGraph().getJobGraph();
+               Assert.assertEquals(1 << 15, operator.getTransformation().getMaxParallelism());
+       }
+
        /////////////////////////////////////////////////////////////
        // Utilities
        /////////////////////////////////////////////////////////////
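For readers following along, the snippet below is a minimal, self-contained sketch (not part of this commit; the class name and the concrete values are made up for illustration) of how the bounds verified by testParallelismBounds() above surface in the user-facing DataStream API: parallelism and max parallelism must stay within (0, 1 << 15], and an operator-level max parallelism takes precedence over the environment-wide setting once the job graph is generated.

// Illustrative sketch only; not taken from the commit above.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismBoundsExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Environment-wide max parallelism: values <= 0 or > 1 << 15 are rejected
        // with an IllegalArgumentException, as the new test asserts.
        env.setMaxParallelism(42);

        SingleOutputStreamOperator<Integer> mapped = env
                .fromElements(1, 2, 3)
                .map(new MapFunction<Integer, Integer>() {
                    @Override
                    public Integer map(Integer value) {
                        return value;
                    }
                });

        // Per-operator parallelism obeys the same (0, 1 << 15] bounds.
        mapped.setParallelism(4);

        // A per-operator max parallelism overrides the environment-wide value
        // when the StreamGraph/JobGraph is generated.
        mapped.setMaxParallelism(1 << 15);

        mapped.print();
        env.execute("parallelism bounds sketch");
    }
}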

http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
index aa86304..5fdacd4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
@@ -22,13 +22,12 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.streaming.api.datastream.ConnectedStreams;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
@@ -37,7 +36,6 @@ import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
@@ -46,7 +44,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.util.EvenOddOutputSelector;
 import org.apache.flink.streaming.util.NoOpIntMap;
-
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -368,13 +365,9 @@ public class StreamGraphGeneratorTest {
 
                StreamGraph graph = env.getStreamGraph();
 
-               StreamNode keyedResult1Node = graph.getStreamNode(keyedResult1.getId());
-               StreamNode keyedResult2Node = graph.getStreamNode(keyedResult2.getId());
                StreamNode keyedResult3Node = graph.getStreamNode(keyedResult3.getId());
                StreamNode keyedResult4Node = graph.getStreamNode(keyedResult4.getId());
 
-               assertEquals(KeyGroupRangeAssignment.DEFAULT_MAX_PARALLELISM, keyedResult1Node.getMaxParallelism());
-               assertEquals(KeyGroupRangeAssignment.DEFAULT_MAX_PARALLELISM, keyedResult2Node.getMaxParallelism());
                assertEquals(maxParallelism, keyedResult3Node.getMaxParallelism());
                assertEquals(maxParallelism, keyedResult4Node.getMaxParallelism());
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/993a2e2f/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 8045d82..073632a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -122,19 +123,29 @@ public class RescalingITCase extends TestLogger {
 
        @Test
        public void testSavepointRescalingInKeyedState() throws Exception {
-               testSavepointRescalingKeyedState(false);
+               testSavepointRescalingKeyedState(false, false);
        }
 
        @Test
        public void testSavepointRescalingOutKeyedState() throws Exception {
-               testSavepointRescalingKeyedState(true);
+               testSavepointRescalingKeyedState(true, false);
+       }
+
+       @Test
+       public void testSavepointRescalingInKeyedStateDerivedMaxParallelism() throws Exception {
+               testSavepointRescalingKeyedState(false, true);
+       }
+
+       @Test
+       public void testSavepointRescalingOutKeyedStateDerivedMaxParallelism() throws Exception {
+               testSavepointRescalingKeyedState(true, true);
        }
 
        /**
         * Tests that a job with purely keyed state can be restarted from a savepoint
         * with a different parallelism.
         */
-       public void testSavepointRescalingKeyedState(boolean scaleOut) throws Exception {
+       public void testSavepointRescalingKeyedState(boolean scaleOut, boolean deriveMaxParallelism) throws Exception {
                final int numberKeys = 42;
                final int numberElements = 1000;
                final int numberElements2 = 500;
@@ -194,7 +205,9 @@ public class RescalingITCase extends TestLogger {
 
                        jobID = null;
 
-                       JobGraph scaledJobGraph = createJobGraphWithKeyedState(parallelism2, maxParallelism, numberKeys, numberElements2, true, 100);
+                       int restoreMaxParallelism = deriveMaxParallelism ? ExecutionJobVertex.VALUE_NOT_SET : maxParallelism;
+
+                       JobGraph scaledJobGraph = createJobGraphWithKeyedState(parallelism2, restoreMaxParallelism, numberKeys, numberElements2, true, 100);
 
                        scaledJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
@@ -642,7 +655,9 @@ public class RescalingITCase extends TestLogger {
 
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(parallelism);
-               env.getConfig().setMaxParallelism(maxParallelism);
+               if (0 < maxParallelism) {
+                       env.getConfig().setMaxParallelism(maxParallelism);
+               }
                env.enableCheckpointing(checkpointingInterval);
                env.setRestartStrategy(RestartStrategies.noRestart());
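As a rough illustration of what the new deriveMaxParallelism test variants exercise, here is a hedged sketch (not part of this commit; the helper name buildRescaledEnvironment and its arguments are invented for the example) of the guarded configuration pattern used above: an explicit max parallelism is applied only when a positive value is requested, while ExecutionJobVertex.VALUE_NOT_SET leaves it unset so the restored job has to derive one instead of reusing a preset value.

// Illustrative sketch only; not taken from the commit above.
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DerivedMaxParallelismSketch {

    static StreamExecutionEnvironment buildRescaledEnvironment(int parallelism, int maxParallelism) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parallelism);
        if (maxParallelism > 0) {
            // Pin the max parallelism only when a concrete value was requested.
            env.getConfig().setMaxParallelism(maxParallelism);
        }
        return env;
    }

    public static void main(String[] args) {
        // Explicit value: the rescaled job keeps the configured max parallelism.
        buildRescaledEnvironment(2, 13);

        // VALUE_NOT_SET: nothing is configured, so the max parallelism is left
        // to be derived at restore time.
        buildRescaledEnvironment(2, ExecutionJobVertex.VALUE_NOT_SET);
    }
}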
 
