becketqin commented on a change in pull request #5:
URL: https://github.com/apache/flink-ml/pull/5#discussion_r719152625



##########
File path: 
flink-ml-iteration/src/main/java/org/apache/flink/ml/iteration/DataStreamList.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.iteration;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/** An utility class to maintain a list of {@link DataStream}, which might have different types. */

Review comment:
       A utility class....

##########
File path: 
flink-ml-iteration/src/main/java/org/apache/flink/ml/iteration/IterationBodyResult.java
##########
@@ -0,0 +1,74 @@
+
+package org.apache.flink.ml.iteration;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import javax.annotation.Nullable;
+
+/** The result of an iteration, specifying the feedbacks and the outputs. */
+public class IterationBodyResult {
+
+    /**
+     * A list of feedback variable streams. These streams will only be used during the iteration
+     * execution and will not be returned to the caller of the iteration body. It is assumed that
+     * the method which executes the iteration body will feed the records of the feedback variable
+     * streams back to the corresponding input variable streams.
+     */
+    private final DataStreamList feedbackVariableStreams;
+
+    /**
+     * A list of output streams. These streams will be returned to the caller of the methods that
+     * execute the iteration body.
+     */
+    private final DataStreamList outputStreams;
+
+    /**
+     * An optional termination criteria stream. If this stream is not null, it will be used together
+     * with the feedback variable streams to determine when the iteration should terminate.
+     */
+    private final @Nullable DataStream<?> terminationCriteria;
+
+    public IterationBodyResult(
+            DataStreamList feedbackVariableStreams, DataStreamList outputStreams) {
+        this(feedbackVariableStreams, outputStreams, null);
+    }
+
+    public IterationBodyResult(
+            DataStreamList feedbackVariableStreams,
+            DataStreamList outputStreams,
+            @Nullable DataStream<?> terminationCriteria) {
+        this.feedbackVariableStreams = feedbackVariableStreams;
+        this.outputStreams = outputStreams;
+        this.terminationCriteria = terminationCriteria;
+    }
+
+    public DataStreamList getFeedbackVariableStreams() {

Review comment:
       I guess this is more of a Flink code convention, so the getter method seems fine here.

##########
File path: 
flink-ml-iteration/src/main/java/org/apache/flink/ml/iteration/Iterations.java
##########
@@ -0,0 +1,167 @@
+
+package org.apache.flink.ml.iteration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** A helper class to create iterations. */
+@PublicEvolving
+public class Iterations {

Review comment:
       The Javadoc in this class is critical, as users will probably start to learn about iterations from here. Can we make the Javadoc here more structured and well formatted? For example, add a brief overview and use bullet points instead of numbered items in one long paragraph.
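A minimal sketch of the structure suggested above, assuming the three entry points in this PR (`iterateUnboundedStreams`, `iterateBoundedStreamsUntilTermination`, `iterateAndReplayBoundedStreamsUntilTermination`) stay as they are. The class name `IterationsJavadocSketch` is hypothetical, and the wording only restates rules from the PR's own Javadoc; the point is the layout: an overview paragraph followed by `<ul>` lists instead of inline "1) ... 2) ..." items.

```java
/**
 * A helper class to create iterations.
 *
 * <p>An iteration repeatedly invokes an iteration body on a list of variable streams and a list
 * of data streams. Records that the body emits into the feedback variable streams are fed back as
 * input variable streams; records emitted into the output streams are returned to the caller.
 *
 * <p>Three kinds of iterations are provided:
 *
 * <ul>
 *   <li>{@code iterateUnboundedStreams}: processes unbounded streams and does not terminate while
 *       any input is unbounded.
 *   <li>{@code iterateBoundedStreamsUntilTermination}: processes bounded streams until a
 *       termination criteria is reached; the body must cache data records it wants to revisit.
 *   <li>{@code iterateAndReplayBoundedStreamsUntilTermination}: like the previous one, but replays
 *       the data streams round by round, so the body does not need to cache them.
 * </ul>
 *
 * <p>Every record carries an epoch that marks the progress of the iteration:
 *
 * <ul>
 *   <li>Records in the initial variable streams have epoch 0.
 *   <li>A record emitted into an output stream has the epoch of the input record that triggered it.
 *   <li>A record emitted into a feedback variable stream has that epoch + 1.
 * </ul>
 */
final class IterationsJavadocSketch {
    // The real class is public and annotated with @PublicEvolving; both are omitted here so the
    // sketch compiles without Flink on the classpath. Method stubs are omitted as well.
    private IterationsJavadocSketch() {}
}
```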

##########
File path: flink-ml-iteration/src/test/resources/log4j2-test.properties
##########
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = INFO

Review comment:
       Should the default logging level for testing be OFF?

##########
File path: 
flink-ml-iteration/src/main/java/org/apache/flink/ml/iteration/Iterations.java
##########
@@ -0,0 +1,167 @@
+
+package org.apache.flink.ml.iteration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** A helper class to create iterations. */
+@PublicEvolving
+public class Iterations {
+
+    /**
+     * This method uses an iteration body to process records in unbounded data streams.
+     *
+     * <p>This method invokes the iteration body with the following parameters: 1) The 1st parameter
+     * is a list of input variable streams, which are created as the union of the initial variable
+     * streams and the corresponding feedback variable streams (returned by the iteration body). 2)
+     * The 2nd parameter is the data streams given to this method.
+     *
+     * <p>The epoch values are determined as described below. See IterationListener for how the
+     * epoch values are used. 1) All records in the initial variable streams and initial data
+     * streams have epoch=0. 2) For any record emitted by this operator into a non-feedback stream,
+     * the epoch of this emitted record = the epoch of the input record that triggers this emission.
+     * If this record is emitted by onEpochWatermarkIncremented(), then the epoch of this record =
+     * epochWatermark. 3) For any record emitted by this operator into a feedback variable stream,
+     * the epoch of the emitted record = the epoch of the input record that triggers this emission +
+     * 1. If this record is emitted by onEpochWatermarkIncremented(), then the epoch of this record
+     * = epochWatermark.
+     *
+     * <p>The iteration would not terminate if at least one of its inputs is unbounded. Otherwise it
+     * will terminate after all the inputs are terminated and no more records are iterating.
+     *
+     * <p>Required: 1) The parallelism of any stream in the initial variable streams must equal
+     * the parallelism of the stream at the same index of the feedback variable streams returned by
+     * the IterationBody.
+     *
+     * @param initVariableStreams The initial variable streams. These streams will be merged with
+     *     the feedback variable streams before being used as the 1st parameter to invoke the
+     *     iteration body.
+     * @param dataStreams The data streams. These streams will be used as the 2nd parameter to
+     *     invoke the iteration body.
+     * @param body The computation logic which takes variable/data streams and returns
+     *     variable/output streams.
+     * @return The list of output streams returned by the iteration body.
+     */
+    public static DataStreamList iterateUnboundedStreams(
+            DataStreamList initVariableStreams, DataStreamList dataStreams, IterationBody body) {
+        return null;
+    }
+
+    /**
+     * This method uses an iteration body to process records in some bounded data streams
+     * iteratively until a termination criteria is reached (e.g. the given number of rounds is
+     * completed or no further variable update is needed). Because this method does not replay
+     * records in the data streams, the iteration body needs to cache those records in order to
+     * visit those records repeatedly.
+     *
+     * <p>This method invokes the iteration body with the following parameters: 1) The 1st parameter
+     * is a list of input variable streams, which are created as the union of the initial variable
+     * streams and the corresponding feedback variable streams (returned by the iteration body). 2)
+     * The 2nd parameter is the data streams given to this method.
+     *
+     * <p>The epoch values are determined as described below. See IterationListener for how the
+     * epoch values are used. 1) All records in the initial variable streams have epoch=0. 2) All
+     * records in the data streams have epoch=0. 3) For any record emitted by this operator into a
+     * non-feedback stream, the epoch of this emitted record = the epoch of the input record that

Review comment:
       Can we just say output stream instead of non-feedback stream, to make it consistent with the terminology in `IterationBodyResult`?
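The rule being discussed fits in a tiny, Flink-free function. This is a hypothetical sketch: `EpochRules` and its `Target` enum are illustrative names (using "output" as suggested above rather than "non-feedback"), and the watermark case follows the `iterateUnboundedStreams` Javadoc, where a record emitted from `onEpochWatermarkIncremented()` gets `epochWatermark`.

```java
/** Hypothetical summary of the epoch rules in the Iterations Javadoc; not part of the PR. */
final class EpochRules {

    /** The kind of stream the iteration body emits a record into. */
    enum Target {
        /** Called "non-feedback stream" in the current Javadoc. */
        OUTPUT,
        FEEDBACK
    }

    private EpochRules() {}

    /** Epoch of a record emitted while processing an input record with {@code inputEpoch}. */
    static int epochOnRecord(Target target, int inputEpoch) {
        // Feedback records move to the next epoch; output records stay in the current one.
        return target == Target.FEEDBACK ? inputEpoch + 1 : inputEpoch;
    }

    /** Epoch of a record emitted from onEpochWatermarkIncremented(epochWatermark). */
    static int epochOnWatermark(int epochWatermark) {
        return epochWatermark;
    }
}
```

Naming the enum constant `OUTPUT` keeps the vocabulary aligned with `IterationBodyResult`, which already speaks of "output streams" and "feedback variable streams".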

##########
File path: 
flink-ml-iteration/src/main/java/org/apache/flink/ml/iteration/IterationBody.java
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.iteration;
+
+/** The builder of the subgraph that will be executed inside the iteration. */
+public interface IterationBody {

Review comment:
       Actually, should we add `@PublicEvolving` to all the user-facing APIs?

##########
File path: flink-ml-iteration/pom.xml
##########
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-ml-parent</artifactId>
+        <version>0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-ml-iteration</artifactId>

Review comment:
       I have the same question here. The only reason I can think of is that later on we may find that iteration is useful not only for ML but also for graph computing and other use cases, and therefore want to move it into flink-core. In that case, I would suggest we do not include ML in the package name to begin with, to avoid a future package path change.

##########
File path: 
flink-ml-iteration/src/main/java/org/apache/flink/ml/iteration/Iterations.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.iteration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** A helper class to create iterations. */
+@PublicEvolving
+public class Iterations {
+
+    /**
+     * This method uses an iteration body to process records in unbounded data streams.
+     *
+     * <p>This method invokes the iteration body with the following parameters: 1) The 1st parameter
+     * is a list of input variable streams, which are created as the union of the initial variable
+     * streams and the corresponding feedback variable streams (returned by the iteration body). 2)
+     * The 2nd parameter is the data streams given to this method.
+     *
+     * <p>The epoch values are determined as described below. See IterationListener for how the
+     * epoch values are used. 1) All records in the initial variable streams and initial data
+     * streams have epoch=0. 2) For any record emitted by this operator into a non-feedback stream,
+     * the epoch of this emitted record = the epoch of the input record that triggers this emission.
+     * If this record is emitted by onEpochWatermarkIncremented(), then the epoch of this record =
+     * epochWatermark. 3) For any record emitted by this operator into a feedback variable stream,
+     * the epoch of the emitted record = the epoch of the input record that triggers this emission +
+     * 1. If this record is emitted by onEpochWatermarkIncremented(), then the epoch of this record
+     * = epochWatermark.

Review comment:
       Before we jump in and explain how the epoch is determined, can we first give some context on what the epoch stands for and what it is used for? For example:
   
   ```
   Each of the records involved in the iteration has an epoch, including the records in the variable stream, feedback stream, output stream and the side output emitted by IterationListener. The iterations use the record epoch to mark the progress of the iteration. Different types of iterations assign the epoch slightly differently.
   ```
   Also I think an example would actually help a lot. We may also refer to the FLIP in the Javadoc, since it has diagrams that are easier to understand.
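As a concrete example of the kind asked for above, here is a toy, single-threaded model (not Flink code; `EpochWalkthrough` and `Rec` are hypothetical) that counts a variable down to zero. Each input record is copied to the output stream with its own epoch, and while the value is positive a decremented copy goes to the feedback stream with epoch + 1. The loop also mimics, in a simplified way, termination once a round produces no feedback records.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of epoch progression in an iteration; illustrative only, not the Flink runtime. */
final class EpochWalkthrough {

    /** A value tagged with the epoch the iteration assigned to it. */
    static final class Rec {
        final int value;
        final int epoch;

        Rec(int value, int epoch) {
            this.value = value;
            this.epoch = epoch;
        }

        @Override
        public String toString() {
            return "(value=" + value + ", epoch=" + epoch + ")";
        }
    }

    /** Runs the count-down body and returns every record emitted into the output stream. */
    static List<Rec> run(int initialValue) {
        List<Rec> outputs = new ArrayList<>();
        // Records in the initial variable stream have epoch 0.
        List<Rec> variables = new ArrayList<>();
        variables.add(new Rec(initialValue, 0));
        // Simplified termination: stop once a round emits no feedback records.
        while (!variables.isEmpty()) {
            List<Rec> feedback = new ArrayList<>();
            for (Rec in : variables) {
                // Output stream: keep the epoch of the triggering input record.
                outputs.add(new Rec(in.value, in.epoch));
                if (in.value > 0) {
                    // Feedback stream: epoch of the triggering input record + 1.
                    feedback.add(new Rec(in.value - 1, in.epoch + 1));
                }
            }
            // Feedback records become the input variable records of the next round.
            variables = feedback;
        }
        return outputs;
    }
}
```

Calling `EpochWalkthrough.run(2)` returns `[(value=2, epoch=0), (value=1, epoch=1), (value=0, epoch=2)]`: the epoch of each output record equals the number of times its value went around the feedback edge.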

##########
File path: 
flink-ml-iteration/src/main/java/org/apache/flink/ml/iteration/Iterations.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.iteration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** A helper class to create iterations. */
+@PublicEvolving
+public class Iterations {
+
+    /**
+     * This method uses an iteration body to process records in unbounded data streams.
+     *
+     * <p>This method invokes the iteration body with the following parameters: 1) The 1st parameter
+     * is a list of input variable streams, which are created as the union of the initial variable
+     * streams and the corresponding feedback variable streams (returned by the iteration body). 2)
+     * The 2nd parameter is the data streams given to this method.

Review comment:
       It seems a little weird that we describe the implementation of the method in such detail. It is also somewhat confusing to claim that an argument passed to a method (the variable streams) actually depends on the return value of the same method (the feedback streams).
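One way to read the circular-looking description is as a loop: the feedback returned by the body in one round becomes the variable input of the body in the next round. The sketch below models that with plain lists instead of `DataStream`s. `FeedbackWiring`, `ToyBody` and `Result` are hypothetical, and invoking the body once per round is a simplification, since the real `IterationBody` creates the dataflow graph once and records then flow through it.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of how feedback returned by the body is wired back as the body's variable input. */
final class FeedbackWiring {

    /** What the toy body returns: records to feed back and records to hand to the caller. */
    static final class Result {
        final List<Integer> feedback;
        final List<Integer> outputs;

        Result(List<Integer> feedback, List<Integer> outputs) {
            this.feedback = feedback;
            this.outputs = outputs;
        }
    }

    /** Hypothetical stand-in for IterationBody, working on plain lists instead of DataStreams. */
    interface ToyBody {
        Result process(List<Integer> variables, List<Integer> data);
    }

    /**
     * Invokes the body round by round. Round 0 sees the initial variables; every later round sees
     * exactly the feedback returned by the previous round. Stops when no feedback is produced.
     */
    static List<Integer> iterate(List<Integer> initVariables, List<Integer> data, ToyBody body) {
        List<Integer> allOutputs = new ArrayList<>();
        List<Integer> variables = initVariables;
        while (!variables.isEmpty()) {
            Result result = body.process(variables, data);
            // Outputs go to the caller; feedback is the next round's variable input.
            allOutputs.addAll(result.outputs);
            variables = result.feedback;
        }
        return allOutputs;
    }
}
```

For example, with initial variables `[1]`, data `[2, 3]` and a body that outputs each variable and feeds back `v + 5` (the sum of the data) while `v < 10`, `iterate` returns `[1, 6, 11]`.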

##########
File path: 
flink-ml-iteration/src/main/java/org/apache/flink/ml/iteration/Iterations.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.iteration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** A helper class to create iterations. */
+@PublicEvolving
+public class Iterations {
+
+    /**
+     * This method uses an iteration body to process records in unbounded data streams.
+     *
+     * <p>This method invokes the iteration body with the following parameters: 1) The 1st parameter
+     * is a list of input variable streams, which are created as the union of the initial variable
+     * streams and the corresponding feedback variable streams (returned by the iteration body). 2)
+     * The 2nd parameter is the data streams given to this method.
+     *
+     * <p>The epoch values are determined as described below. See IterationListener for how the
+     * epoch values are used. 1) All records in the initial variable streams and initial data
+     * streams have epoch=0. 2) For any record emitted by this operator into a non-feedback stream,
+     * the epoch of this emitted record = the epoch of the input record that triggers this emission.
+     * If this record is emitted by onEpochWatermarkIncremented(), then the epoch of this record =
+     * epochWatermark. 3) For any record emitted by this operator into a feedback variable stream,
+     * the epoch of the emitted record = the epoch of the input record that triggers this emission +
+     * 1. If this record is emitted by onEpochWatermarkIncremented(), then the epoch of this record
+     * = epochWatermark.
+     *
+     * <p>The iteration would not terminate if at least one of its inputs is unbounded. Otherwise it
+     * will terminate after all the inputs are terminated and no more records are iterating.
+     *
+     * <p>Required: 1) The parallelism of any stream in the initial variable streams must equal
+     * the parallelism of the stream at the same index of the feedback variable streams returned by
+     * the IterationBody.
+     *
+     * @param initVariableStreams The initial variable streams. These streams will be merged with
+     *     the feedback variable streams before being used as the 1st parameter to invoke the
+     *     iteration body.
+     * @param dataStreams The data streams. These streams will be used as the 2nd parameter to
+     *     invoke the iteration body.
+     * @param body The computation logic which takes variable/data streams and returns
+     *     variable/output streams.
+     * @return The list of output streams returned by the iteration body.
+     */
+    public static DataStreamList iterateUnboundedStreams(
+            DataStreamList initVariableStreams, DataStreamList dataStreams, IterationBody body) {
+        return null;
+    }
+
+    /**
+     * This method uses an iteration body to process records in some bounded data streams
+     * iteratively until a termination criteria is reached (e.g. the given number of rounds is
+     * completed or no further variable update is needed). Because this method does not replay
+     * records in the data streams, the iteration body needs to cache those records in order to
+     * visit those records repeatedly.
+     *
+     * <p>This method invokes the iteration body with the following parameters: 1) The 1st parameter
+     * is a list of input variable streams, which are created as the union of the initial variable
+     * streams and the corresponding feedback variable streams (returned by the iteration body). 2)
+     * The 2nd parameter is the data streams given to this method.
+     *
+     * <p>The epoch values are determined as described below. See IterationListener for how the
+     * epoch values are used. 1) All records in the initial variable streams have epoch=0. 2) All
+     * records in the data streams have epoch=0. 3) For any record emitted by this operator into a
+     * non-feedback stream, the epoch of this emitted record = the epoch of the input record that
+     * triggers this emission. If this record is emitted by onEpochWatermarkIncremented(), then the
+     * epoch of this record = epochWatermark. 4) For any record emitted by this operator into a
+     * feedback variable stream, the epoch of the emitted record = the epoch of the input record
+     * that triggers this emission + 1.
+     *
+     * <p>Suppose there is a coordinator operator which takes all feedback variable streams (emitted
+     * by the iteration body) and the termination criteria stream (if not null) as inputs. The
+     * execution of the graph created by the iteration body will terminate when all input streams
+     * have been fully consumed AND any of the following conditions is met: 1) The termination
+     * criteria stream is not null, and the coordinator operator has not observed any new value from
+     * the termination criteria stream between two consecutive onEpochWatermarkIncremented
+     * invocations. 2) The coordinator operator has not observed any new value from any feedback
+     * variable stream between two consecutive onEpochWatermarkIncremented invocations.
+     *
+     * <p>Required: 1) All the init variable streams and the data streams must be bounded. 2) The
+     * parallelism of any stream in the initial variable streams must equal the parallelism of the
+     * stream at the same index of the feedback variable streams returned by the IterationBody.
+     *
+     * @param initVariableStreams The initial variable streams. These streams will be merged with
+     *     the feedback variable streams before being used as the 1st parameter to invoke the
+     *     iteration body.
+     * @param dataStreams The data streams. These streams will be used as the 2nd parameter to
+     *     invoke the iteration body.
+     * @param body The computation logic which takes variable/data streams and returns
+     *     variable/output streams.
+     * @return The list of output streams returned by the iteration body.
+     */
+    public static DataStreamList iterateBoundedStreamsUntilTermination(
+            DataStreamList initVariableStreams, DataStreamList dataStreams, IterationBody body) {
+        return null;
+    }
+
+    /**
+     * This method can use an iteration body to process records in some bounded data streams
+     * iteratively until a termination criteria is reached (e.g. the given number of rounds is
+     * completed or no further variable update is needed). Because this method replays records in
+     * the data streams, the iteration body does not need to cache those records to visit those
+     * records repeatedly.
+     *
+     * <p>This method invokes the iteration body with the following parameters: 1) The 1st parameter
+     * is a list of input variable streams, which are created as the union of the initial variable
+     * streams and the corresponding feedback variable streams (returned by the iteration body). 2)
+     * The 2nd parameter is a list of replayed data streams, which are created by replaying the
+     * initial data streams round by round until the iteration terminates. The records in the Nth
+     * round will be emitted into the iteration body only if the low watermark of the first operator
+     * in the iteration body >= N - 1.
+     *
+     * <p>The epoch values are determined as described below. See IterationListener for how the
+     * epoch values are used. 1) All records in the initial variable streams have epoch=0. 2) The
+     * records from the initial data streams will be replayed round by round into the iteration
+     * body. The records in the first round have epoch=0. And records in the Nth round have epoch =
+     * N. 3) For any record emitted by this operator into a non-feedback stream, the epoch of this
+     * emitted record = the epoch of the input record that triggers this emission. If this record is
+     * emitted by onEpochWatermarkIncremented(), then the epoch of this record = epochWatermark. 4)
+     * For any record emitted by this operator into a feedback stream, the epoch of the emitted
+     * record = the epoch of the input record that triggers this emission + 1.
+     *
+     * <p>Suppose there is a coordinator operator which takes all feedback variable streams (emitted
+     * by the iteration body) and the termination criteria stream (if not null) as inputs. The
+     * execution of the graph created by the iteration body will terminate when all input streams
+     * have been fully consumed AND any of the following conditions is met: 1) The termination
+     * criteria stream is not null, and the coordinator operator has not observed any new value from
+     * the termination criteria stream between two consecutive onEpochWatermarkIncremented
+     * invocations. 2) The coordinator operator has not observed any new value from any feedback
+     * variable stream between two consecutive onEpochWatermarkIncremented invocations.
+     *
+     * <p>Required: 1) All the init variable streams and the data streams must be bounded. 2) The
+     * parallelism of any stream in the initial variable streams must equal the parallelism of the
+     * stream at the same index of the feedback variable streams returned by the IterationBody.
+     *
+     * @param initVariableStreams The initial variable streams. These streams will be merged with
+     *     the feedback variable streams before being used as the 1st parameter to invoke the
+     *     iteration body.
+     * @param initDataStreams The initial data streams. Records from these streams will be
+     *     repeatedly replayed and used as the 2nd parameter to invoke the iteration body.
+     * @param body The computation logic which takes variable/data streams and returns
+     *     variable/output streams.
+     * @return The list of output streams returned by the iteration body.
+     */
+    static DataStreamList iterateAndReplayBoundedStreamsUntilTermination(

Review comment:
       Why is this method package-private? Also, it seems to me that whether or not to replay the dataStreams is more of a configuration option than a reason for a separate method.
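A minimal sketch of the configuration approach, assuming a hypothetical `IterationConfig` created through a builder; none of these names exist in the PR. With such an object, a single method (for instance a hypothetical overload `iterateBoundedStreamsUntilTermination(initVariableStreams, dataStreams, config, body)`) could cover both the caching and the replaying variants.

```java
/** Hypothetical configuration object for the suggestion above; not part of the PR. */
final class IterationConfig {

    /** Whether the data streams are replayed into the iteration body in every round. */
    private final boolean replayDataStreams;

    private IterationConfig(boolean replayDataStreams) {
        this.replayDataStreams = replayDataStreams;
    }

    static Builder newBuilder() {
        return new Builder();
    }

    boolean isReplayDataStreams() {
        return replayDataStreams;
    }

    static final class Builder {
        // Default: no replay, i.e. the iteration body caches the records it needs to revisit.
        private boolean replayDataStreams = false;

        Builder setReplayDataStreams(boolean replayDataStreams) {
            this.replayDataStreams = replayDataStreams;
            return this;
        }

        IterationConfig build() {
            return new IterationConfig(replayDataStreams);
        }
    }
}
```

Defaulting to no replay keeps the behavior of `iterateBoundedStreamsUntilTermination`, and a builder leaves room for further options without multiplying method names.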

##########
File path: 
flink-ml-iteration/src/main/java/org/apache/flink/ml/iteration/DataStreamList.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.iteration;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/** An utility class to maintain a list of {@link DataStream}, which might have different types. */
+public class DataStreamList {
+    public static DataStreamList of(DataStream<?>... streams) {
+        return new DataStreamList(Arrays.asList(streams));

Review comment:
       +1
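For readers new to the class, here is a Flink-free approximation of what `DataStreamList.of(...)` provides: an ordered, fixed list whose elements may have different types. `DataStream<?>` is replaced by `Object` so the sketch runs on its own; the class name `MixedTypeList` and its `size()` and typed `get(int)` accessors are assumptions, not the PR's API.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Simplified, Flink-free stand-in for DataStreamList: a fixed list of elements of mixed types. */
final class MixedTypeList {
    private final List<Object> elements;

    private MixedTypeList(List<Object> elements) {
        // Unmodifiable view, so callers cannot add or remove elements after construction.
        this.elements = Collections.unmodifiableList(elements);
    }

    static MixedTypeList of(Object... elements) {
        return new MixedTypeList(Arrays.asList(elements));
    }

    int size() {
        return elements.size();
    }

    /** Returns the element at the index, cast to whatever type the caller expects (unchecked). */
    @SuppressWarnings("unchecked")
    <T> T get(int index) {
        return (T) elements.get(index);
    }
}
```

`String s = list.get(0);` lets the caller choose the element type at the use site; a wrong choice only fails with a `ClassCastException` at runtime, which is the trade-off of keeping differently typed elements in one list.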

##########
File path: 
flink-ml-iteration/src/main/java/org/apache/flink/ml/iteration/IterationBody.java
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.iteration;
+
+/** The builder of the subgraph that will be executed inside the iteration. */
+public interface IterationBody {
+
+    /**
+     * This method creates the graph for the iteration body. See {@link Iterations} for how the

Review comment:
       In general I agree with Dong. While we need an overview of the entire iteration abstraction somewhere, the Javadoc explaining the semantics of a particular class and method is still necessary.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

