This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 31a301f  [HUDI-2485] Consume as mini-batch for flink stream reader 
(#3710)
31a301f is described below

commit 31a301f0aa955450e52c28ee5857526034d523f0
Author: Danny Chan <[email protected]>
AuthorDate: Fri Sep 24 23:44:01 2021 +0800

    [HUDI-2485] Consume as mini-batch for flink stream reader (#3710)
---
 .../org/apache/hudi/source/StreamReadOperator.java | 50 +++++++++++++++++-----
 .../org/apache/hudi/table/HoodieTableSource.java   |  4 +-
 .../table/format/mor/MergeOnReadInputFormat.java   | 27 ++++++++++++
 .../table/format/mor/MergeOnReadInputSplit.java    | 19 +++++++-
 .../apache/hudi/table/HoodieDataSourceITCase.java  |  2 +-
 5 files changed, 86 insertions(+), 16 deletions(-)

diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadOperator.java 
b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadOperator.java
index e2f5f7b..0130433 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadOperator.java
@@ -64,6 +64,8 @@ public class StreamReadOperator extends 
AbstractStreamOperator<RowData>
 
   private static final Logger LOG = 
LoggerFactory.getLogger(StreamReadOperator.class);
 
+  private static final int MINI_BATCH_SIZE = 1000;
+
   // It's the same thread that runs this operator and checkpoint actions. Use 
this executor to schedule only
   // splits for subsequent reading, so that a new checkpoint could be 
triggered without blocking a long time
   // for exhausting all scheduled split reading tasks.
@@ -74,6 +76,7 @@ public class StreamReadOperator extends 
AbstractStreamOperator<RowData>
   private transient SourceFunction.SourceContext<RowData> sourceContext;
 
   private transient ListState<MergeOnReadInputSplit> inputSplitsState;
+
   private transient Queue<MergeOnReadInputSplit> splits;
 
   // Splits are read by the same thread that calls #processElement. Each read 
task is submitted to that thread by adding
@@ -146,31 +149,56 @@ public class StreamReadOperator extends 
AbstractStreamOperator<RowData>
   }
 
   private void processSplits() throws IOException {
-    MergeOnReadInputSplit split = splits.poll();
+    MergeOnReadInputSplit split = splits.peek();
     if (split == null) {
       currentSplitState = SplitState.IDLE;
       return;
     }
 
-    // This log is important to indicate the consuming process, there is only 
one log message for one data bucket.
-    LOG.info("Processing input split : {}", split);
-
-    try {
+    // 1. open a fresh new input split and start reading as mini-batch
+    // 2. if the input split has remaining records to read, switch to 
another runnable to handle them
+    // 3. if the input split reads to the end, close the format and remove the 
split from the queue #splits
+    // 4. for each runnable, reads at most #MINI_BATCH_SIZE number of records
+    if (format.isClosed()) {
+      // This log is important to indicate the consuming process,
+      // there is only one log message for one data bucket.
+      LOG.info("Processing input split : {}", split);
       format.open(split);
-      RowData nextElement = null;
-      while (!format.reachedEnd()) {
-        nextElement = format.nextRecord(nextElement);
-        sourceContext.collect(nextElement);
-      }
+    }
+    try {
+      consumeAsMiniBatch(split);
     } finally {
       currentSplitState = SplitState.IDLE;
-      format.close();
     }
 
     // Re-schedule to process the next split.
     enqueueProcessSplits();
   }
 
+  /**
+   * Consumes at most {@link #MINI_BATCH_SIZE} number of records
+   * for the given input split {@code split}.
+   *
+   * <p>Note: close the input format and remove the input split from the queue 
{@link #splits}
+   * if the split reads to the end.
+   *
+   * @param split The input split
+   */
+  private void consumeAsMiniBatch(MergeOnReadInputSplit split) throws 
IOException {
+    for (int i = 0; i < MINI_BATCH_SIZE; i++) {
+      if (!format.reachedEnd()) {
+        sourceContext.collect(format.nextRecord(null));
+        split.consume();
+      } else {
+        // close the input format
+        format.close();
+        // remove the split
+        splits.poll();
+        break;
+      }
+    }
+  }
+
   @Override
   public void processWatermark(Watermark mark) {
     // we do nothing because we emit our own watermarks if needed.
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java 
b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 6ef608b..4e193fa 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -180,11 +180,9 @@ public class HoodieTableSource implements
               conf, FilePathUtils.toFlinkPath(path), 
maxCompactionMemoryInBytes, getRequiredPartitionPaths());
           InputFormat<RowData, ?> inputFormat = getInputFormat(true);
           OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> 
factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
-          SingleOutputStreamOperator<RowData> source = 
execEnv.addSource(monitoringFunction, "streaming_source")
-              .uid("uid_streaming_source_" + 
conf.getString(FlinkOptions.TABLE_NAME))
+          SingleOutputStreamOperator<RowData> source = 
execEnv.addSource(monitoringFunction, "split_monitor")
               .setParallelism(1)
               .transform("split_reader", typeInfo, factory)
-              .uid("uid_split_reader_" + 
conf.getString(FlinkOptions.TABLE_NAME))
               .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
           return new DataStreamSource<>(source);
         } else {
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 4cd45a8..566d4d3 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -129,6 +129,11 @@ public class MergeOnReadInputFormat
    */
   private boolean emitDelete;
 
+  /**
+   * Flag saying whether the input format has been closed.
+   */
+  private boolean closed = true;
+
   private MergeOnReadInputFormat(
       Configuration conf,
       MergeOnReadTableState tableState,
@@ -158,6 +163,7 @@ public class MergeOnReadInputFormat
   @Override
   public void open(MergeOnReadInputSplit split) throws IOException {
     this.currentReadCount = 0L;
+    this.closed = false;
     this.hadoopConf = StreamerUtil.getHadoopConf();
     if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size() 
> 0)) {
       if (split.getInstantRange() != null) {
@@ -203,6 +209,7 @@ public class MergeOnReadInputFormat
           + "spark partition Index: " + split.getSplitNumber()
           + "merge type: " + split.getMergeType());
     }
+    mayShiftInputSplit(split);
   }
 
   @Override
@@ -249,12 +256,32 @@ public class MergeOnReadInputFormat
       this.iterator.close();
     }
     this.iterator = null;
