Repository: flink
Updated Branches:
  refs/heads/master 42cc3a2a9 -> 345de772a


[FLINK-7567] [scala] Remove keepPartitioning parameter from DataStream.iterate()


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/345de772
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/345de772
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/345de772

Branch: refs/heads/master
Commit: 345de772a9b6802beff2c9fa6e35da0884c9afd4
Parents: 42cc3a2
Author: Mikhail Lipkovich <mikhail_lipkov...@epam.com>
Authored: Thu Sep 7 17:05:22 2017 +0300
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Fri Sep 22 11:16:30 2017 +0200

----------------------------------------------------------------------
 .../api/transformations/FeedbackTransformation.java      |  3 ++-
 flink-streaming-scala/pom.xml                            |  4 ++++
 .../apache/flink/streaming/api/scala/DataStream.scala    | 11 +++++------
 3 files changed, 11 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/345de772/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java
index 03a4e52..2e4f8a9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java
@@ -87,7 +87,8 @@ public class FeedbackTransformation<T> extends 
StreamTransformation<T> {
                        throw new UnsupportedOperationException(
                                        "Parallelism of the feedback stream 
must match the parallelism of the original" +
                                                        " stream. Parallelism 
of original stream: " + this.getParallelism() +
-                                                       "; parallelism of 
feedback stream: " + transform.getParallelism());
+                                                       "; parallelism of 
feedback stream: " + transform.getParallelism() +
+                                                       ". Parallelism can be 
modified using DataStream#setParallelism() method");
                }
 
                feedbackEdges.add(transform);

http://git-wip-us.apache.org/repos/asf/flink/blob/345de772/flink-streaming-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/pom.xml b/flink-streaming-scala/pom.xml
index df58d6d..316731a 100644
--- a/flink-streaming-scala/pom.xml
+++ b/flink-streaming-scala/pom.xml
@@ -228,6 +228,10 @@ under the License.
                                                <excludes 
combine.children="append">
                                                        <!-- Exclude generated 
classes from api compatibility checks -->
                                                        
<exclude>*\$\$anon\$*</exclude>
+
+                                                       <!-- Ignore method 
which was created automatically by Scala for default value calculation.
+                                                       Can be removed once 
https://github.com/siom79/japicmp/issues/176 will be fixed -->
+                                                       
<exclude>org.apache.flink.streaming.api.scala.DataStream#iterate\$default\$3()</exclude>
                                                </excludes>
                                        </parameter>
                                </configuration>

http://git-wip-us.apache.org/repos/asf/flink/blob/345de772/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 3875f66..63a2d5d 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -499,23 +499,22 @@ class DataStream[T](stream: JavaStream[T]) {
    * stepfunction: initialStream => (feedback, output)
    *
    * A common pattern is to use output splitting to create feedback and output 
DataStream.
-   * Please refer to the .split(...) method of the DataStream
+   * Please refer to the [[split]] method of the DataStream
    *
    * By default a DataStream with iteration will never terminate, but the user
    * can use the maxWaitTime parameter to set a max waiting time for the 
iteration head.
    * If no data received in the set time the stream terminates.
    *
-   * By default the feedback partitioning is set to match the input, to 
override this set
-   * the keepPartitioning flag to true
-   *
+   * Parallelism of the feedback stream must match the parallelism of the 
original stream.
+   * Please refer to the [[setParallelism]] method for parallelism modification
    */
   @PublicEvolving
   def iterate[R](stepFunction: DataStream[T] => (DataStream[T], DataStream[R]),
-                    maxWaitTimeMillis:Long = 0,
-                    keepPartitioning: Boolean = false) : DataStream[R] = {
+                    maxWaitTimeMillis:Long = 0) : DataStream[R] = {
     val iterativeStream = stream.iterate(maxWaitTimeMillis)
 
     val (feedback, output) = stepFunction(new DataStream[T](iterativeStream))
+
     iterativeStream.closeWith(feedback.javaStream)
     output
   }

Reply via email to