becketqin commented on a change in pull request #5:
URL: https://github.com/apache/flink-ml/pull/5#discussion_r719903195



##########
File path: flink-ml-iteration/src/main/java/org/apache/flink/ml/iteration/Iterations.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.iteration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** A helper class to create iterations. */
+@PublicEvolving
+public class Iterations {
+
+    /**
+     * This method uses an iteration body to process records in unbounded data streams.
+     *
+     * <p>This method invokes the iteration body with the following parameters: 1) The 1st parameter
+     * is a list of input variable streams, which are created as the union of the initial variable
+     * streams and the corresponding feedback variable streams (returned by the iteration body). 2)
+     * The 2nd parameter is the data streams given to this method.
+     *
+     * <p>The epoch values are determined as described below. See IterationListener for how the
+     * epoch values are used. 1) All records in the initial variable streams and initial data
+     * streams have epoch=0. 2) For any record emitted by this operator into a non-feedback stream,
+     * the epoch of this emitted record = the epoch of the input record that triggers this emission.
+     * If this record is emitted by onEpochWatermarkIncremented(), then the epoch of this record =
+     * epochWatermark. 3) For any record emitted by this operator into a feedback variable stream,
+     * the epoch of the emitted record = the epoch of the input record that triggers this emission +
+     * 1. If this record is emitted by onEpochWatermarkIncremented(), then the epoch of this record
+     * = epochWatermark.
+     *
+     * <p>The iteration would not terminate if at least one of its inputs is unbounded. Otherwise it
+     * will terminate after all the inputs are terminated and no more records are iterating.
+     *
+     * <p>Required: 1) The parallelism of any stream in the initial variable streams must equal
+     * the parallelism of the stream at the same index of the feedback variable streams returned by
+     * the IterationBody.
+     *
+     * @param initVariableStreams The initial variable streams. These streams will be merged with
+     *     the feedback variable streams before being used as the 1st parameter to invoke the
+     *     iteration body.
+     * @param dataStreams The data streams. These streams will be used as the 2nd parameter to
+     *     invoke the iteration body.
+     * @param body The computation logic which takes variable/data streams and returns
+     *     variable/output streams.
+     * @return The list of output streams returned by the iteration body.
+     */
+    public static DataStreamList iterateUnboundedStreams(
+            DataStreamList initVariableStreams, DataStreamList dataStreams, IterationBody body) {
+        return null;
+    }
+
+    /**
+     * This method uses an iteration body to process records in some bounded data streams
+     * iteratively until a termination criterion is reached (e.g. the given number of rounds is
+     * completed or no further variable update is needed). Because this method does not replay
+     * records in the data streams, the iteration body needs to cache those records in order to
+     * visit those records repeatedly.
+     *
+     * <p>This method invokes the iteration body with the following parameters: 1) The 1st parameter
+     * is a list of input variable streams, which are created as the union of the initial variable
+     * streams and the corresponding feedback variable streams (returned by the iteration body). 2)
+     * The 2nd parameter is the data streams given to this method.
+     *
+     * <p>The epoch values are determined as described below. See IterationListener for how the
+     * epoch values are used. 1) All records in the initial variable streams have epoch=0. 2) All
+     * records in the data streams have epoch=0. 3) For any record emitted by this operator into a
+     * non-feedback stream, the epoch of this emitted record = the epoch of the input record that

Review comment:
       Also, will there be cases where there are multiple dataStreams and users only want to replay some of them? Should we provide finer-grained control over dataStream replay?
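To make the question concrete, here is a minimal, Flink-free sketch of per-stream replay (Java 16+ for records). Everything in it is hypothetical: `ReplaySketch`, `DataInput`, `Tagged` and `schedule` are not flink-ml APIs, plain `List`s stand in for bounded `DataStream`s, and it assumes that a replayed record delivered in round k carries epoch k. Non-replayed inputs are delivered only once, at epoch=0, so the iteration body would still have to cache them, as the current Javadoc requires for all data streams.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, Flink-free model of per-stream replay in a bounded iteration. Plain Lists
 * stand in for bounded data streams; nothing here is part of the flink-ml API.
 */
public class ReplaySketch {

    /** A record tagged with the epoch (round) in which it is delivered to the body. */
    record Tagged<T>(T value, int epoch) {}

    /** A bounded input plus a hypothetical flag saying whether to replay it every round. */
    record DataInput<T>(List<T> records, boolean replay) {}

    /**
     * Returns, for each round, the records the iteration body would receive. Every input is
     * delivered in round 0 with epoch=0; afterwards only inputs marked for replay are delivered
     * again, tagged with the current round as their epoch (an assumption of this sketch).
     */
    static <T> List<List<Tagged<T>>> schedule(List<DataInput<T>> inputs, int rounds) {
        List<List<Tagged<T>>> perRound = new ArrayList<>();
        for (int epoch = 0; epoch < rounds; epoch++) {
            List<Tagged<T>> delivered = new ArrayList<>();
            for (DataInput<T> input : inputs) {
                // Non-replayed inputs are seen only in round 0; the body must cache them itself.
                if (epoch == 0 || input.replay()) {
                    for (T item : input.records()) {
                        delivered.add(new Tagged<>(item, epoch));
                    }
                }
            }
            perRound.add(delivered);
        }
        return perRound;
    }

    public static void main(String[] args) {
        // Training data is replayed every round; a small side input is delivered only once.
        DataInput<String> training = new DataInput<>(List.of("x1", "x2"), true);
        DataInput<String> sideInput = new DataInput<>(List.of("meta"), false);
        List<List<Tagged<String>>> rounds = schedule(List.of(training, sideInput), 3);
        for (int i = 0; i < rounds.size(); i++) {
            System.out.println("round " + i + ": " + rounds.get(i));
        }
    }
}
```

In this model the replayed input shows up in every round while the side input appears only in round 0. A real API would presumably attach such a flag to each stream in `dataStreams` rather than to the whole method call.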




-- 
This is an automated message from the Apache Git Service.