Repository: flink Updated Branches: refs/heads/master 42cc3a2a9 -> 345de772a
[FLINK-7567] [scala] Remove keepPartitioning parameter from DataStream.iterate() Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/345de772 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/345de772 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/345de772 Branch: refs/heads/master Commit: 345de772a9b6802beff2c9fa6e35da0884c9afd4 Parents: 42cc3a2 Author: Mikhail Lipkovich <mikhail_lipkov...@epam.com> Authored: Thu Sep 7 17:05:22 2017 +0300 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Fri Sep 22 11:16:30 2017 +0200 ---------------------------------------------------------------------- .../api/transformations/FeedbackTransformation.java | 3 ++- flink-streaming-scala/pom.xml | 4 ++++ .../apache/flink/streaming/api/scala/DataStream.scala | 11 +++++------ 3 files changed, 11 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/345de772/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java index 03a4e52..2e4f8a9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java @@ -87,7 +87,8 @@ public class FeedbackTransformation<T> extends StreamTransformation<T> { throw new UnsupportedOperationException( "Parallelism of the feedback stream must match the parallelism of the original" + " stream. Parallelism of original stream: " + this.getParallelism() + - "; parallelism of feedback stream: " + transform.getParallelism()); + "; parallelism of feedback stream: " + transform.getParallelism() + + ". Parallelism can be modified using DataStream#setParallelism() method"); } feedbackEdges.add(transform); http://git-wip-us.apache.org/repos/asf/flink/blob/345de772/flink-streaming-scala/pom.xml ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/pom.xml b/flink-streaming-scala/pom.xml index df58d6d..316731a 100644 --- a/flink-streaming-scala/pom.xml +++ b/flink-streaming-scala/pom.xml @@ -228,6 +228,10 @@ under the License. <excludes combine.children="append"> <!-- Exclude generated classes from api compatibility checks --> <exclude>*\$\$anon\$*</exclude> + + <!-- Ignore method which was created automatically by Scala for default value calculation. + Can be removed once https://github.com/siom79/japicmp/issues/176 will be fixed --> + <exclude>org.apache.flink.streaming.api.scala.DataStream#iterate\$default\$3()</exclude> </excludes> </parameter> </configuration> http://git-wip-us.apache.org/repos/asf/flink/blob/345de772/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index 3875f66..63a2d5d 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -499,23 +499,22 @@ class DataStream[T](stream: JavaStream[T]) { * stepfunction: initialStream => (feedback, output) * * A common pattern is to use output splitting to create feedback and output DataStream. - * Please refer to the .split(...) method of the DataStream + * Please refer to the [[split]] method of the DataStream * * By default a DataStream with iteration will never terminate, but the user * can use the maxWaitTime parameter to set a max waiting time for the iteration head. * If no data received in the set time the stream terminates. * - * By default the feedback partitioning is set to match the input, to override this set - * the keepPartitioning flag to true - * + * Parallelism of the feedback stream must match the parallelism of the original stream. + * Please refer to the [[setParallelism]] method for parallelism modification */ @PublicEvolving def iterate[R](stepFunction: DataStream[T] => (DataStream[T], DataStream[R]), - maxWaitTimeMillis:Long = 0, - keepPartitioning: Boolean = false) : DataStream[R] = { + maxWaitTimeMillis:Long = 0) : DataStream[R] = { val iterativeStream = stream.iterate(maxWaitTimeMillis) val (feedback, output) = stepFunction(new DataStream[T](iterativeStream)) + iterativeStream.closeWith(feedback.javaStream) output }