becketqin commented on a change in pull request #5:
URL: https://github.com/apache/flink-ml/pull/5#discussion_r719859647
########## File path: flink-ml-iteration/src/main/java/org/apache/flink/ml/iteration/Iterations.java ##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.iteration;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** A helper class to create iterations. */
+@PublicEvolving
+public class Iterations {
+
+    /**
+     * This method uses an iteration body to process records in unbounded data streams.
+     *
+     * <p>This method invokes the iteration body with the following parameters: 1) The 1st parameter
+     * is a list of input variable streams, which are created as the union of the initial variable
+     * streams and the corresponding feedback variable streams (returned by the iteration body). 2)
+     * The 2nd parameter is the data streams given to this method.
+     *
+     * <p>The epoch values are determined as described below. See IterationListener for how the
+     * epoch values are used. 1) All records in the initial variable streams and initial data
+     * streams has epoch=0. 2) For any record emitted by this operator into a non-feedback stream,
+     * the epoch of this emitted record = the epoch of the input record that triggers this emission.
+     * If this record is emitted by onEpochWatermarkIncremented(), then the epoch of this record =
+     * epochWatermark. 3) For any record emitted by this operator into a feedback variable stream,
+     * the epoch of the emitted record = the epoch of the input record that triggers this emission +
+     * 1. If this record is emitted by onEpochWatermarkIncremented(), then the epoch of this record
+     * = epochWatermark.

Review comment:
Before we jump in and explain how the epoch is determined, can we first give some context on what the epoch stands for and what it is used for? For example:
```
Each record involved in the iteration has an epoch, including the records in the variable streams, feedback streams and output streams, as well as the side outputs emitted by IterationListener. The iteration uses the record epoch to mark its progress. Different types of iterations assign the epoch slightly differently.
```
Also, I think an example would help a lot. We may link the Javadoc to the FLIP, where we have diagrams that are easier to understand.
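As a rough illustration of the kind of example the comment asks for, the following Java sketch encodes the three epoch rules from the quoted Javadoc as plain functions and follows one variable record around the feedback loop. It is a hypothetical model only: `EpochRulesSketch` and all of its methods are invented for this illustration and are not part of the Flink ML iteration API, and it ignores parallelism and how the epoch watermark itself advances.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of the epoch rules from the Iterations Javadoc under review.
 * None of these names exist in Flink ML; they only illustrate the epoch arithmetic.
 */
public class EpochRulesSketch {

    /** Rule 1: records in the initial variable and data streams have epoch 0. */
    public static int initialEpoch() {
        return 0;
    }

    /** Rule 2: a record emitted into a non-feedback stream keeps the triggering record's epoch. */
    public static int nonFeedbackEpoch(int inputEpoch) {
        return inputEpoch;
    }

    /** Rule 3: a record emitted into a feedback variable stream gets the triggering epoch + 1. */
    public static int feedbackEpoch(int inputEpoch) {
        return inputEpoch + 1;
    }

    /** Rules 2 and 3: a record emitted from onEpochWatermarkIncremented() takes the watermark. */
    public static int watermarkCallbackEpoch(int epochWatermark) {
        return epochWatermark;
    }

    /** Epochs one variable record carries after each pass through the feedback edge. */
    public static List<Integer> feedbackEpochs(int rounds) {
        List<Integer> epochs = new ArrayList<>();
        int epoch = initialEpoch();
        for (int round = 0; round < rounds; round++) {
            epoch = feedbackEpoch(epoch);
            epochs.add(epoch);
        }
        return epochs;
    }

    public static void main(String[] args) {
        List<Integer> epochs = feedbackEpochs(3);
        int lastEpoch = epochs.get(epochs.size() - 1);
        // Prints "feedback epochs: [1, 2, 3]": each feedback pass adds 1.
        System.out.println("feedback epochs: " + epochs);
        // Prints "output epoch: 3": an output record inherits its trigger's epoch.
        System.out.println("output epoch: " + nonFeedbackEpoch(lastEpoch));
        // Prints "callback epoch: 2": a record emitted in the callback takes the watermark.
        System.out.println("callback epoch: " + watermarkCallbackEpoch(2));
    }
}
```

The point of the sketch is the contrast between rule 2 (epoch unchanged on output) and rule 3 (epoch incremented on feedback), which is what lets the epoch count how many times a record has gone around the loop.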
