lindong28 commented on code in PR #202:
URL: https://github.com/apache/flink-ml/pull/202#discussion_r1095193719


##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/DataStreamUtils.java:
##########
@@ -114,57 +115,114 @@ public static DataStream<double[]> allReduceSum(DataStream<double[]> input) {
      */
     public static <IN, OUT> DataStream<OUT> mapPartition(
             DataStream<IN> input, MapPartitionFunction<IN, OUT> func) {
-        TypeInformation<OUT> resultType =
-                TypeExtractor.getMapPartitionReturnTypes(func, input.getType(), null, true);
-        return input.transform("mapPartition", resultType, new MapPartitionOperator<>(func))
+        return mapPartition(input, func, null);
+    }
+
+    /**
+     * Applies a {@link MapPartitionFunction} on a bounded data stream.
+     *
+     * @param input The input data stream.
+     * @param func The user defined mapPartition function.
+     * @param outType The type information of the output.
+     * @param <IN> The class type of the input.
+     * @param <OUT> The class type of output.
+     * @return The result data stream.
+     */
+    public static <IN, OUT> DataStream<OUT> mapPartition(
+            DataStream<IN> input,
+            MapPartitionFunction<IN, OUT> func,
+            TypeInformation<OUT> outType) {
+        if (outType == null) {

Review Comment:
   It seems more readable to move this logic to `mapPartition(DataStream<IN> input, MapPartitionFunction<IN, OUT> func)`.
   
   Same for other functions modified in this class.
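   The suggested refactor can be sketched in plain Java. The stand-ins below (`List` for `DataStream`, a `String` for `TypeInformation`, `inferOutType` for the `TypeExtractor` call) are illustrative only; the point is that the inference fallback lives in the convenience overload, so the full overload can assume a non-null type:

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Sketch of the suggested overload layout. Plain-Java stand-ins, not Flink's API.
public class OverloadSketch {

    // Convenience overload: derives the fallback itself, then delegates.
    static <IN, OUT> List<OUT> mapPartition(List<IN> input, Function<IN, OUT> func) {
        // The "infer the output type" logic lives here...
        return mapPartition(input, func, inferOutType(func));
    }

    // Full overload: ...so this method can assume outType is always non-null.
    static <IN, OUT> List<OUT> mapPartition(
            List<IN> input, Function<IN, OUT> func, String outType) {
        if (outType == null) {
            throw new IllegalArgumentException("outType must not be null");
        }
        return input.stream().map(func).collect(Collectors.toList());
    }

    // Stand-in for TypeExtractor-style inference.
    static <IN, OUT> String inferOutType(Function<IN, OUT> func) {
        return "inferred";
    }

    public static void main(String[] args) {
        System.out.println(mapPartition(List.of("a", "bb"), String::length)); // [1, 2]
    }
}
```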



##########
flink-ml-core/src/test/java/org/apache/flink/ml/util/ReadWriteUtilsTest.java:
##########
@@ -54,7 +54,7 @@ public void before() throws IOException {
                 new org.apache.flink.configuration.Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         env = StreamExecutionEnvironment.getExecutionEnvironment(config);
-        env.getConfig().enableObjectReuse();
+        env.getConfig().enableObjectReuse().disableGenericTypes();

Review Comment:
   Can we re-use TestUtils for consistency?



##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/ImputerTest.java:
##########
@@ -265,9 +265,7 @@ public void testNoValidDataOnMedianStrategy() {
         final List<Row> trainData =
                 new ArrayList<>(
                         Arrays.asList(
-                                Row.of(Double.NaN, Float.NaN),
-                                Row.of(null, null),
-                                Row.of(1.0, 1.0f)));
+                                Row.of(Double.NaN, 3.0f), Row.of(null, 2.0f), Row.of(1.0, 1.0f)));

Review Comment:
   Why do we need to change the value here as part of this PR?



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/countvectorizer/CountVectorizer.java:
##########
@@ -78,7 +80,11 @@ public CountVectorizerModel fit(Table... inputs) {
         DataStream<CountVectorizerModelData> modelData =
                 DataStreamUtils.aggregate(
                         inputData,
-                        new VocabularyAggregator(getMinDF(), getMaxDF(), getVocabularySize()));
+                        new VocabularyAggregator(getMinDF(), getMaxDF(), getVocabularySize()),
+                        Types.TUPLE(
+                                Types.LONG,
+                                Types.MAP(Types.STRING, Types.TUPLE(Types.LONG, Types.LONG))),

Review Comment:
   It is surprising that `TypeExtractor.getAggregateFunctionAccumulatorType` cannot automatically derive the right type from `Tuple2<Long, Map<String, Tuple2<Long, Long>>>`, which only contains common types.
   
   Can you check with the related developers whether this is expected? Maybe open a ticket in Flink to track this issue.
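   One plausible cause worth checking is Java type erasure: nested generic arguments are reflectively recoverable only when they are baked into a class file (e.g. by an anonymous subclass), not when the function reaches the extractor through an erased path such as a lambda or method reference. A self-contained sketch of that difference (plain Java, not Flink's `TypeExtractor`):

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Map;
import java.util.function.Supplier;

// Demonstrates when nested generic arguments survive erasure and when they don't.
public class ErasureDemo {
    public static void main(String[] args) {
        // Anonymous subclass: the generic signature is recorded in the class
        // file, so reflection can recover the nested type arguments.
        Supplier<Map<String, Long>> anon = new Supplier<Map<String, Long>>() {
            @Override
            public Map<String, Long> get() {
                return Map.of();
            }
        };
        Type t = ((ParameterizedType) anon.getClass().getGenericInterfaces()[0])
                .getActualTypeArguments()[0];
        System.out.println(t); // java.util.Map<java.lang.String, java.lang.Long>

        // Method reference: the generated class carries no generic signature,
        // so only the raw interface is reflectively visible.
        Supplier<Map<String, Long>> lambda = Map::of;
        Type lt = lambda.getClass().getGenericInterfaces()[0];
        System.out.println(lt instanceof ParameterizedType); // false: erased
    }
}
```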



##########
flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/DataStreamUtils.java:
##########
@@ -104,7 +104,8 @@ public static DataStream<double[]> allReduceSum(DataStream<double[]> input) {
     }
 
     /**
-     * Applies a {@link MapPartitionFunction} on a bounded data stream.
+     * Applies a {@link MapPartitionFunction} on a bounded data stream. The type of the output is

Review Comment:
   `The type of the output is inferred automatically` seems like an implementation detail of this method. It seems simpler not to specify this.
   
   Maybe see `DataStream#map(MapFunction<T, R> mapper)`'s doc for an example.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/evaluation/binaryclassification/BinaryClassificationEvaluator.java:
##########
@@ -668,30 +669,64 @@ public Tuple3<Double, Boolean, Double> map(Row value) throws Exception {
     }
 
     /** Binary Summary of data in one worker. */
-    private static class BinarySummary implements Serializable {
-        private final Integer taskId;
+    public static class BinarySummary implements Serializable {
+        private Integer taskId;
         // maximum score in this partition
         private double maxScore;
         // real positives in this partition
         private long curPositive;
         // real negatives in this partition
         private long curNegative;
 
+        public BinarySummary() {}
+
         public BinarySummary(Integer taskId, double maxScore, long curPositive, long curNegative) {
             this.taskId = taskId;
             this.maxScore = maxScore;
             this.curPositive = curPositive;
             this.curNegative = curNegative;
         }
+
+        public Integer getTaskId() {

Review Comment:
   Would it be simpler to make these variables `public` than adding get/set methods?
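   For context, Flink's POJO rules accept fields that are either public or exposed through getter/setter pairs, so the accessors could indeed be dropped in favor of public fields; a public no-argument constructor is still required either way. A stand-alone sketch of that shape (illustrative, not the actual `BinarySummary`):

```java
import java.io.Serializable;

// Sketch of the public-field variant the reviewer suggests. Flink's POJO
// serializer accepts public fields directly, so no get/set methods are needed;
// the public no-arg constructor is still required for POJO types.
public class BinarySummarySketch implements Serializable {
    public Integer taskId;
    public double maxScore;  // maximum score in this partition
    public long curPositive; // real positives in this partition
    public long curNegative; // real negatives in this partition

    public BinarySummarySketch() {}

    public BinarySummarySketch(
            Integer taskId, double maxScore, long curPositive, long curNegative) {
        this.taskId = taskId;
        this.maxScore = maxScore;
        this.curPositive = curPositive;
        this.curNegative = curNegative;
    }
}
```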



##########
flink-ml-core/src/test/java/org/apache/flink/ml/common/broadcast/BroadcastUtilsTest.java:
##########
@@ -108,7 +108,7 @@ private JobGraph getJobGraph(int numNonBroadcastInputs) {
                                         true);
                             }
                         });
-        env.getConfig().enableObjectReuse();
+        env.getConfig().enableObjectReuse().disableGenericTypes();

Review Comment:
   The code style seems inconsistent here.
   
   Either we use chain-style for `env.setParallelism().enableCheckpointing()...`, or we use separate lines for `enableObjectReuse()` and `disableGenericTypes()`.
   
   Using separate lines seems more common in the Flink example code.
   
   Also, can we re-use `TestUtils#getExecutionEnvironment()` here? Note that we can still overwrite the parallelism, restart strategy, etc. after getting the env, if needed.



##########
flink-ml-core/src/test/java/org/apache/flink/ml/api/GraphTest.java:
##########
@@ -49,13 +46,7 @@ public class GraphTest extends AbstractTestBase {
 
     @Before
     public void before() {
-        Configuration config = new Configuration();
-        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
-        env = StreamExecutionEnvironment.getExecutionEnvironment(config);
-        env.getConfig().enableObjectReuse();
-        env.setParallelism(4);
-        env.enableCheckpointing(100);
-        env.setRestartStrategy(RestartStrategies.noRestart());
+        env = org.apache.flink.ml.util.TestUtils.getExecutionEnvironment();

Review Comment:
   It is better to move the existing TestUtils from `flink-ml-lib` to `flink-ml-core` than to duplicate the code and use the full package path here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
