danny0405 commented on code in PR #12608:
URL: https://github.com/apache/hudi/pull/12608#discussion_r1940442316
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java:
##########
@@ -174,82 +168,34 @@ public static HoodieMergedLogRecordScanner logScanner(
.build();
}
- /**
- * Utility to read and buffer the records in the unMerged log record scanner.
- */
- public static class BoundedMemoryRecords {
- // Executor that runs the above producers in parallel
- private final BoundedInMemoryExecutor<HoodieRecord<?>, HoodieRecord<?>, ?>
executor;
-
- // Iterator for the buffer consumer
- private final Iterator<HoodieRecord<?>> iterator;
-
- public BoundedMemoryRecords(
- MergeOnReadInputSplit split,
- Schema logSchema,
- InternalSchema internalSchema,
- Configuration hadoopConf,
- org.apache.flink.configuration.Configuration flinkConf) {
- List<String> mergers =
Arrays.stream(flinkConf.getString(FlinkOptions.RECORD_MERGER_IMPLS).split(","))
- .map(String::trim)
- .distinct()
- .collect(Collectors.toList());
- HoodieRecordMerger merger = HoodieRecordUtils.createRecordMerger(
- split.getTablePath(), EngineType.FLINK, mergers,
flinkConf.getString(FlinkOptions.RECORD_MERGER_STRATEGY_ID));
- HoodieUnMergedLogRecordScanner.Builder scannerBuilder =
- HoodieUnMergedLogRecordScanner.newBuilder()
- .withStorage(HoodieStorageUtils.getStorage(
- split.getTablePath(),
HadoopFSUtils.getStorageConf(hadoopConf)))
- .withBasePath(split.getTablePath())
- .withLogFilePaths(split.getLogPaths().get())
- .withReaderSchema(logSchema)
- .withInternalSchema(internalSchema)
- .withLatestInstantTime(split.getLatestCommit())
- .withReverseReader(false)
- .withBufferSize(
-
flinkConf.getInteger(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(),
- HoodieMemoryConfig.DEFAULT_MR_MAX_DFS_STREAM_BUFFER_SIZE))
- .withInstantRange(split.getInstantRange())
- .withRecordMerger(merger);
-
- this.executor = new BoundedInMemoryExecutor<>(
- StreamerUtil.getMaxCompactionMemoryInBytes(flinkConf),
- getParallelProducers(scannerBuilder),
- Option.empty(),
- Function.identity(),
- new DefaultSizeEstimator<>(),
- Functions.noop());
- this.iterator = this.executor.getRecordIterator();
-
- // Start reading and buffering
- this.executor.startProducingAsync();
- }
-
- public Iterator<HoodieRecord<?>> getRecordsIterator() {
- return this.iterator;
- }
-
- /**
- * Setup log and parquet reading in parallel. Both write to central buffer.
- */
- private List<HoodieProducer<HoodieRecord<?>>> getParallelProducers(
- HoodieUnMergedLogRecordScanner.Builder scannerBuilder
- ) {
- List<HoodieProducer<HoodieRecord<?>>> producers = new ArrayList<>();
- producers.add(new FunctionBasedQueueProducer<>(queue -> {
- HoodieUnMergedLogRecordScanner scanner =
-
scannerBuilder.withLogRecordScannerCallback(queue::insertRecord).build();
- // Scan all the delta-log files, filling in the queue
- scanner.scan();
- return null;
- }));
+ public static HoodieUnMergedLogRecordScanner
getUnMergedLogRecordScanner(MergeOnReadInputSplit split,
+
Schema logSchema,
+
InternalSchema internalSchema,
+
Configuration hadoopConf,
+
org.apache.flink.configuration.Configuration flinkConf) {
+ List<String> mergers =
Arrays.stream(flinkConf.getString(FlinkOptions.RECORD_MERGER_IMPLS).split(","))
+ .map(String::trim)
+ .distinct()
+ .collect(Collectors.toList());
- return producers;
- }
+ HoodieRecordMerger merger = HoodieRecordUtils.createRecordMerger(
+ split.getTablePath(), EngineType.FLINK, mergers,
flinkConf.getString(FlinkOptions.RECORD_MERGER_STRATEGY_ID));
- public void close() {
- this.executor.shutdownNow();
- }
+ return HoodieUnMergedLogRecordScanner.newBuilder()
+ .withStorage(HoodieStorageUtils.getStorage(
+ split.getTablePath(), HadoopFSUtils.getStorageConf(hadoopConf)))
+ .withBasePath(split.getTablePath())
+ .withLogFilePaths(split.getLogPaths().get())
+ .withReaderSchema(logSchema)
+ .withInternalSchema(internalSchema)
+ .withLatestInstantTime(split.getLatestCommit())
+ .withReverseReader(false)
+ .withBufferSize(
+
flinkConf.getInteger(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(),
+ HoodieMemoryConfig.DEFAULT_MR_MAX_DFS_STREAM_BUFFER_SIZE))
+ .withInstantRange(split.getInstantRange())
+ .withRecordMerger(merger)
Review Comment:
Not sure whether the merger is necessary because for
`HoodieUnMergedLogRecordScanner`, the merge never happens.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]