zhipeng93 commented on code in PR #192: URL: https://github.com/apache/flink-ml/pull/192#discussion_r1105746948
########## flink-ml-lib/src/main/java/org/apache/flink/ml/recommendation/swing/Swing.java: ########## @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.recommendation.swing; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.iteration.operator.OperatorStateUtils; +import org.apache.flink.ml.api.AlgoOperator; +import org.apache.flink.ml.common.datastream.TableUtils; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.util.ParamUtils; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import 
org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.internal.TableImpl; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * An AlgoOperator which implements the Swing algorithm. + * + * <p>Swing is an item recall algorithm. The topology of user-item graph usually can be described as + * user-item-user or item-user-item, which are like 'swing'. For example, if both user <em>u</em> and user <em>v</em> + * have purchased the same commodity <em>i</em> , they will form a relationship diagram similar to a swing. If Review Comment: nit: there is a redundant space before `, they will...` ########## flink-ml-lib/src/main/java/org/apache/flink/ml/recommendation/swing/Swing.java: ########## @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.recommendation.swing; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.iteration.operator.OperatorStateUtils; +import org.apache.flink.ml.api.AlgoOperator; +import org.apache.flink.ml.common.datastream.TableUtils; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.util.ParamUtils; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.internal.TableImpl; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.types.Row; +import 
org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * An AlgoOperator which implements the Swing algorithm. + * + * <p>Swing is an item recall algorithm. The topology of user-item graph usually can be described as + * user-item-user or item-user-item, which are like 'swing'. For example, if both user <em>u</em> and user <em>v</em> + * have purchased the same commodity <em>i</em> , they will form a relationship diagram similar to a swing. If + * <em>u</em> and <em>v</em> have purchased commodity <em>j</em> in addition to <em>i</em>, it is supposed <em>i</em> + * and <em>j</em> are similar. The similarity between items in Swing is defined as + * + * <p>$$ w_{(i,j)}=\sum_{u\in U_i\cap U_j}\sum_{v\in U_i\cap + * U_j}{\frac{1}{{(I_u+\alpha_1)}^\beta}}*{\frac{1}{{(I_v+\alpha_1)}^\beta}}*{\frac{1}{\alpha_2+|I_u\cap I_v|}} $$ + * + * <p>This implementation is based on the algorithm proposed in the paper: "Large Scale Product Review Comment: Let's simplify the description here. `See "<a href="https://arxiv.org/pdf/2010.05525.pdf">Large Scale Product Graph Construction for Recommendation in E-commerce</a>" by Xiaoyong Yang, Yadong Zhu and Yi Zhang.` ########## flink-ml-lib/src/main/java/org/apache/flink/ml/recommendation/swing/Swing.java: ########## @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.recommendation.swing; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.iteration.operator.OperatorStateUtils; +import org.apache.flink.ml.api.AlgoOperator; +import org.apache.flink.ml.common.datastream.TableUtils; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.util.ParamUtils; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.internal.TableImpl; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.types.Row; +import 
org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * An AlgoOperator which implements the Swing algorithm. + * + * <p>Swing is an item recall algorithm. The topology of user-item graph usually can be described as + * user-item-user or item-user-item, which are like 'swing'. For example, if both user <em>u</em> and user <em>v</em> + * have purchased the same commodity <em>i</em> , they will form a relationship diagram similar to a swing. If + * <em>u</em> and <em>v</em> have purchased commodity <em>j</em> in addition to <em>i</em>, it is supposed <em>i</em> + * and <em>j</em> are similar. The similarity between items in Swing is defined as + * + * <p>$$ w_{(i,j)}=\sum_{u\in U_i\cap U_j}\sum_{v\in U_i\cap + * U_j}{\frac{1}{{(I_u+\alpha_1)}^\beta}}*{\frac{1}{{(I_v+\alpha_1)}^\beta}}*{\frac{1}{\alpha_2+|I_u\cap I_v|}} $$ + * + * <p>This implementation is based on the algorithm proposed in the paper: "Large Scale Product + * Graph Construction for Recommendation in E-commerce" by Xiaoyong Yang, Yadong Zhu and Yi Zhang. (<a + * href="https://arxiv.org/pdf/2010.05525.pdf">https://arxiv.org/pdf/2010.05525.pdf</a>) + */ +public class Swing implements AlgoOperator<Swing>, SwingParams<Swing> { + private final Map<Param<?>, Object> paramMap = new HashMap<>(); + + public Swing() { + ParamUtils.initializeMapWithDefaultValues(paramMap, this); + } + + @Override + public Table[] transform(Table... 
inputs) { + Preconditions.checkArgument(inputs.length == 1); + final String userCol = getUserCol(); + final String itemCol = getItemCol(); + final ResolvedSchema schema = inputs[0].getResolvedSchema(); + + if (!(Types.LONG.equals(TableUtils.getTypeInfoByName(schema, userCol)) + && Types.LONG.equals(TableUtils.getTypeInfoByName(schema, itemCol)))) { + throw new IllegalArgumentException("The types of user and item columns must be Long."); + } + + if (getMaxUserBehavior() < getMinUserBehavior()) { + throw new IllegalArgumentException( + String.format("The maxUserBehavior must be larger or equal to minUserBehavior. " + + "The current value: maxUserBehavior=%d, minUserBehavior=%d.", + getMaxUserBehavior(), + getMinUserBehavior()) + ); + } + + StreamTableEnvironment tEnv = + (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment(); + + SingleOutputStreamOperator<Tuple2<Long, Long>> purchasingBehavior = + tEnv.toDataStream(inputs[0]) + .map( + row -> { + Long userId = (Long) row.getFieldAs(userCol); + Long itemId = (Long) row.getFieldAs(itemCol); + if (userId == null || itemId == null) { + throw new RuntimeException( + "Data of user and item column must not be null."); Review Comment: nit: How about updating the error message as `User or item column must not be null.`? ########## flink-ml-lib/src/main/java/org/apache/flink/ml/recommendation/swing/Swing.java: ########## @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.recommendation.swing; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.iteration.operator.OperatorStateUtils; +import org.apache.flink.ml.api.AlgoOperator; +import org.apache.flink.ml.common.datastream.TableUtils; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.util.ParamUtils; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.internal.TableImpl; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.types.Row; +import 
org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * An AlgoOperator which implements the Swing algorithm. + * + * <p>Swing is an item recall algorithm. The topology of user-item graph usually can be described as + * user-item-user or item-user-item, which are like 'swing'. For example, if both user <em>u</em> and user <em>v</em> + * have purchased the same commodity <em>i</em> , they will form a relationship diagram similar to a swing. If + * <em>u</em> and <em>v</em> have purchased commodity <em>j</em> in addition to <em>i</em>, it is supposed <em>i</em> + * and <em>j</em> are similar. The similarity between items in Swing is defined as + * + * <p>$$ w_{(i,j)}=\sum_{u\in U_i\cap U_j}\sum_{v\in U_i\cap + * U_j}{\frac{1}{{(I_u+\alpha_1)}^\beta}}*{\frac{1}{{(I_v+\alpha_1)}^\beta}}*{\frac{1}{\alpha_2+|I_u\cap I_v|}} $$ + * + * <p>This implementation is based on the algorithm proposed in the paper: "Large Scale Product + * Graph Construction for Recommendation in E-commerce" by Xiaoyong Yang, Yadong Zhu and Yi Zhang. (<a + * href="https://arxiv.org/pdf/2010.05525.pdf">https://arxiv.org/pdf/2010.05525.pdf</a>) + */ +public class Swing implements AlgoOperator<Swing>, SwingParams<Swing> { + private final Map<Param<?>, Object> paramMap = new HashMap<>(); + + public Swing() { + ParamUtils.initializeMapWithDefaultValues(paramMap, this); + } + + @Override + public Table[] transform(Table... 
inputs) { + Preconditions.checkArgument(inputs.length == 1); + final String userCol = getUserCol(); + final String itemCol = getItemCol(); + final ResolvedSchema schema = inputs[0].getResolvedSchema(); + + if (!(Types.LONG.equals(TableUtils.getTypeInfoByName(schema, userCol)) + && Types.LONG.equals(TableUtils.getTypeInfoByName(schema, itemCol)))) { + throw new IllegalArgumentException("The types of user and item columns must be Long."); + } + + if (getMaxUserBehavior() < getMinUserBehavior()) { + throw new IllegalArgumentException( + String.format("The maxUserBehavior must be larger or equal to minUserBehavior. " + + "The current value: maxUserBehavior=%d, minUserBehavior=%d.", + getMaxUserBehavior(), + getMinUserBehavior()) + ); + } + + StreamTableEnvironment tEnv = + (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment(); + + SingleOutputStreamOperator<Tuple2<Long, Long>> purchasingBehavior = + tEnv.toDataStream(inputs[0]) + .map( + row -> { + Long userId = (Long) row.getFieldAs(userCol); + Long itemId = (Long) row.getFieldAs(itemCol); + if (userId == null || itemId == null) { + throw new RuntimeException( + "Data of user and item column must not be null."); + } + return Tuple2.of(userId, itemId); + }) + .returns(Types.TUPLE(Types.LONG, Types.LONG)); + + SingleOutputStreamOperator<Tuple3<Long, Long, Map<Long, String>>> userBehavior = + purchasingBehavior + .keyBy(tuple -> tuple.f0) + .transform( + "collectingUserBehavior", + Types.TUPLE( + Types.LONG, + Types.LONG, + Types.MAP(Types.LONG, Types.STRING)), + new CollectingUserBehavior( + getMinUserBehavior(), getMaxUserBehavior())); + + RowTypeInfo outputTypeInfo = + new RowTypeInfo( + new TypeInformation[] {Types.LONG, Types.STRING}, + new String[] {getItemCol(), getOutputCol()}); + + DataStream<Row> output = + userBehavior + .keyBy(tuple -> tuple.f1) + .transform( + "computingSimilarItems", + outputTypeInfo, + new ComputingSimilarItems( + getK(), + getMaxUserNumPerItem(), + getAlpha1(), + 
getAlpha2(), + getBeta())); + + return new Table[] {tEnv.fromDataStream(output)}; + } + + @Override + public Map<Param<?>, Object> getParamMap() { + return paramMap; + } + + @Override + public void save(String path) throws IOException { + ReadWriteUtils.saveMetadata(this, path); + } + + public static Swing load(StreamTableEnvironment tEnv, String path) throws IOException { + return ReadWriteUtils.loadStageParam(path); + } + + /** + * Collects user behavior data and appends to the input table. + * + * <p>During the process, this operator collects users and all items he/she has purchased, and + * its input table must be bounded. Because Flink doesn't support type info of `Set` officially, The appended column + * is `Map` contains items as key and maps null value. + */ + private static class CollectingUserBehavior + extends AbstractStreamOperator<Tuple3<Long, Long, Map<Long, String>>> + implements OneInputStreamOperator< + Tuple2<Long, Long>, Tuple3<Long, Long, Map<Long, String>>>, + BoundedOneInput { + private final int minUserItemInteraction; + private final int maxUserItemInteraction; + + // Maps a user id to a set of items. Because ListState cannot keep values of type `Set`, + // we use `Map<Long, String>` with null values instead. So does `userItemsMap` and + // `itemUsersMap` in `ComputingSimilarItems`. 
+ private Map<Long, Map<Long, String>> userAndPurchasedItems = new HashMap<>(); + + private ListState<Map<Long, Map<Long, String>>> userAndPurchasedItemsState; + + private CollectingUserBehavior(int minUserItemInteraction, int maxUserItemInteraction) { + this.minUserItemInteraction = minUserItemInteraction; + this.maxUserItemInteraction = maxUserItemInteraction; + } + + @Override + public void endInput() { + + userAndPurchasedItems.forEach( + (user, items) -> { + if (items.size() >= minUserItemInteraction + && items.size() <= maxUserItemInteraction) { + items.forEach( + (item, nullValue) -> + output.collect( + new StreamRecord<>( + new Tuple3<>(user, item, items)))); + } + }); + + userAndPurchasedItemsState.clear(); + } + + @Override + public void processElement(StreamRecord<Tuple2<Long, Long>> element) { + Tuple2<Long, Long> userAndItem = element.getValue(); + long user = userAndItem.f0; + long item = userAndItem.f1; + Map<Long, String> items = userAndPurchasedItems.get(user); + + if (items == null) { + items = new LinkedHashMap<>(); + userAndPurchasedItems.putIfAbsent(user, items); Review Comment: Is there any case where `user` already exists in the map at this point? If not, we should probably use `put()`. ########## flink-ml-lib/src/test/java/org/apache/flink/ml/recommendation/SwingTest.java: ########## @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.recommendation; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.ml.recommendation.swing.Swing; +import org.apache.flink.ml.util.TestUtils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; + +import org.apache.commons.collections.IteratorUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * Tests {@link Swing}. 
+ */ +public class SwingTest { + @Rule + public final TemporaryFolder tempFolder = new TemporaryFolder(); + private StreamExecutionEnvironment env; + private StreamTableEnvironment tEnv; + private Table trainData; + + @Before + public void before() { + env = TestUtils.getExecutionEnvironment(); + tEnv = StreamTableEnvironment.create(env); + List<Row> trainRows = + new ArrayList<>( + Arrays.asList( + Row.of(0L, 10L), + Row.of(0L, 11L), + Row.of(0L, 12L), + Row.of(1L, 13L), + Row.of(1L, 12L), + Row.of(2L, 10L), + Row.of(2L, 11L), + Row.of(2L, 12L), + Row.of(3L, 13L), + Row.of(3L, 12L), + Row.of(4L, 12L), + Row.of(4L, 10L), + Row.of(4L, 11L), + Row.of(4L, 12L), + Row.of(4L, 13L))); + trainData = + tEnv.fromDataStream( + env.fromCollection( + trainRows, + new RowTypeInfo( + new TypeInformation[] { + BasicTypeInfo.LONG_TYPE_INFO, + BasicTypeInfo.LONG_TYPE_INFO + }, + new String[] {"user_id", "item_id"}))); + } + + private void compareResultAndExpected(List<Row> results) { + List<Row> expectedScoreRows = + new ArrayList<>( + Arrays.asList( + Row.of(10L, "11,0.058845768947156235;12,0.058845768947156235"), + Row.of(11L, "10,0.058845768947156235;12,0.058845768947156235"), + Row.of( + 12L, + "13,0.09134833828228624;10,0.058845768947156235;11,0.058845768947156235"), + Row.of(13L, "12,0.09134833828228624"))); + + results.sort((o1, o2) -> Long.compare(o1.getFieldAs(0), o2.getFieldAs(0))); Review Comment: nit: This could be simplified as `results.sort(Comparator.comparingLong(o -> o.getFieldAs(0)));` ########## flink-ml-lib/src/main/java/org/apache/flink/ml/recommendation/swing/Swing.java: ########## @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.recommendation.swing; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.iteration.operator.OperatorStateUtils; +import org.apache.flink.ml.api.AlgoOperator; +import org.apache.flink.ml.common.datastream.TableUtils; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.util.ParamUtils; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; 
+import org.apache.flink.table.api.internal.TableImpl; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * An AlgoOperator which implements the Swing algorithm. + * + * <p>Swing is an item recall algorithm. The topology of user-item graph usually can be described as + * user-item-user or item-user-item, which are like 'swing'. For example, if both user <em>u</em> and user <em>v</em> + * have purchased the same commodity <em>i</em> , they will form a relationship diagram similar to a swing. If + * <em>u</em> and <em>v</em> have purchased commodity <em>j</em> in addition to <em>i</em>, it is supposed <em>i</em> + * and <em>j</em> are similar. The similarity between items in Swing is defined as + * + * <p>$$ w_{(i,j)}=\sum_{u\in U_i\cap U_j}\sum_{v\in U_i\cap + * U_j}{\frac{1}{{(I_u+\alpha_1)}^\beta}}*{\frac{1}{{(I_v+\alpha_1)}^\beta}}*{\frac{1}{\alpha_2+|I_u\cap I_v|}} $$ + * + * <p>This implementation is based on the algorithm proposed in the paper: "Large Scale Product + * Graph Construction for Recommendation in E-commerce" by Xiaoyong Yang, Yadong Zhu and Yi Zhang. (<a + * href="https://arxiv.org/pdf/2010.05525.pdf">https://arxiv.org/pdf/2010.05525.pdf</a>) + */ +public class Swing implements AlgoOperator<Swing>, SwingParams<Swing> { + private final Map<Param<?>, Object> paramMap = new HashMap<>(); + + public Swing() { + ParamUtils.initializeMapWithDefaultValues(paramMap, this); + } + + @Override + public Table[] transform(Table... 
inputs) { + Preconditions.checkArgument(inputs.length == 1); + final String userCol = getUserCol(); + final String itemCol = getItemCol(); + final ResolvedSchema schema = inputs[0].getResolvedSchema(); + + if (!(Types.LONG.equals(TableUtils.getTypeInfoByName(schema, userCol)) + && Types.LONG.equals(TableUtils.getTypeInfoByName(schema, itemCol)))) { + throw new IllegalArgumentException("The types of user and item columns must be Long."); + } + + if (getMaxUserBehavior() < getMinUserBehavior()) { + throw new IllegalArgumentException( + String.format("The maxUserBehavior must be larger or equal to minUserBehavior. " + + "The current value: maxUserBehavior=%d, minUserBehavior=%d.", + getMaxUserBehavior(), + getMinUserBehavior()) + ); + } + + StreamTableEnvironment tEnv = + (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment(); + + SingleOutputStreamOperator<Tuple2<Long, Long>> purchasingBehavior = + tEnv.toDataStream(inputs[0]) + .map( + row -> { + Long userId = (Long) row.getFieldAs(userCol); + Long itemId = (Long) row.getFieldAs(itemCol); + if (userId == null || itemId == null) { + throw new RuntimeException( + "Data of user and item column must not be null."); + } + return Tuple2.of(userId, itemId); + }) + .returns(Types.TUPLE(Types.LONG, Types.LONG)); + + SingleOutputStreamOperator<Tuple3<Long, Long, Map<Long, String>>> userBehavior = + purchasingBehavior + .keyBy(tuple -> tuple.f0) + .transform( + "collectingUserBehavior", + Types.TUPLE( + Types.LONG, + Types.LONG, + Types.MAP(Types.LONG, Types.STRING)), + new CollectingUserBehavior( + getMinUserBehavior(), getMaxUserBehavior())); + + RowTypeInfo outputTypeInfo = + new RowTypeInfo( + new TypeInformation[] {Types.LONG, Types.STRING}, + new String[] {getItemCol(), getOutputCol()}); + + DataStream<Row> output = + userBehavior + .keyBy(tuple -> tuple.f1) + .transform( + "computingSimilarItems", + outputTypeInfo, + new ComputingSimilarItems( + getK(), + getMaxUserNumPerItem(), + getAlpha1(), + 
getAlpha2(), + getBeta())); + + return new Table[] {tEnv.fromDataStream(output)}; + } + + @Override + public Map<Param<?>, Object> getParamMap() { + return paramMap; + } + + @Override + public void save(String path) throws IOException { + ReadWriteUtils.saveMetadata(this, path); + } + + public static Swing load(StreamTableEnvironment tEnv, String path) throws IOException { + return ReadWriteUtils.loadStageParam(path); + } + + /** + * Collects user behavior data and appends to the input table. + * + * <p>During the process, this operator collects users and all items he/she has purchased, and + * its input table must be bounded. Because Flink doesn't support type info of `Set` officially, The appended column + * is `Map` contains items as key and maps null value. + */ + private static class CollectingUserBehavior + extends AbstractStreamOperator<Tuple3<Long, Long, Map<Long, String>>> + implements OneInputStreamOperator< + Tuple2<Long, Long>, Tuple3<Long, Long, Map<Long, String>>>, + BoundedOneInput { + private final int minUserItemInteraction; + private final int maxUserItemInteraction; + + // Maps a user id to a set of items. Because ListState cannot keep values of type `Set`, + // we use `Map<Long, String>` with null values instead. So does `userItemsMap` and + // `itemUsersMap` in `ComputingSimilarItems`. 
+ private Map<Long, Map<Long, String>> userAndPurchasedItems = new HashMap<>(); + + private ListState<Map<Long, Map<Long, String>>> userAndPurchasedItemsState; + + private CollectingUserBehavior(int minUserItemInteraction, int maxUserItemInteraction) { + this.minUserItemInteraction = minUserItemInteraction; + this.maxUserItemInteraction = maxUserItemInteraction; + } + + @Override + public void endInput() { + + userAndPurchasedItems.forEach( + (user, items) -> { + if (items.size() >= minUserItemInteraction + && items.size() <= maxUserItemInteraction) { + items.forEach( + (item, nullValue) -> + output.collect( + new StreamRecord<>( + new Tuple3<>(user, item, items)))); + } + }); + + userAndPurchasedItemsState.clear(); + } + + @Override + public void processElement(StreamRecord<Tuple2<Long, Long>> element) { + Tuple2<Long, Long> userAndItem = element.getValue(); + long user = userAndItem.f0; + long item = userAndItem.f1; + Map<Long, String> items = userAndPurchasedItems.get(user); + + if (items == null) { + items = new LinkedHashMap<>(); + userAndPurchasedItems.putIfAbsent(user, items); + } + + if (items.size() <= maxUserItemInteraction) { + items.put(item, null); + } + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + userAndPurchasedItemsState = + context.getOperatorStateStore() + .getListState( + new ListStateDescriptor<>( + "userAndPurchasedItemsState", + Types.MAP( + Types.LONG, + Types.MAP(Types.LONG, Types.STRING)))); + + OperatorStateUtils.getUniqueElement(userAndPurchasedItemsState, "userAndPurchasedItemsState") + .ifPresent( + stat -> { + userAndPurchasedItems = stat; + }); + } + + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + super.snapshotState(context); + userAndPurchasedItemsState.update(Collections.singletonList(userAndPurchasedItems)); + } + } + + /** + * Calculates similarity between items and keep top k similar items of 
each target item. + */ + private static class ComputingSimilarItems extends AbstractStreamOperator<Row> + implements OneInputStreamOperator<Tuple3<Long, Long, Map<Long, String>>, Row>, + BoundedOneInput { + + private Map<Long, Map<Long, String>> userAndPurchasedItems = new HashMap<>(); + private Map<Long, Map<Long, String>> itemAndPurchasers = new HashMap<>(); + private ListState<Map<Long, Map<Long, String>>> userAndPurchasedItemsState; + private ListState<Map<Long, Map<Long, String>>> itemAndPurchasersState; + + private final int k; + private final int maxUserNumPerItem; + + /** + * Alpha1 and alpha2 are integers larger or equal to zero. Though they by definition should be greater than Review Comment: Do you think we should explain this in the Javadoc of the `Swing` class? Readers may be interested in how `Swing` behaves when `alpha2` is zero without having to read the implementation here. ########## flink-ml-lib/src/main/java/org/apache/flink/ml/recommendation/swing/SwingParams.java: ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.ml.recommendation.swing; + +import org.apache.flink.ml.common.param.HasOutputCol; +import org.apache.flink.ml.param.DoubleParam; +import org.apache.flink.ml.param.IntParam; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.param.ParamValidators; +import org.apache.flink.ml.param.StringParam; +import org.apache.flink.ml.param.WithParams; + +/** + * Params for {@link Swing}. + * + * @param <T> The class type of this instance. + */ +public interface SwingParams<T> extends WithParams<T>, HasOutputCol<T> { + Param<String> USER_COL = + new StringParam( + "userCol", + "User column name. The type of user column must be Long.", + "user", + ParamValidators.notNull()); + + Param<String> ITEM_COL = + new StringParam( + "itemCol", + "Item column name. The type of item column must be Long.", + "item", + ParamValidators.notNull()); + + Param<Integer> MAX_USER_NUM_PER_ITEM = + new IntParam( + "maxUserNumPerItem", + "The max number of user(purchasers) for each item. If the number of user " + + "is larger than this value, then only maxUserNumPerItem users will " + + "be sampled and considered in the computation of similarity between two items.", + 1000, + ParamValidators.gt(0)); + + Param<Integer> K = + new IntParam( + "k", + "The max number of similar items to output for each item. If an item has " Review Comment: `If an item has...` seems an explanation of `The max number of similar items to output for each item.`. Do you think it provides more information? ########## flink-ml-lib/src/main/java/org/apache/flink/ml/recommendation/swing/SwingParams.java: ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.recommendation.swing; + +import org.apache.flink.ml.common.param.HasOutputCol; +import org.apache.flink.ml.param.DoubleParam; +import org.apache.flink.ml.param.IntParam; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.param.ParamValidators; +import org.apache.flink.ml.param.StringParam; +import org.apache.flink.ml.param.WithParams; + +/** + * Params for {@link Swing}. + * + * @param <T> The class type of this instance. + */ +public interface SwingParams<T> extends WithParams<T>, HasOutputCol<T> { + Param<String> USER_COL = + new StringParam( + "userCol", + "User column name. The type of user column must be Long.", + "user", + ParamValidators.notNull()); + + Param<String> ITEM_COL = + new StringParam( + "itemCol", + "Item column name. The type of item column must be Long.", + "item", + ParamValidators.notNull()); + + Param<Integer> MAX_USER_NUM_PER_ITEM = + new IntParam( + "maxUserNumPerItem", + "The max number of user(purchasers) for each item. If the number of user " + + "is larger than this value, then only maxUserNumPerItem users will " + + "be sampled and considered in the computation of similarity between two items.", + 1000, + ParamValidators.gt(0)); + + Param<Integer> K = + new IntParam( + "k", + "The max number of similar items to output for each item. 
If an item has " + + "more than k recommendations, the first k similar items will be kept", + 100, + ParamValidators.gt(0)); + + Param<Integer> MIN_USER_BEHAVIOR = + new IntParam( + "minUserBehavior", + "The min number of items for a user purchases. If the items purchased by a user is smaller than " Review Comment: It is not a `fit` process. Can you replace `fit` here? `This can affect the speed of the computation. The best value depends on the nature of the problem.` This sentence does not add any accurate information: it is hard to tell how this parameter would affect performance and accuracy, so it cannot guide users. Moreover, it is unclear what the `nature` of the problem is. So I would suggest simply removing it here. I am also fine with keeping it if you can make it more accurate. Same for MAX_USER_BEHAVIOR. ########## flink-ml-lib/src/main/java/org/apache/flink/ml/recommendation/swing/Swing.java: ########## @@ -0,0 +1,422 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.ml.recommendation.swing; + +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.iteration.operator.OperatorStateUtils; +import org.apache.flink.ml.api.AlgoOperator; +import org.apache.flink.ml.common.datastream.TableUtils; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.util.ParamUtils; +import org.apache.flink.ml.util.ReadWriteUtils; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.internal.TableImpl; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * An AlgoOperator which implements the Swing algorithm. + * + * <p>Swing is an item recall algorithm. 
The topology of user-item graph usually can be described as + * user-item-user or item-user-item, which are like 'swing'. For example, if both user <em>u</em> and user <em>v</em> + * have purchased the same commodity <em>i</em> , they will form a relationship diagram similar to a swing. If + * <em>u</em> and <em>v</em> have purchased commodity <em>j</em> in addition to <em>i</em>, it is supposed <em>i</em> + * and <em>j</em> are similar. The similarity between items in Swing is defined as + * + * <p>$$ w_{(i,j)}=\sum_{u\in U_i\cap U_j}\sum_{v\in U_i\cap + * U_j}{\frac{1}{{(I_u+\alpha_1)}^\beta}}*{\frac{1}{{(I_v+\alpha_1)}^\beta}}*{\frac{1}{\alpha_2+|I_u\cap I_v|}} $$ + * + * <p>This implementation is based on the algorithm proposed in the paper: "Large Scale Product + * Graph Construction for Recommendation in E-commerce" by Xiaoyong Yang, Yadong Zhu and Yi Zhang. (<a + * href="https://arxiv.org/pdf/2010.05525.pdf">https://arxiv.org/pdf/2010.05525.pdf</a>) + */ +public class Swing implements AlgoOperator<Swing>, SwingParams<Swing> { + private final Map<Param<?>, Object> paramMap = new HashMap<>(); + + public Swing() { + ParamUtils.initializeMapWithDefaultValues(paramMap, this); + } + + @Override + public Table[] transform(Table... inputs) { + Preconditions.checkArgument(inputs.length == 1); + final String userCol = getUserCol(); + final String itemCol = getItemCol(); + final ResolvedSchema schema = inputs[0].getResolvedSchema(); + + if (!(Types.LONG.equals(TableUtils.getTypeInfoByName(schema, userCol)) + && Types.LONG.equals(TableUtils.getTypeInfoByName(schema, itemCol)))) { + throw new IllegalArgumentException("The types of user and item columns must be Long."); + } + + if (getMaxUserBehavior() < getMinUserBehavior()) { + throw new IllegalArgumentException( + String.format("The maxUserBehavior must be larger or equal to minUserBehavior. 
" + + "The current value: maxUserBehavior=%d, minUserBehavior=%d.", + getMaxUserBehavior(), + getMinUserBehavior()) + ); + } + + StreamTableEnvironment tEnv = + (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment(); + + SingleOutputStreamOperator<Tuple2<Long, Long>> purchasingBehavior = + tEnv.toDataStream(inputs[0]) + .map( + row -> { + Long userId = (Long) row.getFieldAs(userCol); + Long itemId = (Long) row.getFieldAs(itemCol); + if (userId == null || itemId == null) { + throw new RuntimeException( + "Data of user and item column must not be null."); + } + return Tuple2.of(userId, itemId); + }) + .returns(Types.TUPLE(Types.LONG, Types.LONG)); + + SingleOutputStreamOperator<Tuple3<Long, Long, Map<Long, String>>> userBehavior = + purchasingBehavior + .keyBy(tuple -> tuple.f0) + .transform( + "collectingUserBehavior", + Types.TUPLE( + Types.LONG, + Types.LONG, + Types.MAP(Types.LONG, Types.STRING)), + new CollectingUserBehavior( + getMinUserBehavior(), getMaxUserBehavior())); + + RowTypeInfo outputTypeInfo = + new RowTypeInfo( + new TypeInformation[] {Types.LONG, Types.STRING}, + new String[] {getItemCol(), getOutputCol()}); + + DataStream<Row> output = + userBehavior + .keyBy(tuple -> tuple.f1) + .transform( + "computingSimilarItems", + outputTypeInfo, + new ComputingSimilarItems( + getK(), + getMaxUserNumPerItem(), + getAlpha1(), + getAlpha2(), + getBeta())); + + return new Table[] {tEnv.fromDataStream(output)}; + } + + @Override + public Map<Param<?>, Object> getParamMap() { + return paramMap; + } + + @Override + public void save(String path) throws IOException { + ReadWriteUtils.saveMetadata(this, path); + } + + public static Swing load(StreamTableEnvironment tEnv, String path) throws IOException { + return ReadWriteUtils.loadStageParam(path); + } + + /** + * Collects user behavior data and appends to the input table. 
+ * + * <p>During the process, this operator collects users and all items he/she has purchased, and + * its input table must be bounded. Because Flink doesn't support type info of `Set` officially, The appended column + * is `Map` contains items as key and maps null value. + */ + private static class CollectingUserBehavior + extends AbstractStreamOperator<Tuple3<Long, Long, Map<Long, String>>> + implements OneInputStreamOperator< + Tuple2<Long, Long>, Tuple3<Long, Long, Map<Long, String>>>, + BoundedOneInput { + private final int minUserItemInteraction; + private final int maxUserItemInteraction; + + // Maps a user id to a set of items. Because ListState cannot keep values of type `Set`, + // we use `Map<Long, String>` with null values instead. So does `userItemsMap` and + // `itemUsersMap` in `ComputingSimilarItems`. Review Comment: nit: `userItemsMap` and `itemUsersMap` need to be updated. These names are inconsistent with the existing code, where the fields in `ComputingSimilarItems` are named `userAndPurchasedItems` and `itemAndPurchasers`. ########## flink-ml-lib/src/main/java/org/apache/flink/ml/recommendation/swing/SwingParams.java: ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.ml.recommendation.swing; + +import org.apache.flink.ml.common.param.HasOutputCol; +import org.apache.flink.ml.param.DoubleParam; +import org.apache.flink.ml.param.IntParam; +import org.apache.flink.ml.param.Param; +import org.apache.flink.ml.param.ParamValidators; +import org.apache.flink.ml.param.StringParam; +import org.apache.flink.ml.param.WithParams; + +/** + * Params for {@link Swing}. + * + * @param <T> The class type of this instance. + */ +public interface SwingParams<T> extends WithParams<T>, HasOutputCol<T> { + Param<String> USER_COL = + new StringParam( + "userCol", + "User column name. The type of user column must be Long.", Review Comment: We usually do not explain the type of a column when describing the param. Let's follow the existing convention. Same for ITEM_COL. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
