This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 31a301f [HUDI-2485] Consume as mini-batch for flink stream reader
(#3710)
31a301f is described below
commit 31a301f0aa955450e52c28ee5857526034d523f0
Author: Danny Chan <[email protected]>
AuthorDate: Fri Sep 24 23:44:01 2021 +0800
[HUDI-2485] Consume as mini-batch for flink stream reader (#3710)
---
.../org/apache/hudi/source/StreamReadOperator.java | 50 +++++++++++++++++-----
.../org/apache/hudi/table/HoodieTableSource.java | 4 +-
.../table/format/mor/MergeOnReadInputFormat.java | 27 ++++++++++++
.../table/format/mor/MergeOnReadInputSplit.java | 19 +++++++-
.../apache/hudi/table/HoodieDataSourceITCase.java | 2 +-
5 files changed, 86 insertions(+), 16 deletions(-)
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadOperator.java
b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadOperator.java
index e2f5f7b..0130433 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadOperator.java
@@ -64,6 +64,8 @@ public class StreamReadOperator extends
AbstractStreamOperator<RowData>
private static final Logger LOG =
LoggerFactory.getLogger(StreamReadOperator.class);
+ private static final int MINI_BATCH_SIZE = 1000;
+
// It's the same thread that runs this operator and checkpoint actions. Use
this executor to schedule only
// splits for subsequent reading, so that a new checkpoint could be
triggered without blocking a long time
// for exhausting all scheduled split reading tasks.
@@ -74,6 +76,7 @@ public class StreamReadOperator extends
AbstractStreamOperator<RowData>
private transient SourceFunction.SourceContext<RowData> sourceContext;
private transient ListState<MergeOnReadInputSplit> inputSplitsState;
+
private transient Queue<MergeOnReadInputSplit> splits;
// Splits are read by the same thread that calls #processElement. Each read
task is submitted to that thread by adding
@@ -146,31 +149,56 @@ public class StreamReadOperator extends
AbstractStreamOperator<RowData>
}
private void processSplits() throws IOException {
- MergeOnReadInputSplit split = splits.poll();
+ MergeOnReadInputSplit split = splits.peek();
if (split == null) {
currentSplitState = SplitState.IDLE;
return;
}
- // This log is important to indicate the consuming process, there is only
one log message for one data bucket.
- LOG.info("Processing input split : {}", split);
-
- try {
+ // 1. open a fresh new input split and start reading as mini-batch
+ // 2. if the input split has remaining records to read, switch to
another runnable to handle them
+ // 3. if the input split reads to the end, close the format and remove the
split from the queue #splits
+ // 4. for each runnable, reads at most #MINI_BATCH_SIZE number of records
+ if (format.isClosed()) {
+ // This log is important to indicate the consuming process,
+ // there is only one log message for one data bucket.
+ LOG.info("Processing input split : {}", split);
format.open(split);
- RowData nextElement = null;
- while (!format.reachedEnd()) {
- nextElement = format.nextRecord(nextElement);
- sourceContext.collect(nextElement);
- }
+ }
+ try {
+ consumeAsMiniBatch(split);
} finally {
currentSplitState = SplitState.IDLE;
- format.close();
}
// Re-schedule to process the next split.
enqueueProcessSplits();
}
+ /**
+ * Consumes at most {@link #MINI_BATCH_SIZE} records
+ * for the given input split {@code split}.
+ *
+ * <p>Note: close the input format and remove the input split from the queue
{@link #splits}
+ * if the split reads to the end.
+ *
+ * @param split The input split
+ */
+ private void consumeAsMiniBatch(MergeOnReadInputSplit split) throws
IOException {
+ for (int i = 0; i < MINI_BATCH_SIZE; i++) {
+ if (!format.reachedEnd()) {
+ sourceContext.collect(format.nextRecord(null));
+ split.consume();
+ } else {
+ // close the input format
+ format.close();
+ // remove the split
+ splits.poll();
+ break;
+ }
+ }
+ }
+
@Override
public void processWatermark(Watermark mark) {
// we do nothing because we emit our own watermarks if needed.
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 6ef608b..4e193fa 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -180,11 +180,9 @@ public class HoodieTableSource implements
conf, FilePathUtils.toFlinkPath(path),
maxCompactionMemoryInBytes, getRequiredPartitionPaths());
InputFormat<RowData, ?> inputFormat = getInputFormat(true);
OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData>
factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
- SingleOutputStreamOperator<RowData> source =
execEnv.addSource(monitoringFunction, "streaming_source")
- .uid("uid_streaming_source_" +
conf.getString(FlinkOptions.TABLE_NAME))
+ SingleOutputStreamOperator<RowData> source =
execEnv.addSource(monitoringFunction, "split_monitor")
.setParallelism(1)
.transform("split_reader", typeInfo, factory)
- .uid("uid_split_reader_" +
conf.getString(FlinkOptions.TABLE_NAME))
.setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
return new DataStreamSource<>(source);
} else {
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 4cd45a8..566d4d3 100644
---
a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++
b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -129,6 +129,11 @@ public class MergeOnReadInputFormat
*/
private boolean emitDelete;
+ /**
+ * Flag saying whether the input format has been closed.
+ */
+ private boolean closed = true;
+
private MergeOnReadInputFormat(
Configuration conf,
MergeOnReadTableState tableState,
@@ -158,6 +163,7 @@ public class MergeOnReadInputFormat
@Override
public void open(MergeOnReadInputSplit split) throws IOException {
this.currentReadCount = 0L;
+ this.closed = false;
this.hadoopConf = StreamerUtil.getHadoopConf();
if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size()
> 0)) {
if (split.getInstantRange() != null) {
@@ -203,6 +209,7 @@ public class MergeOnReadInputFormat
+ "spark partition Index: " + split.getSplitNumber()
+ "merge type: " + split.getMergeType());
}
+ mayShiftInputSplit(split);
}
@Override
@@ -249,12 +256,32 @@ public class MergeOnReadInputFormat
this.iterator.close();
}
this.iterator = null;
+ this.closed = true;
+ }
+
+ public boolean isClosed() {
+ return this.closed;
}
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
+ /**
+ * Shifts the input split forward by the number of records it has
already consumed.
+ * <p>Note: This action is time-consuming.
+ */
+ private void mayShiftInputSplit(MergeOnReadInputSplit split) throws
IOException {
+ if (split.isConsumed()) {
+ // if the input split has been consumed before,
+ // shift the input split with consumed num of records first
+ for (long i = 0; i < split.getConsumed() && !reachedEnd(); i++) {
+ nextRecord(null);
+ }
+ }
+ }
+
private ParquetColumnarRowSplitReader getFullSchemaReader(String path)
throws IOException {
return getReader(path, IntStream.range(0,
this.tableState.getRowType().getFieldCount()).toArray());
}
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
index 0c93eea..156622c 100644
---
a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
+++
b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputSplit.java
@@ -33,6 +33,8 @@ import java.util.List;
public class MergeOnReadInputSplit implements InputSplit {
private static final long serialVersionUID = 1L;
+ private static final long NUM_NO_CONSUMPTION = 0L;
+
private final int splitNum;
private final Option<String> basePath;
private final Option<List<String>> logPaths;
@@ -42,6 +44,10 @@ public class MergeOnReadInputSplit implements InputSplit {
private final String mergeType;
private final Option<InstantRange> instantRange;
+ // for streaming reader to record the consumed offset,
+ // which is the start of next round reading.
+ private long consumed = NUM_NO_CONSUMPTION;
+
public MergeOnReadInputSplit(
int splitNum,
@Nullable String basePath,
@@ -94,6 +100,18 @@ public class MergeOnReadInputSplit implements InputSplit {
return this.splitNum;
}
+ public void consume() {
+ this.consumed += 1L;
+ }
+
+ public long getConsumed() {
+ return consumed;
+ }
+
+ public boolean isConsumed() {
+ return this.consumed != NUM_NO_CONSUMPTION;
+ }
+
@Override
public String toString() {
return "MergeOnReadInputSplit{"
@@ -107,5 +125,4 @@ public class MergeOnReadInputSplit implements InputSplit {
+ ", instantRange=" + instantRange
+ '}';
}
-
}
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
index db7111b..a5812aa 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java
@@ -128,7 +128,7 @@ public class HoodieDataSourceITCase extends
AbstractTestBase {
.setBoolean("table.dynamic-table-options.enabled", true);
// specify the start commit as earliest
List<Row> rows3 = execSelectSql(streamTableEnv,
- "select * from
t1/*+options('read.streaming.start-commit'='earliest')*/", 10);
+ "select * from t1/*+options('read.start-commit'='earliest')*/", 10);
assertRowsEquals(rows3, TestData.DATA_SET_SOURCE_INSERT);
}