danny0405 commented on code in PR #5516:
URL: https://github.com/apache/hudi/pull/5516#discussion_r867706679


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java:
##########
@@ -43,20 +41,25 @@ public class MergeOnReadInputSplit implements InputSplit {
   private final long maxCompactionMemoryInBytes;
   private final String mergeType;
   private final Option<InstantRange> instantRange;
+  private String fileId;
+
 
   // for streaming reader to record the consumed offset,
   // which is the start of next round reading.
   private long consumed = NUM_NO_CONSUMPTION;
 
+
+
   public MergeOnReadInputSplit(
-      int splitNum,
-      @Nullable String basePath,
-      Option<List<String>> logPaths,
-      String latestCommit,
-      String tablePath,
-      long maxCompactionMemoryInBytes,
-      String mergeType,
-      @Nullable InstantRange instantRange) {
+          int splitNum,
+          @Nullable String basePath,
+          Option<List<String>> logPaths,
+          String latestCommit,

Review Comment:
   Fix the indentation: the constructor parameters should keep the original continuation indent.



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java:
##########
@@ -18,13 +18,11 @@
 
 package org.apache.hudi.table.format.mor;
 
+import org.apache.flink.core.io.InputSplit;
 import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.util.Option;
 
-import org.apache.flink.core.io.InputSplit;
-

Review Comment:
   The changes to the imports are not necessary; please keep the original import order.



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java:
##########
@@ -181,7 +182,8 @@ public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv)
           OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
           SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
               .setParallelism(1)
-              .transform("split_reader", typeInfo, factory)
+                  .keyBy((KeySelector<MergeOnReadInputSplit, String>) mos -> String.valueOf(mos.getFileId()))
+                  .transform("split_reader", typeInfo, factory)

Review Comment:
   Can the explicit `KeySelector` cast be removed?
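   A minimal, self-contained sketch of why the cast looks redundant, assuming `getFileId()` returns a `String` (the diff declares `private String fileId;`). It does not depend on Flink. `KeySelector` below is a hypothetical one-method stand-in for Flink's `KeySelector`, `Split` is a hypothetical stand-in for `MergeOnReadInputSplit`, and `keyOf` is a hypothetical helper playing the role of the `keyBy` parameter. Because that parameter already has type `KeySelector<T, K>`, the compiler infers the lambda's type without a cast, and a method reference works as well.

```java
public class KeyBySketch {

  // Hypothetical stand-in for Flink's KeySelector<IN, KEY>; only the single
  // abstract method matters for type inference.
  @FunctionalInterface
  interface KeySelector<IN, KEY> {
    KEY getKey(IN value) throws Exception;
  }

  // Hypothetical stand-in for MergeOnReadInputSplit, reduced to the fileId
  // field added in the PR.
  static final class Split {
    private final String fileId;

    Split(String fileId) {
      this.fileId = fileId;
    }

    String getFileId() {
      return fileId;
    }
  }

  // Hypothetical helper playing the role of keyBy(KeySelector): the parameter
  // type gives the lambda its target type, so callers need no cast.
  static <T, K> K keyOf(T element, KeySelector<T, K> selector) throws Exception {
    return selector.getKey(element);
  }

  public static void main(String[] args) throws Exception {
    Split split = new Split("file-0001");

    // As written in the PR: explicit cast to the functional interface.
    String withCast =
        keyOf(split, (KeySelector<Split, String>) s -> String.valueOf(s.getFileId()));

    // Same lambda without the cast; T = Split and K = String are inferred.
    String withoutCast = keyOf(split, s -> String.valueOf(s.getFileId()));

    // Method reference, valid because getFileId() already returns a String.
    String methodRef = keyOf(split, Split::getFileId);

    // prints "true"
    System.out.println(withCast.equals(withoutCast) && withoutCast.equals(methodRef));
  }
}
```

   In the real pipeline the element type is already fixed by the stream returned from `addSource(...)`, so under the same assumption `.keyBy(MergeOnReadInputSplit::getFileId)` should compile as well. `String.valueOf` only behaves differently from the method reference when the file ID is null, where it yields the string `"null"`.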



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
