Repository: flink
Updated Branches:
  refs/heads/master d47778632 -> a5476cdcd


[Flink-8407] [DataStream] Setting the parallelism after a partitioning operation should be forbidden


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a5476cdc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a5476cdc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a5476cdc

Branch: refs/heads/master
Commit: a5476cdcdfe231bc8e06e7b453eeb1cbab57ed0d
Parents: d477786
Author: Xingcan Cui <[email protected]>
Authored: Fri Jan 26 10:28:58 2018 +0800
Committer: Aljoscha Krettek <[email protected]>
Committed: Wed Jan 31 11:08:43 2018 +0100

----------------------------------------------------------------------
 .../datastream/SingleOutputStreamOperator.java  |  7 -----
 .../streaming/api/scala/DataStreamTest.scala    | 28 +++++++++++++++++++-
 2 files changed, 27 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a5476cdc/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 7d055ad..e2a6272 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -26,10 +26,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.api.transformations.SideOutputTransformation;
 import org.apache.flink.streaming.api.transformations.StreamTransformation;
-import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.Preconditions;
 
@@ -404,11 +402,6 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> {
 	// Miscellaneous
 	// ------------------------------------------------------------------------
 
-	@Override
-	protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
-		return new SingleOutputStreamOperator<>(this.getExecutionEnvironment(), new PartitionTransformation<>(this.getTransformation(), partitioner));
-	}
-
 	/**
 	 * Sets the slot sharing group of this operation. Parallel instances of
 	 * operations that are in the same slot sharing group will be co-located in the same

http://git-wip-us.apache.org/repos/asf/flink/blob/a5476cdc/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index 6158c8e..805eb41 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -34,10 +34,16 @@ import org.apache.flink.streaming.runtime.partitioner._
 import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.util.Collector
 import org.junit.Assert._
-import org.junit.Test
+import org.junit.{Rule, Test}
+import org.junit.rules.ExpectedException
 
 class DataStreamTest extends AbstractTestBase {
 
+  private val expectedException = ExpectedException.none()
+
+  @Rule
+  def thrownException = expectedException
+
   @Test
   def testNaming(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
@@ -299,6 +305,26 @@ class DataStreamTest extends AbstractTestBase {
     assert(4 == env.getStreamGraph.getStreamNode(sink.getTransformation.getId).getParallelism)
   }
 
+
+  /**
+   * Tests setting the parallelism after a partitioning operation (e.g., broadcast, rescale)
+   * should fail.
+   */
+  @Test
+  def testParallelismFailAfterPartitioning(): Unit = {
+    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+
+    val src = env.fromElements(new Tuple2[Long, Long](0L, 0L))
+    val map = src.map(_ => (0L, 0L))
+
+    // This could be replaced with other partitioning operations (e.g., rescale, shuffle, forward),
+    // which trigger the setConnectionType() method.
+    val broadcastStream = map.broadcast
+    thrownException.expect(classOf[UnsupportedOperationException])
+    thrownException.expectMessage("cannot set the parallelism")
+    broadcastStream.setParallelism(1)
+  }
+
   /**
    * Tests whether resource gets set.
    */
