Repository: flink
Updated Branches:
  refs/heads/master d47778632 -> a5476cdcd


[Flink-8407] [DataStream] Setting the parallelism after a partitioning operation should be forbidden


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a5476cdc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a5476cdc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a5476cdc

Branch: refs/heads/master
Commit: a5476cdcdfe231bc8e06e7b453eeb1cbab57ed0d
Parents: d477786
Author: Xingcan Cui <[email protected]>
Authored: Fri Jan 26 10:28:58 2018 +0800
Committer: Aljoscha Krettek <[email protected]>
Committed: Wed Jan 31 11:08:43 2018 +0100

----------------------------------------------------------------------
 .../datastream/SingleOutputStreamOperator.java  |  7 -----
 .../streaming/api/scala/DataStreamTest.scala    | 28 +++++++++++++++++++-
 2 files changed, 27 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a5476cdc/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 7d055ad..e2a6272 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -26,10 +26,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.api.transformations.SideOutputTransformation;
 import org.apache.flink.streaming.api.transformations.StreamTransformation;
-import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.Preconditions;
 
@@ -404,11 +402,6 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> {
        //  Miscellaneous
        // ------------------------------------------------------------------------
 
-       @Override
-       protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
-               return new SingleOutputStreamOperator<>(this.getExecutionEnvironment(), new PartitionTransformation<>(this.getTransformation(), partitioner));
-       }
-
        /**
         * Sets the slot sharing group of this operation. Parallel instances of
         * operations that are in the same slot sharing group will be co-located in the same

http://git-wip-us.apache.org/repos/asf/flink/blob/a5476cdc/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index 6158c8e..805eb41 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -34,10 +34,16 @@ import org.apache.flink.streaming.runtime.partitioner._
 import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.util.Collector
 import org.junit.Assert._
-import org.junit.Test
+import org.junit.{Rule, Test}
+import org.junit.rules.ExpectedException
 
 class DataStreamTest extends AbstractTestBase {
 
+  private val expectedException = ExpectedException.none()
+
+  @Rule
+  def thrownException = expectedException
+
   @Test
   def testNaming(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
@@ -299,6 +305,26 @@ class DataStreamTest extends AbstractTestBase {
     assert(4 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism)
   }
 
+
+  /**
+    * Tests setting the parallelism after a partitioning operation (e.g., broadcast, rescale)
+    * should fail.
+    */
+  @Test
+  def testParallelismFailAfterPartitioning(): Unit = {
+    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+
+    val src = env.fromElements(new Tuple2[Long, Long](0L, 0L))
+    val map = src.map(_ => (0L, 0L))
+
+    // This could be replaced with other partitioning operations (e.g., rescale, shuffle, forward),
+    // which trigger the setConnectionType() method.
+    val broadcastStream = map.broadcast
+    thrownException.expect(classOf[UnsupportedOperationException])
+    thrownException.expectMessage("cannot set the parallelism")
+    broadcastStream.setParallelism(1)
+  }
+
   /**
    * Tests whether resource gets set.
    */

Reply via email to