zhipeng93 commented on code in PR #192:
URL: https://github.com/apache/flink-ml/pull/192#discussion_r1105746948


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/recommendation/swing/Swing.java:
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.recommendation.swing;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.iteration.operator.OperatorStateUtils;
+import org.apache.flink.ml.api.AlgoOperator;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * An AlgoOperator which implements the Swing algorithm.
+ *
+ * <p>Swing is an item recall algorithm. The topology of user-item graph 
usually can be described as
+ * user-item-user or item-user-item, which are like 'swing'. For example, if 
both user <em>u</em> and user <em>v</em>
+ * have purchased the same commodity <em>i</em> , they will form a 
relationship diagram similar to a swing. If

Review Comment:
   nit: there is a redundant space before the comma in `<em>i</em> , they will...`; it should read `<em>i</em>, they will...`.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/recommendation/swing/Swing.java:
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.recommendation.swing;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.iteration.operator.OperatorStateUtils;
+import org.apache.flink.ml.api.AlgoOperator;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * An AlgoOperator which implements the Swing algorithm.
+ *
+ * <p>Swing is an item recall algorithm. The topology of user-item graph 
usually can be described as
+ * user-item-user or item-user-item, which are like 'swing'. For example, if 
both user <em>u</em> and user <em>v</em>
+ * have purchased the same commodity <em>i</em> , they will form a 
relationship diagram similar to a swing. If
+ * <em>u</em> and <em>v</em> have purchased commodity <em>j</em> in addition 
to <em>i</em>, it is supposed <em>i</em>
+ * and <em>j</em> are similar. The similarity between items in Swing is 
defined as
+ *
+ * <p>$$ w_{(i,j)}=\sum_{u\in U_i\cap U_j}\sum_{v\in U_i\cap
+ * 
U_j}{\frac{1}{{(I_u+\alpha_1)}^\beta}}*{\frac{1}{{(I_v+\alpha_1)}^\beta}}*{\frac{1}{\alpha_2+|I_u\cap
 I_v|}} $$
+ *
+ * <p>This implementation is based on the algorithm proposed in the paper: 
"Large Scale Product

Review Comment:
   Let's simplify the description here, e.g.:
   
   `See "<a href="https://arxiv.org/pdf/2010.05525.pdf">Large Scale Product Graph Construction for Recommendation in E-commerce</a>" by Xiaoyong Yang, Yadong Zhu and Yi Zhang.`



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/recommendation/swing/Swing.java:
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.recommendation.swing;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.iteration.operator.OperatorStateUtils;
+import org.apache.flink.ml.api.AlgoOperator;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * An AlgoOperator which implements the Swing algorithm.
+ *
+ * <p>Swing is an item recall algorithm. The topology of user-item graph 
usually can be described as
+ * user-item-user or item-user-item, which are like 'swing'. For example, if 
both user <em>u</em> and user <em>v</em>
+ * have purchased the same commodity <em>i</em> , they will form a 
relationship diagram similar to a swing. If
+ * <em>u</em> and <em>v</em> have purchased commodity <em>j</em> in addition 
to <em>i</em>, it is supposed <em>i</em>
+ * and <em>j</em> are similar. The similarity between items in Swing is 
defined as
+ *
+ * <p>$$ w_{(i,j)}=\sum_{u\in U_i\cap U_j}\sum_{v\in U_i\cap
+ * 
U_j}{\frac{1}{{(I_u+\alpha_1)}^\beta}}*{\frac{1}{{(I_v+\alpha_1)}^\beta}}*{\frac{1}{\alpha_2+|I_u\cap
 I_v|}} $$
+ *
+ * <p>This implementation is based on the algorithm proposed in the paper: 
"Large Scale Product
+ * Graph Construction for Recommendation in E-commerce" by Xiaoyong Yang, 
Yadong Zhu and Yi Zhang. (<a
+ * 
href="https://arxiv.org/pdf/2010.05525.pdf";>https://arxiv.org/pdf/2010.05525.pdf</a>)
+ */
+public class Swing implements AlgoOperator<Swing>, SwingParams<Swing> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public Swing() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        final String userCol = getUserCol();
+        final String itemCol = getItemCol();
+        final ResolvedSchema schema = inputs[0].getResolvedSchema();
+
+        if (!(Types.LONG.equals(TableUtils.getTypeInfoByName(schema, userCol))
+                && Types.LONG.equals(TableUtils.getTypeInfoByName(schema, 
itemCol)))) {
+            throw new IllegalArgumentException("The types of user and item 
columns must be Long.");
+        }
+
+        if (getMaxUserBehavior() < getMinUserBehavior()) {
+            throw new IllegalArgumentException(
+                    String.format("The maxUserBehavior must be larger or equal 
to minUserBehavior. "
+                                    + "The current value: maxUserBehavior=%d, 
minUserBehavior=%d.",
+                            getMaxUserBehavior(),
+                            getMinUserBehavior())
+            );
+        }
+
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) 
inputs[0]).getTableEnvironment();
+
+        SingleOutputStreamOperator<Tuple2<Long, Long>> purchasingBehavior =
+                tEnv.toDataStream(inputs[0])
+                        .map(
+                                row -> {
+                                    Long userId = (Long) 
row.getFieldAs(userCol);
+                                    Long itemId = (Long) 
row.getFieldAs(itemCol);
+                                    if (userId == null || itemId == null) {
+                                        throw new RuntimeException(
+                                                "Data of user and item column 
must not be null.");

Review Comment:
   nit: How about changing the error message to `User or item column must not be null.`?



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/recommendation/swing/Swing.java:
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.recommendation.swing;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.iteration.operator.OperatorStateUtils;
+import org.apache.flink.ml.api.AlgoOperator;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * An AlgoOperator which implements the Swing algorithm.
+ *
+ * <p>Swing is an item recall algorithm. The topology of user-item graph 
usually can be described as
+ * user-item-user or item-user-item, which are like 'swing'. For example, if 
both user <em>u</em> and user <em>v</em>
+ * have purchased the same commodity <em>i</em> , they will form a 
relationship diagram similar to a swing. If
+ * <em>u</em> and <em>v</em> have purchased commodity <em>j</em> in addition 
to <em>i</em>, it is supposed <em>i</em>
+ * and <em>j</em> are similar. The similarity between items in Swing is 
defined as
+ *
+ * <p>$$ w_{(i,j)}=\sum_{u\in U_i\cap U_j}\sum_{v\in U_i\cap
+ * 
U_j}{\frac{1}{{(I_u+\alpha_1)}^\beta}}*{\frac{1}{{(I_v+\alpha_1)}^\beta}}*{\frac{1}{\alpha_2+|I_u\cap
 I_v|}} $$
+ *
+ * <p>This implementation is based on the algorithm proposed in the paper: 
"Large Scale Product
+ * Graph Construction for Recommendation in E-commerce" by Xiaoyong Yang, 
Yadong Zhu and Yi Zhang. (<a
+ * 
href="https://arxiv.org/pdf/2010.05525.pdf";>https://arxiv.org/pdf/2010.05525.pdf</a>)
+ */
+public class Swing implements AlgoOperator<Swing>, SwingParams<Swing> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public Swing() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        final String userCol = getUserCol();
+        final String itemCol = getItemCol();
+        final ResolvedSchema schema = inputs[0].getResolvedSchema();
+
+        if (!(Types.LONG.equals(TableUtils.getTypeInfoByName(schema, userCol))
+                && Types.LONG.equals(TableUtils.getTypeInfoByName(schema, 
itemCol)))) {
+            throw new IllegalArgumentException("The types of user and item 
columns must be Long.");
+        }
+
+        if (getMaxUserBehavior() < getMinUserBehavior()) {
+            throw new IllegalArgumentException(
+                    String.format("The maxUserBehavior must be larger or equal 
to minUserBehavior. "
+                                    + "The current value: maxUserBehavior=%d, 
minUserBehavior=%d.",
+                            getMaxUserBehavior(),
+                            getMinUserBehavior())
+            );
+        }
+
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) 
inputs[0]).getTableEnvironment();
+
+        SingleOutputStreamOperator<Tuple2<Long, Long>> purchasingBehavior =
+                tEnv.toDataStream(inputs[0])
+                        .map(
+                                row -> {
+                                    Long userId = (Long) 
row.getFieldAs(userCol);
+                                    Long itemId = (Long) 
row.getFieldAs(itemCol);
+                                    if (userId == null || itemId == null) {
+                                        throw new RuntimeException(
+                                                "Data of user and item column 
must not be null.");
+                                    }
+                                    return Tuple2.of(userId, itemId);
+                                })
+                        .returns(Types.TUPLE(Types.LONG, Types.LONG));
+
+        SingleOutputStreamOperator<Tuple3<Long, Long, Map<Long, String>>> 
userBehavior =
+                purchasingBehavior
+                        .keyBy(tuple -> tuple.f0)
+                        .transform(
+                                "collectingUserBehavior",
+                                Types.TUPLE(
+                                        Types.LONG,
+                                        Types.LONG,
+                                        Types.MAP(Types.LONG, Types.STRING)),
+                                new CollectingUserBehavior(
+                                        getMinUserBehavior(), 
getMaxUserBehavior()));
+
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        new TypeInformation[] {Types.LONG, Types.STRING},
+                        new String[] {getItemCol(), getOutputCol()});
+
+        DataStream<Row> output =
+                userBehavior
+                        .keyBy(tuple -> tuple.f1)
+                        .transform(
+                                "computingSimilarItems",
+                                outputTypeInfo,
+                                new ComputingSimilarItems(
+                                        getK(),
+                                        getMaxUserNumPerItem(),
+                                        getAlpha1(),
+                                        getAlpha2(),
+                                        getBeta()));
+
+        return new Table[] {tEnv.fromDataStream(output)};
+    }
+
+    @Override
+    public Map<Param<?>, Object> getParamMap() {
+        return paramMap;
+    }
+
+    @Override
+    public void save(String path) throws IOException {
+        ReadWriteUtils.saveMetadata(this, path);
+    }
+
+    public static Swing load(StreamTableEnvironment tEnv, String path) throws 
IOException {
+        return ReadWriteUtils.loadStageParam(path);
+    }
+
+    /**
+     * Collects user behavior data and appends to the input table.
+     *
+     * <p>During the process, this operator collects users and all items 
he/she has purchased, and
+     * its input table must be bounded. Because Flink doesn't support type 
info of `Set` officially, The appended column
+     * is `Map` contains items as key and maps null value.
+     */
+    private static class CollectingUserBehavior
+            extends AbstractStreamOperator<Tuple3<Long, Long, Map<Long, 
String>>>
+            implements OneInputStreamOperator<
+            Tuple2<Long, Long>, Tuple3<Long, Long, Map<Long, String>>>,
+            BoundedOneInput {
+        private final int minUserItemInteraction;
+        private final int maxUserItemInteraction;
+
+        // Maps a user id to a set of items. Because ListState cannot keep 
values of type `Set`,
+        // we use `Map<Long, String>` with null values instead. So does 
`userItemsMap` and
+        // `itemUsersMap` in `ComputingSimilarItems`.
+        private Map<Long, Map<Long, String>> userAndPurchasedItems = new 
HashMap<>();
+
+        private ListState<Map<Long, Map<Long, String>>> 
userAndPurchasedItemsState;
+
+        private CollectingUserBehavior(int minUserItemInteraction, int 
maxUserItemInteraction) {
+            this.minUserItemInteraction = minUserItemInteraction;
+            this.maxUserItemInteraction = maxUserItemInteraction;
+        }
+
+        @Override
+        public void endInput() {
+
+            userAndPurchasedItems.forEach(
+                    (user, items) -> {
+                        if (items.size() >= minUserItemInteraction
+                                && items.size() <= maxUserItemInteraction) {
+                            items.forEach(
+                                    (item, nullValue) ->
+                                            output.collect(
+                                                    new StreamRecord<>(
+                                                            new Tuple3<>(user, 
item, items))));
+                        }
+                    });
+
+            userAndPurchasedItemsState.clear();
+        }
+
+        @Override
+        public void processElement(StreamRecord<Tuple2<Long, Long>> element) {
+            Tuple2<Long, Long> userAndItem = element.getValue();
+            long user = userAndItem.f0;
+            long item = userAndItem.f1;
+            Map<Long, String> items = userAndPurchasedItems.get(user);
+
+            if (items == null) {
+                items = new LinkedHashMap<>();
+                userAndPurchasedItems.putIfAbsent(user, items);

Review Comment:
   Is there any case where `user` already exists in the map at this point? If not, we should probably use `put()` instead of `putIfAbsent()`.



##########
flink-ml-lib/src/test/java/org/apache/flink/ml/recommendation/SwingTest.java:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.recommendation;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.recommendation.swing.Swing;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests {@link Swing}.
+ */
+public class SwingTest {
+    @Rule
+    public final TemporaryFolder tempFolder = new TemporaryFolder();
+    private StreamExecutionEnvironment env;
+    private StreamTableEnvironment tEnv;
+    private Table trainData;
+
+    @Before
+    public void before() {
+        env = TestUtils.getExecutionEnvironment();
+        tEnv = StreamTableEnvironment.create(env);
+        List<Row> trainRows =
+                new ArrayList<>(
+                        Arrays.asList(
+                                Row.of(0L, 10L),
+                                Row.of(0L, 11L),
+                                Row.of(0L, 12L),
+                                Row.of(1L, 13L),
+                                Row.of(1L, 12L),
+                                Row.of(2L, 10L),
+                                Row.of(2L, 11L),
+                                Row.of(2L, 12L),
+                                Row.of(3L, 13L),
+                                Row.of(3L, 12L),
+                                Row.of(4L, 12L),
+                                Row.of(4L, 10L),
+                                Row.of(4L, 11L),
+                                Row.of(4L, 12L),
+                                Row.of(4L, 13L)));
+        trainData =
+                tEnv.fromDataStream(
+                        env.fromCollection(
+                                trainRows,
+                                new RowTypeInfo(
+                                        new TypeInformation[] {
+                                                BasicTypeInfo.LONG_TYPE_INFO,
+                                                BasicTypeInfo.LONG_TYPE_INFO
+                                        },
+                                        new String[] {"user_id", "item_id"})));
+    }
+
+    private void compareResultAndExpected(List<Row> results) {
+        List<Row> expectedScoreRows =
+                new ArrayList<>(
+                        Arrays.asList(
+                                Row.of(10L, 
"11,0.058845768947156235;12,0.058845768947156235"),
+                                Row.of(11L, 
"10,0.058845768947156235;12,0.058845768947156235"),
+                                Row.of(
+                                        12L,
+                                        
"13,0.09134833828228624;10,0.058845768947156235;11,0.058845768947156235"),
+                                Row.of(13L, "12,0.09134833828228624")));
+
+        results.sort((o1, o2) -> Long.compare(o1.getFieldAs(0), 
o2.getFieldAs(0)));

Review Comment:
   nit: This could be simplified to `results.sort(Comparator.comparingLong(o -> o.getFieldAs(0)));`



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/recommendation/swing/Swing.java:
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.recommendation.swing;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.iteration.operator.OperatorStateUtils;
+import org.apache.flink.ml.api.AlgoOperator;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * An AlgoOperator which implements the Swing algorithm.
+ *
+ * <p>Swing is an item recall algorithm. The topology of user-item graph 
usually can be described as
+ * user-item-user or item-user-item, which are like 'swing'. For example, if 
both user <em>u</em> and user <em>v</em>
+ * have purchased the same commodity <em>i</em> , they will form a 
relationship diagram similar to a swing. If
+ * <em>u</em> and <em>v</em> have purchased commodity <em>j</em> in addition 
to <em>i</em>, it is supposed <em>i</em>
+ * and <em>j</em> are similar. The similarity between items in Swing is 
defined as
+ *
+ * <p>$$ w_{(i,j)}=\sum_{u\in U_i\cap U_j}\sum_{v\in U_i\cap
+ * 
U_j}{\frac{1}{{(I_u+\alpha_1)}^\beta}}*{\frac{1}{{(I_v+\alpha_1)}^\beta}}*{\frac{1}{\alpha_2+|I_u\cap
 I_v|}} $$
+ *
+ * <p>This implementation is based on the algorithm proposed in the paper: 
"Large Scale Product
+ * Graph Construction for Recommendation in E-commerce" by Xiaoyong Yang, 
Yadong Zhu and Yi Zhang. (<a
+ * 
href="https://arxiv.org/pdf/2010.05525.pdf";>https://arxiv.org/pdf/2010.05525.pdf</a>)
+ */
+public class Swing implements AlgoOperator<Swing>, SwingParams<Swing> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public Swing() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        final String userCol = getUserCol();
+        final String itemCol = getItemCol();
+        final ResolvedSchema schema = inputs[0].getResolvedSchema();
+
+        if (!(Types.LONG.equals(TableUtils.getTypeInfoByName(schema, userCol))
+                && Types.LONG.equals(TableUtils.getTypeInfoByName(schema, 
itemCol)))) {
+            throw new IllegalArgumentException("The types of user and item 
columns must be Long.");
+        }
+
+        if (getMaxUserBehavior() < getMinUserBehavior()) {
+            throw new IllegalArgumentException(
+                    String.format("The maxUserBehavior must be larger or equal 
to minUserBehavior. "
+                                    + "The current value: maxUserBehavior=%d, 
minUserBehavior=%d.",
+                            getMaxUserBehavior(),
+                            getMinUserBehavior())
+            );
+        }
+
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) 
inputs[0]).getTableEnvironment();
+
+        SingleOutputStreamOperator<Tuple2<Long, Long>> purchasingBehavior =
+                tEnv.toDataStream(inputs[0])
+                        .map(
+                                row -> {
+                                    Long userId = (Long) 
row.getFieldAs(userCol);
+                                    Long itemId = (Long) 
row.getFieldAs(itemCol);
+                                    if (userId == null || itemId == null) {
+                                        throw new RuntimeException(
+                                                "Data of user and item column 
must not be null.");
+                                    }
+                                    return Tuple2.of(userId, itemId);
+                                })
+                        .returns(Types.TUPLE(Types.LONG, Types.LONG));
+
+        SingleOutputStreamOperator<Tuple3<Long, Long, Map<Long, String>>> 
userBehavior =
+                purchasingBehavior
+                        .keyBy(tuple -> tuple.f0)
+                        .transform(
+                                "collectingUserBehavior",
+                                Types.TUPLE(
+                                        Types.LONG,
+                                        Types.LONG,
+                                        Types.MAP(Types.LONG, Types.STRING)),
+                                new CollectingUserBehavior(
+                                        getMinUserBehavior(), 
getMaxUserBehavior()));
+
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        new TypeInformation[] {Types.LONG, Types.STRING},
+                        new String[] {getItemCol(), getOutputCol()});
+
+        DataStream<Row> output =
+                userBehavior
+                        .keyBy(tuple -> tuple.f1)
+                        .transform(
+                                "computingSimilarItems",
+                                outputTypeInfo,
+                                new ComputingSimilarItems(
+                                        getK(),
+                                        getMaxUserNumPerItem(),
+                                        getAlpha1(),
+                                        getAlpha2(),
+                                        getBeta()));
+
+        return new Table[] {tEnv.fromDataStream(output)};
+    }
+
+    @Override
+    public Map<Param<?>, Object> getParamMap() {
+        return paramMap;
+    }
+
+    @Override
+    public void save(String path) throws IOException {
+        ReadWriteUtils.saveMetadata(this, path);
+    }
+
+    public static Swing load(StreamTableEnvironment tEnv, String path) throws 
IOException {
+        return ReadWriteUtils.loadStageParam(path);
+    }
+
+    /**
+     * Collects user behavior data and appends to the input table.
+     *
+     * <p>During the process, this operator collects users and all items 
he/she has purchased, and
+     * its input table must be bounded. Because Flink doesn't support type 
info of `Set` officially, The appended column
+     * is `Map` contains items as key and maps null value.
+     */
+    private static class CollectingUserBehavior
+            extends AbstractStreamOperator<Tuple3<Long, Long, Map<Long, 
String>>>
+            implements OneInputStreamOperator<
+            Tuple2<Long, Long>, Tuple3<Long, Long, Map<Long, String>>>,
+            BoundedOneInput {
+        private final int minUserItemInteraction;
+        private final int maxUserItemInteraction;
+
+        // Maps a user id to a set of items. Because ListState cannot keep 
values of type `Set`,
+        // we use `Map<Long, String>` with null values instead. So does 
`userItemsMap` and
+        // `itemUsersMap` in `ComputingSimilarItems`.
+        private Map<Long, Map<Long, String>> userAndPurchasedItems = new 
HashMap<>();
+
+        private ListState<Map<Long, Map<Long, String>>> 
userAndPurchasedItemsState;
+
+        private CollectingUserBehavior(int minUserItemInteraction, int 
maxUserItemInteraction) {
+            this.minUserItemInteraction = minUserItemInteraction;
+            this.maxUserItemInteraction = maxUserItemInteraction;
+        }
+
+        @Override
+        public void endInput() {
+
+            userAndPurchasedItems.forEach(
+                    (user, items) -> {
+                        if (items.size() >= minUserItemInteraction
+                                && items.size() <= maxUserItemInteraction) {
+                            items.forEach(
+                                    (item, nullValue) ->
+                                            output.collect(
+                                                    new StreamRecord<>(
+                                                            new Tuple3<>(user, 
item, items))));
+                        }
+                    });
+
+            userAndPurchasedItemsState.clear();
+        }
+
+        @Override
+        public void processElement(StreamRecord<Tuple2<Long, Long>> element) {
+            Tuple2<Long, Long> userAndItem = element.getValue();
+            long user = userAndItem.f0;
+            long item = userAndItem.f1;
+            Map<Long, String> items = userAndPurchasedItems.get(user);
+
+            if (items == null) {
+                items = new LinkedHashMap<>();
+                userAndPurchasedItems.putIfAbsent(user, items);
+            }
+
+            if (items.size() <= maxUserItemInteraction) {
+                items.put(item, null);
+            }
+        }
+
+        @Override
+        public void initializeState(StateInitializationContext context) throws 
Exception {
+            super.initializeState(context);
+            userAndPurchasedItemsState =
+                    context.getOperatorStateStore()
+                            .getListState(
+                                    new ListStateDescriptor<>(
+                                            "userAndPurchasedItemsState",
+                                            Types.MAP(
+                                                    Types.LONG,
+                                                    Types.MAP(Types.LONG, 
Types.STRING))));
+
+            OperatorStateUtils.getUniqueElement(userAndPurchasedItemsState, 
"userAndPurchasedItemsState")
+                    .ifPresent(
+                            stat -> {
+                                userAndPurchasedItems = stat;
+                            });
+        }
+
+        @Override
+        public void snapshotState(StateSnapshotContext context) throws 
Exception {
+            super.snapshotState(context);
+            
userAndPurchasedItemsState.update(Collections.singletonList(userAndPurchasedItems));
+        }
+    }
+
+    /**
+     * Calculates similarity between items and keep top k similar items of 
each target item.
+     */
+    private static class ComputingSimilarItems extends 
AbstractStreamOperator<Row>
+            implements OneInputStreamOperator<Tuple3<Long, Long, Map<Long, 
String>>, Row>,
+            BoundedOneInput {
+
+        private Map<Long, Map<Long, String>> userAndPurchasedItems = new 
HashMap<>();
+        private Map<Long, Map<Long, String>> itemAndPurchasers = new 
HashMap<>();
+        private ListState<Map<Long, Map<Long, String>>> 
userAndPurchasedItemsState;
+        private ListState<Map<Long, Map<Long, String>>> itemAndPurchasersState;
+
+        private final int k;
+        private final int maxUserNumPerItem;
+
+        /**
+         * Alpha1 and alpha2 are integers larger or equal to zero. Though they 
by definition should be greater than

Review Comment:
   Should we explain this in the Javadoc of the `Swing` class?
   
   Readers may be interested in how `Swing` behaves when `alpha2` is zero, and they should not have to read the implementation here to find out.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/recommendation/swing/SwingParams.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.recommendation.swing;
+
+import org.apache.flink.ml.common.param.HasOutputCol;
+import org.apache.flink.ml.param.DoubleParam;
+import org.apache.flink.ml.param.IntParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+import org.apache.flink.ml.param.StringParam;
+import org.apache.flink.ml.param.WithParams;
+
+/**
+ * Params for {@link Swing}.
+ *
+ * @param <T> The class type of this instance.
+ */
+public interface SwingParams<T> extends WithParams<T>, HasOutputCol<T> {
+    Param<String> USER_COL =
+            new StringParam(
+                    "userCol",
+                    "User column name. The type of user column must be Long.",
+                    "user",
+                    ParamValidators.notNull());
+
+    Param<String> ITEM_COL =
+            new StringParam(
+                    "itemCol",
+                    "Item column name. The type of item column must be Long.",
+                    "item",
+                    ParamValidators.notNull());
+
+    Param<Integer> MAX_USER_NUM_PER_ITEM =
+            new IntParam(
+                    "maxUserNumPerItem",
+                    "The max number of user(purchasers) for each item. If the 
number of user "
+                            + "is larger than this value, then only 
maxUserNumPerItem users will "
+                            + "be sampled and considered in the computation of 
similarity between two items.",
+                    1000,
+                    ParamValidators.gt(0));
+
+    Param<Integer> K =
+            new IntParam(
+                    "k",
+                    "The max number of similar items to output for each item. 
If an item has "

Review Comment:
   `If an item has...` seems to merely restate `The max number of similar items to output for each item.`
   Does it provide any additional information?



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/recommendation/swing/SwingParams.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.recommendation.swing;
+
+import org.apache.flink.ml.common.param.HasOutputCol;
+import org.apache.flink.ml.param.DoubleParam;
+import org.apache.flink.ml.param.IntParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+import org.apache.flink.ml.param.StringParam;
+import org.apache.flink.ml.param.WithParams;
+
+/**
+ * Params for {@link Swing}.
+ *
+ * @param <T> The class type of this instance.
+ */
+public interface SwingParams<T> extends WithParams<T>, HasOutputCol<T> {
+    Param<String> USER_COL =
+            new StringParam(
+                    "userCol",
+                    "User column name. The type of user column must be Long.",
+                    "user",
+                    ParamValidators.notNull());
+
+    Param<String> ITEM_COL =
+            new StringParam(
+                    "itemCol",
+                    "Item column name. The type of item column must be Long.",
+                    "item",
+                    ParamValidators.notNull());
+
+    Param<Integer> MAX_USER_NUM_PER_ITEM =
+            new IntParam(
+                    "maxUserNumPerItem",
+                    "The max number of user(purchasers) for each item. If the 
number of user "
+                            + "is larger than this value, then only 
maxUserNumPerItem users will "
+                            + "be sampled and considered in the computation of 
similarity between two items.",
+                    1000,
+                    ParamValidators.gt(0));
+
+    Param<Integer> K =
+            new IntParam(
+                    "k",
+                    "The max number of similar items to output for each item. 
If an item has "
+                            + "more than k recommendations, the first k 
similar items will be kept",
+                    100,
+                    ParamValidators.gt(0));
+
+    Param<Integer> MIN_USER_BEHAVIOR =
+            new IntParam(
+                    "minUserBehavior",
+                    "The min number of items for a user purchases. If the 
items purchased by a user is smaller than "

Review Comment:
   It is not a `fit` process. Can you replace the word `fit` here?
   
   `This can affect the speed of the computation. The best value depends on the nature of the problem.` These sentences do not add any precise information. It is hard to tell how this parameter would affect performance and accuracy, so they cannot guide users. Moreover, it is unclear what the `nature` of the problem refers to.
   
   So I would suggest removing them. I am also fine with keeping them if you can make them more accurate.
   
   Same for MAX_USER_BEHAVIOR.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/recommendation/swing/Swing.java:
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.recommendation.swing;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.iteration.operator.OperatorStateUtils;
+import org.apache.flink.ml.api.AlgoOperator;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * An AlgoOperator which implements the Swing algorithm.
+ *
+ * <p>Swing is an item recall algorithm. The topology of user-item graph 
usually can be described as
+ * user-item-user or item-user-item, which are like 'swing'. For example, if 
both user <em>u</em> and user <em>v</em>
+ * have purchased the same commodity <em>i</em> , they will form a 
relationship diagram similar to a swing. If
+ * <em>u</em> and <em>v</em> have purchased commodity <em>j</em> in addition 
to <em>i</em>, it is supposed <em>i</em>
+ * and <em>j</em> are similar. The similarity between items in Swing is 
defined as
+ *
+ * <p>$$ w_{(i,j)}=\sum_{u\in U_i\cap U_j}\sum_{v\in U_i\cap
+ * 
U_j}{\frac{1}{{(I_u+\alpha_1)}^\beta}}*{\frac{1}{{(I_v+\alpha_1)}^\beta}}*{\frac{1}{\alpha_2+|I_u\cap
 I_v|}} $$
+ *
+ * <p>This implementation is based on the algorithm proposed in the paper: 
"Large Scale Product
+ * Graph Construction for Recommendation in E-commerce" by Xiaoyong Yang, 
Yadong Zhu and Yi Zhang. (<a
+ * 
href="https://arxiv.org/pdf/2010.05525.pdf";>https://arxiv.org/pdf/2010.05525.pdf</a>)
+ */
+public class Swing implements AlgoOperator<Swing>, SwingParams<Swing> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public Swing() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        final String userCol = getUserCol();
+        final String itemCol = getItemCol();
+        final ResolvedSchema schema = inputs[0].getResolvedSchema();
+
+        if (!(Types.LONG.equals(TableUtils.getTypeInfoByName(schema, userCol))
+                && Types.LONG.equals(TableUtils.getTypeInfoByName(schema, 
itemCol)))) {
+            throw new IllegalArgumentException("The types of user and item 
columns must be Long.");
+        }
+
+        if (getMaxUserBehavior() < getMinUserBehavior()) {
+            throw new IllegalArgumentException(
+                    String.format("The maxUserBehavior must be larger or equal 
to minUserBehavior. "
+                                    + "The current value: maxUserBehavior=%d, 
minUserBehavior=%d.",
+                            getMaxUserBehavior(),
+                            getMinUserBehavior())
+            );
+        }
+
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) 
inputs[0]).getTableEnvironment();
+
+        SingleOutputStreamOperator<Tuple2<Long, Long>> purchasingBehavior =
+                tEnv.toDataStream(inputs[0])
+                        .map(
+                                row -> {
+                                    Long userId = (Long) 
row.getFieldAs(userCol);
+                                    Long itemId = (Long) 
row.getFieldAs(itemCol);
+                                    if (userId == null || itemId == null) {
+                                        throw new RuntimeException(
+                                                "Data of user and item column 
must not be null.");
+                                    }
+                                    return Tuple2.of(userId, itemId);
+                                })
+                        .returns(Types.TUPLE(Types.LONG, Types.LONG));
+
+        SingleOutputStreamOperator<Tuple3<Long, Long, Map<Long, String>>> 
userBehavior =
+                purchasingBehavior
+                        .keyBy(tuple -> tuple.f0)
+                        .transform(
+                                "collectingUserBehavior",
+                                Types.TUPLE(
+                                        Types.LONG,
+                                        Types.LONG,
+                                        Types.MAP(Types.LONG, Types.STRING)),
+                                new CollectingUserBehavior(
+                                        getMinUserBehavior(), 
getMaxUserBehavior()));
+
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        new TypeInformation[] {Types.LONG, Types.STRING},
+                        new String[] {getItemCol(), getOutputCol()});
+
+        DataStream<Row> output =
+                userBehavior
+                        .keyBy(tuple -> tuple.f1)
+                        .transform(
+                                "computingSimilarItems",
+                                outputTypeInfo,
+                                new ComputingSimilarItems(
+                                        getK(),
+                                        getMaxUserNumPerItem(),
+                                        getAlpha1(),
+                                        getAlpha2(),
+                                        getBeta()));
+
+        return new Table[] {tEnv.fromDataStream(output)};
+    }
+
+    @Override
+    public Map<Param<?>, Object> getParamMap() {
+        return paramMap;
+    }
+
+    @Override
+    public void save(String path) throws IOException {
+        ReadWriteUtils.saveMetadata(this, path);
+    }
+
+    public static Swing load(StreamTableEnvironment tEnv, String path) throws 
IOException {
+        return ReadWriteUtils.loadStageParam(path);
+    }
+
+    /**
+     * Collects user behavior data and appends to the input table.
+     *
+     * <p>During the process, this operator collects users and all items 
he/she has purchased, and
+     * its input table must be bounded. Because Flink doesn't support type 
info of `Set` officially, The appended column
+     * is `Map` contains items as key and maps null value.
+     */
+    private static class CollectingUserBehavior
+            extends AbstractStreamOperator<Tuple3<Long, Long, Map<Long, 
String>>>
+            implements OneInputStreamOperator<
+            Tuple2<Long, Long>, Tuple3<Long, Long, Map<Long, String>>>,
+            BoundedOneInput {
+        private final int minUserItemInteraction;
+        private final int maxUserItemInteraction;
+
+        // Maps a user id to a set of items. Because ListState cannot keep 
values of type `Set`,
+        // we use `Map<Long, String>` with null values instead. So does 
`userItemsMap` and
+        // `itemUsersMap` in `ComputingSimilarItems`.

Review Comment:
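   nit: The references to `userItemsMap` and `itemUsersMap` need to be updated. They are inconsistent with the existing field names in `ComputingSimilarItems` (`userAndPurchasedItems` and `itemAndPurchasers`).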



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/recommendation/swing/SwingParams.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.recommendation.swing;
+
+import org.apache.flink.ml.common.param.HasOutputCol;
+import org.apache.flink.ml.param.DoubleParam;
+import org.apache.flink.ml.param.IntParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+import org.apache.flink.ml.param.StringParam;
+import org.apache.flink.ml.param.WithParams;
+
+/**
+ * Params for {@link Swing}.
+ *
+ * @param <T> The class type of this instance.
+ */
+public interface SwingParams<T> extends WithParams<T>, HasOutputCol<T> {
+    Param<String> USER_COL =
+            new StringParam(
+                    "userCol",
+                    "User column name. The type of user column must be Long.",

Review Comment:
   We usually do not explain the type of a column when describing the param. 
Let's follow the existing convention.
   
   Same for ITEM_COL.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to