lindong28 commented on code in PR #230: URL: https://github.com/apache/flink-ml/pull/230#discussion_r1163453277
########## flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/sort/CoGroupOperator.java: ########## @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.common.datastream.sort; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.CoGroupFunction; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.runtime.RuntimePairComparatorFactory; +import org.apache.flink.configuration.AlgorithmOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.core.memory.ManagedMemoryUseCase; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.sort.ExternalSorter; +import 
org.apache.flink.runtime.operators.sort.NonReusingSortMergeCoGroupIterator; +import org.apache.flink.runtime.operators.sort.PushSorter; +import org.apache.flink.runtime.operators.sort.ReusingSortMergeCoGroupIterator; +import org.apache.flink.runtime.operators.util.CoGroupTaskIterator; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedMultiInput; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.TimestampedCollector; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.MutableObjectIterator; +import org.apache.flink.util.TraversableOnceException; + +import java.io.Serializable; +import java.util.Iterator; + +/** + * An operator that implements the co-group logic. + * + * @param <IN1> The class type of the first input. + * @param <IN2> The class type of the second input. + * @param <KEY> The class type of the key. + * @param <OUT> The class type of the output values. + */ +public class CoGroupOperator<IN1, IN2, KEY extends Serializable, OUT> + extends AbstractUdfStreamOperator<OUT, CoGroupFunction<IN1, IN2, OUT>> + implements TwoInputStreamOperator<IN1, IN2, OUT>, BoundedMultiInput { + + private PushSorter<Tuple2<byte[], StreamRecord<IN1>>> sorterA; Review Comment: The main reason is to reduce the amount of work in this PR that will be discarded in the future. In order to change the type from `StreamRecord<IN1>` to `IN1`, we would need to update the classes `BytesKeyNormalizationUtil`, `FixedLengthByteKeyComparator`, `KeyAndValueSerializer` and `VariableLengthByteKeyComparator` so that they don't rely on `StreamRecord`. 
The above four classes are copied and pasted from apache/flink, and we will likely remove them from Flink ML in the future after we optimize the cogroup API in apache/flink. I am not sure the amount of work described above will be useful later. Given that the overhead of DataStream is roughly 12.3% more than that of DataSet, and the performance difference will likely be negligible for non-trivial algorithms such as ALS, I find it reasonable to just use the StreamRecord here as the short-term approach and spend time on the long-term approach (i.e. optimizing apache/flink). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