+    this.closed = true;
+  }
+
+  public boolean isClosed() {
+    return this.closed;
   }
 
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
 
+  /**
+   * Shifts the input split by its consumed records number.
+   *
+   * <p>Note: This action is time-consuming.
+   */
+  private void mayShiftInputSplit(MergeOnReadInputSplit split) throws 
IOException {
+    if (split.isConsumed()) {
+      // if the input split has been consumed before,
+      // shift the input split with consumed num of records first
+      for (long i = 0; i < split.getConsumed() && !reachedEnd(); i++) {
+        nextRecord(null);
+      }
+    }
+  }
+
   private ParquetColumnarRowSplitReader getFullSchemaReader(String path) 
throws IOException {
     return getReader(path, IntStream.range(0, 
this.tableState.getRowType().getFieldCount()).toArray());
   }
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
index 0c93eea..156622c 100644
--- 
a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
+++ 
b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
@@ -33,6 +33,8 @@ import java.util.List;
 public class MergeOnReadInputSplit implements InputSplit {
   private static final long serialVersionUID = 1L;
 
+  private static final long NUM_NO_CONSUMPTION = 0L;
+
   private final int splitNum;
   private final Option<String> basePath;
   private final Option<List<String>> logPaths;
@@ -42,6 +44,10 @@ public class MergeOnReadInputSplit implements InputSplit {
   private final String mergeType;
   private final Option<InstantRange> instantRange;
 
+  // for streaming reader to record the consumed offset,
+  // which is the start of next round reading.
+  private long consumed = NUM_NO_CONSUMPTION;
+
   public MergeOnReadInputSplit(
       int splitNum,
       @Nullable String basePath,
@@ -94,6 +100,18 @@ public class MergeOnReadInputSplit implements InputSplit {
     return this.splitNum;
   }
 
+  public void consume() {
+    this.consumed += 1L;
+  }
+
+  public long getConsumed() {
+    return consumed;
+  }
+
+  public boolean isConsumed() {
+    return this.consumed != NUM_NO_CONSUMPTION;
+  }
+
   @Override
   public String toString() {
     return "MergeOnReadInputSplit{"
@@ -107,5 +125,4 @@ public class MergeOnReadInputSplit implements InputSplit {
         + ", instantRange=" + instantRange
         + '}';
   }
-
 }
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java 
b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index db7111b..a5812aa 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -128,7 +128,7 @@ public class HoodieDataSourceITCase extends 
AbstractTestBase {
         .setBoolean("table.dynamic-table-options.enabled", true);
     // specify the start commit as earliest
     List<Row> rows3 = execSelectSql(streamTableEnv,
-        "select * from 
t1/*+options('read.streaming.start-commit'='earliest')*/", 10);
+        "select * from t1/*+options('read.start-commit'='earliest')*/", 10);
     assertRowsEquals(rows3, TestData.DATA_SET_SOURCE_INSERT);
   }
 

Reply via email to