This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4bc45a3 [HUDI-1445] Refactor AbstractHoodieLogRecordScanner to use
Builder (#2313)
4bc45a3 is described below
commit 4bc45a391afc14738a42fc731fe958b2af5e3ee8
Author: Danny Chan <[email protected]>
AuthorDate: Thu Dec 10 20:02:02 2020 +0800
[HUDI-1445] Refactor AbstractHoodieLogRecordScanner to use Builder (#2313)
---
.../hudi/cli/commands/HoodieLogFileCommand.java | 26 +++-
.../cli/commands/TestHoodieLogFileCommand.java | 24 ++-
.../HoodieSparkMergeOnReadTableCompactor.java | 16 +-
.../table/log/AbstractHoodieLogRecordScanner.java | 25 ++-
.../table/log/HoodieMergedLogRecordScanner.java | 81 ++++++++++
.../table/log/HoodieUnMergedLogRecordScanner.java | 74 +++++++++
.../common/functional/TestHoodieLogFormat.java | 171 ++++++++++++++++++---
.../realtime/RealtimeCompactedRecordReader.java | 23 +--
.../realtime/RealtimeUnmergedRecordReader.java | 17 +-
.../reader/DFSHoodieDatasetInputReader.java | 23 ++-
.../org/apache/hudi/HoodieMergeOnReadRDD.scala | 33 ++--
11 files changed, 431 insertions(+), 82 deletions(-)
diff --git
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index f8e82ae..e53dd38 100644
---
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -193,13 +193,25 @@ public class HoodieLogFileCommand implements
CommandMarker {
if (shouldMerge) {
System.out.println("===========================> MERGING RECORDS
<===================");
HoodieMergedLogRecordScanner scanner =
- new HoodieMergedLogRecordScanner(fs, client.getBasePath(),
logFilePaths, readerSchema,
-
client.getActiveTimeline().getCommitTimeline().lastInstant().get().getTimestamp(),
-
HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES,
-
Boolean.parseBoolean(HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED),
-
Boolean.parseBoolean(HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED),
- HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE,
- HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH);
+ HoodieMergedLogRecordScanner.newBuilder()
+ .withFileSystem(fs)
+ .withBasePath(client.getBasePath())
+ .withLogFilePaths(logFilePaths)
+ .withReaderSchema(readerSchema)
+ .withLatestInstantTime(
+ client.getActiveTimeline()
+ .getCommitTimeline().lastInstant().get().getTimestamp())
+ .withReadBlocksLazily(
+ Boolean.parseBoolean(
+
HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED))
+ .withReverseReader(
+ Boolean.parseBoolean(
+
HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED))
+
.withBufferSize(HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)
+ .withMaxMemorySizeInBytes(
+
HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
+
.withSpillableMapBasePath(HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)
+ .build();
for (HoodieRecord<? extends HoodieRecordPayload> hoodieRecord : scanner)
{
Option<IndexedRecord> record =
hoodieRecord.getData().getInsertValue(readerSchema);
if (allRecords.size() < limit) {
diff --git
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
index 0c52220..fbd2b92 100644
---
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
+++
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
@@ -197,13 +197,23 @@ public class TestHoodieLogFileCommand extends
AbstractShellIntegrationTest {
// get expected result of 10 records.
List<String> logFilePaths = Arrays.stream(fs.globStatus(new
Path(partitionPath + "/*")))
.map(status ->
status.getPath().toString()).collect(Collectors.toList());
- HoodieMergedLogRecordScanner scanner =
- new HoodieMergedLogRecordScanner(fs, tablePath, logFilePaths, schema,
INSTANT_TIME,
- HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES,
-
Boolean.parseBoolean(HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED),
-
Boolean.parseBoolean(HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED),
- HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE,
- HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH);
+ HoodieMergedLogRecordScanner scanner =
HoodieMergedLogRecordScanner.newBuilder()
+ .withFileSystem(fs)
+ .withBasePath(tablePath)
+ .withLogFilePaths(logFilePaths)
+ .withReaderSchema(schema)
+ .withLatestInstantTime(INSTANT_TIME)
+ .withMaxMemorySizeInBytes(
+ HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
+ .withReadBlocksLazily(
+ Boolean.parseBoolean(
+
HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED))
+ .withReverseReader(
+ Boolean.parseBoolean(
+
HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED))
+ .withBufferSize(HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)
+
.withSpillableMapBasePath(HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)
+ .build();
Iterator<HoodieRecord<? extends HoodieRecordPayload>> records =
scanner.iterator();
int num = 0;
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
index 505eabb..65cefc9 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
@@ -129,10 +129,18 @@ public class HoodieSparkMergeOnReadTableCompactor<T
extends HoodieRecordPayload>
List<String> logFiles = operation.getDeltaFileNames().stream().map(
p -> new Path(FSUtils.getPartitionPath(metaClient.getBasePath(),
operation.getPartitionPath()), p).toString())
.collect(toList());
- HoodieMergedLogRecordScanner scanner = new
HoodieMergedLogRecordScanner(fs, metaClient.getBasePath(), logFiles,
- readerSchema, maxInstantTime, maxMemoryPerCompaction,
config.getCompactionLazyBlockReadEnabled(),
- config.getCompactionReverseLogReadEnabled(),
config.getMaxDFSStreamBufferSize(),
- config.getSpillableMapBasePath());
+ HoodieMergedLogRecordScanner scanner =
HoodieMergedLogRecordScanner.newBuilder()
+ .withFileSystem(fs)
+ .withBasePath(metaClient.getBasePath())
+ .withLogFilePaths(logFiles)
+ .withReaderSchema(readerSchema)
+ .withLatestInstantTime(maxInstantTime)
+ .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
+ .withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled())
+ .withReverseReader(config.getCompactionReverseLogReadEnabled())
+ .withBufferSize(config.getMaxDFSStreamBufferSize())
+ .withSpillableMapBasePath(config.getSpillableMapBasePath())
+ .build();
if (!scanner.iterator().hasNext()) {
return new ArrayList<>();
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
index 8d8ef56..4ae709e 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
@@ -105,7 +105,6 @@ public abstract class AbstractHoodieLogRecordScanner {
// Progress
private float progress = 0.0f;
- // TODO (NA) - Change this to a builder, this constructor is too long
public AbstractHoodieLogRecordScanner(FileSystem fs, String basePath,
List<String> logFilePaths, Schema readerSchema,
String latestInstantTime, boolean readBlocksLazily, boolean
reverseReader, int bufferSize) {
this.readerSchema = readerSchema;
@@ -358,4 +357,28 @@ public abstract class AbstractHoodieLogRecordScanner {
public long getTotalCorruptBlocks() {
return totalCorruptBlocks.get();
}
+
+ /**
+ * Builder used to build {@code AbstractHoodieLogRecordScanner}.
+ */
+ public abstract static class Builder {
+
+ public abstract Builder withFileSystem(FileSystem fs);
+
+ public abstract Builder withBasePath(String basePath);
+
+ public abstract Builder withLogFilePaths(List<String> logFilePaths);
+
+ public abstract Builder withReaderSchema(Schema schema);
+
+ public abstract Builder withLatestInstantTime(String latestInstantTime);
+
+ public abstract Builder withReadBlocksLazily(boolean readBlocksLazily);
+
+ public abstract Builder withReverseReader(boolean reverseReader);
+
+ public abstract Builder withBufferSize(int bufferSize);
+
+ public abstract AbstractHoodieLogRecordScanner build();
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index 025ae91..18f2167 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -105,6 +105,13 @@ public class HoodieMergedLogRecordScanner extends
AbstractHoodieLogRecordScanner
return numMergedRecordsInLog;
}
+ /**
+ * Returns the builder for {@code HoodieMergedLogRecordScanner}.
+ */
+ public static HoodieMergedLogRecordScanner.Builder newBuilder() {
+ return new Builder();
+ }
+
@Override
protected void processNextRecord(HoodieRecord<? extends HoodieRecordPayload>
hoodieRecord) throws IOException {
String key = hoodieRecord.getRecordKey();
@@ -128,5 +135,79 @@ public class HoodieMergedLogRecordScanner extends
AbstractHoodieLogRecordScanner
public long getTotalTimeTakenToReadAndMergeBlocks() {
return totalTimeTakenToReadAndMergeBlocks;
}
+
+ /**
+ * Builder used to build {@code HoodieMergedLogRecordScanner}.
+ */
+ public static class Builder extends AbstractHoodieLogRecordScanner.Builder {
+ private FileSystem fs;
+ private String basePath;
+ private List<String> logFilePaths;
+ private Schema readerSchema;
+ private String latestInstantTime;
+ private boolean readBlocksLazily;
+ private boolean reverseReader;
+ private int bufferSize;
+ // specific configurations
+ private Long maxMemorySizeInBytes;
+ private String spillableMapBasePath;
+
+ public Builder withFileSystem(FileSystem fs) {
+ this.fs = fs;
+ return this;
+ }
+
+ public Builder withBasePath(String basePath) {
+ this.basePath = basePath;
+ return this;
+ }
+
+ public Builder withLogFilePaths(List<String> logFilePaths) {
+ this.logFilePaths = logFilePaths;
+ return this;
+ }
+
+ public Builder withReaderSchema(Schema schema) {
+ this.readerSchema = schema;
+ return this;
+ }
+
+ public Builder withLatestInstantTime(String latestInstantTime) {
+ this.latestInstantTime = latestInstantTime;
+ return this;
+ }
+
+ public Builder withReadBlocksLazily(boolean readBlocksLazily) {
+ this.readBlocksLazily = readBlocksLazily;
+ return this;
+ }
+
+ public Builder withReverseReader(boolean reverseReader) {
+ this.reverseReader = reverseReader;
+ return this;
+ }
+
+ public Builder withBufferSize(int bufferSize) {
+ this.bufferSize = bufferSize;
+ return this;
+ }
+
+ public Builder withMaxMemorySizeInBytes(Long maxMemorySizeInBytes) {
+ this.maxMemorySizeInBytes = maxMemorySizeInBytes;
+ return this;
+ }
+
+ public Builder withSpillableMapBasePath(String spillableMapBasePath) {
+ this.spillableMapBasePath = spillableMapBasePath;
+ return this;
+ }
+
+ @Override
+ public HoodieMergedLogRecordScanner build() {
+ return new HoodieMergedLogRecordScanner(fs, basePath, logFilePaths,
readerSchema,
+ latestInstantTime, maxMemorySizeInBytes, readBlocksLazily,
reverseReader,
+ bufferSize, spillableMapBasePath);
+ }
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
index 9c9df12..1aac633 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
@@ -41,6 +41,13 @@ public class HoodieUnMergedLogRecordScanner extends
AbstractHoodieLogRecordScann
this.callback = callback;
}
+ /**
+ * Returns the builder for {@code HoodieUnMergedLogRecordScanner}.
+ */
+ public static HoodieUnMergedLogRecordScanner.Builder newBuilder() {
+ return new Builder();
+ }
+
@Override
protected void processNextRecord(HoodieRecord<? extends HoodieRecordPayload>
hoodieRecord) throws Exception {
// Just call callback without merging
@@ -60,4 +67,71 @@ public class HoodieUnMergedLogRecordScanner extends
AbstractHoodieLogRecordScann
public void apply(HoodieRecord<? extends HoodieRecordPayload> record)
throws Exception;
}
+
+ /**
+ * Builder used to build {@code HoodieUnMergedLogRecordScanner}.
+ */
+ public static class Builder extends AbstractHoodieLogRecordScanner.Builder {
+ private FileSystem fs;
+ private String basePath;
+ private List<String> logFilePaths;
+ private Schema readerSchema;
+ private String latestInstantTime;
+ private boolean readBlocksLazily;
+ private boolean reverseReader;
+ private int bufferSize;
+ // specific configurations
+ private LogRecordScannerCallback callback;
+
+ public Builder withFileSystem(FileSystem fs) {
+ this.fs = fs;
+ return this;
+ }
+
+ public Builder withBasePath(String basePath) {
+ this.basePath = basePath;
+ return this;
+ }
+
+ public Builder withLogFilePaths(List<String> logFilePaths) {
+ this.logFilePaths = logFilePaths;
+ return this;
+ }
+
+ public Builder withReaderSchema(Schema schema) {
+ this.readerSchema = schema;
+ return this;
+ }
+
+ public Builder withLatestInstantTime(String latestInstantTime) {
+ this.latestInstantTime = latestInstantTime;
+ return this;
+ }
+
+ public Builder withReadBlocksLazily(boolean readBlocksLazily) {
+ this.readBlocksLazily = readBlocksLazily;
+ return this;
+ }
+
+ public Builder withReverseReader(boolean reverseReader) {
+ this.reverseReader = reverseReader;
+ return this;
+ }
+
+ public Builder withBufferSize(int bufferSize) {
+ this.bufferSize = bufferSize;
+ return this;
+ }
+
+ public Builder withLogRecordScannerCallback(LogRecordScannerCallback
callback) {
+ this.callback = callback;
+ return this;
+ }
+
+ @Override
+ public HoodieUnMergedLogRecordScanner build() {
+ return new HoodieUnMergedLogRecordScanner(fs, basePath, logFilePaths,
readerSchema,
+ latestInstantTime, readBlocksLazily, reverseReader, bufferSize,
callback);
+ }
+ }
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index fa25e17..98ece73 100755
---
a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -460,9 +460,20 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
writer.close();
// scan all log blocks (across multiple log files)
- HoodieMergedLogRecordScanner scanner = new
HoodieMergedLogRecordScanner(fs, basePath,
- logFiles.stream().map(logFile ->
logFile.getPath().toString()).collect(Collectors.toList()), schema, "100",
- 10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+ HoodieMergedLogRecordScanner scanner =
HoodieMergedLogRecordScanner.newBuilder()
+ .withFileSystem(fs)
+ .withBasePath(basePath)
+ .withLogFilePaths(
+ logFiles.stream()
+ .map(logFile ->
logFile.getPath().toString()).collect(Collectors.toList()))
+ .withReaderSchema(schema)
+ .withLatestInstantTime("100")
+ .withMaxMemorySizeInBytes(10240L)
+ .withReadBlocksLazily(readBlocksLazily)
+ .withReverseReader(false)
+ .withBufferSize(bufferSize)
+ .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+ .build();
List<IndexedRecord> scannedRecords = new ArrayList<>();
for (HoodieRecord record : scanner) {
@@ -601,8 +612,18 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1",
HoodieLogFile.DELTA_EXTENSION, "100")
.map(s -> s.getPath().toString()).collect(Collectors.toList());
- HoodieMergedLogRecordScanner scanner = new
HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "100",
- 10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+ HoodieMergedLogRecordScanner scanner =
HoodieMergedLogRecordScanner.newBuilder()
+ .withFileSystem(fs)
+ .withBasePath(basePath)
+ .withLogFilePaths(allLogFiles)
+ .withReaderSchema(schema)
+ .withLatestInstantTime("100")
+ .withMaxMemorySizeInBytes(10240L)
+ .withReadBlocksLazily(readBlocksLazily)
+ .withReverseReader(false)
+ .withBufferSize(bufferSize)
+ .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+ .build();
assertEquals(200, scanner.getTotalLogRecords());
Set<String> readKeys = new HashSet<>(200);
scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
@@ -663,8 +684,18 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1",
HoodieLogFile.DELTA_EXTENSION, "100")
.map(s -> s.getPath().toString()).collect(Collectors.toList());
- HoodieMergedLogRecordScanner scanner = new
HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "102",
- 10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+ HoodieMergedLogRecordScanner scanner =
HoodieMergedLogRecordScanner.newBuilder()
+ .withFileSystem(fs)
+ .withBasePath(basePath)
+ .withLogFilePaths(allLogFiles)
+ .withReaderSchema(schema)
+ .withLatestInstantTime("102")
+ .withMaxMemorySizeInBytes(10240L)
+ .withReadBlocksLazily(readBlocksLazily)
+ .withReverseReader(false)
+ .withBufferSize(bufferSize)
+ .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+ .build();
assertEquals(200, scanner.getTotalLogRecords(), "We read 200 records from
2 write batches");
Set<String> readKeys = new HashSet<>(200);
scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
@@ -742,8 +773,18 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1",
HoodieLogFile.DELTA_EXTENSION, "100")
.map(s -> s.getPath().toString()).collect(Collectors.toList());
- HoodieMergedLogRecordScanner scanner = new
HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "103",
- 10240L, true, false, bufferSize, BASE_OUTPUT_PATH);
+ HoodieMergedLogRecordScanner scanner =
HoodieMergedLogRecordScanner.newBuilder()
+ .withFileSystem(fs)
+ .withBasePath(basePath)
+ .withLogFilePaths(allLogFiles)
+ .withReaderSchema(schema)
+ .withLatestInstantTime("103")
+ .withMaxMemorySizeInBytes(10240L)
+ .withReadBlocksLazily(true)
+ .withReverseReader(false)
+ .withBufferSize(bufferSize)
+ .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+ .build();
assertEquals(200, scanner.getTotalLogRecords(), "We would read 200
records");
Set<String> readKeys = new HashSet<>(200);
scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
@@ -802,8 +843,18 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1",
HoodieLogFile.DELTA_EXTENSION, "100")
.map(s -> s.getPath().toString()).collect(Collectors.toList());
- HoodieMergedLogRecordScanner scanner = new
HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "102",
- 10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+ HoodieMergedLogRecordScanner scanner =
HoodieMergedLogRecordScanner.newBuilder()
+ .withFileSystem(fs)
+ .withBasePath(basePath)
+ .withLogFilePaths(allLogFiles)
+ .withReaderSchema(schema)
+ .withLatestInstantTime("102")
+ .withMaxMemorySizeInBytes(10240L)
+ .withReadBlocksLazily(readBlocksLazily)
+ .withReverseReader(false)
+ .withBufferSize(bufferSize)
+ .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+ .build();
assertEquals(200, scanner.getTotalLogRecords(), "We still would read 200
records");
final List<String> readKeys = new ArrayList<>(200);
final List<Boolean> emptyPayloads = new ArrayList<>();
@@ -833,8 +884,18 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
writer.appendBlock(commandBlock);
readKeys.clear();
- scanner = new HoodieMergedLogRecordScanner(fs, basePath, allLogFiles,
schema, "101", 10240L, readBlocksLazily,
- false, bufferSize, BASE_OUTPUT_PATH);
+ scanner = HoodieMergedLogRecordScanner.newBuilder()
+ .withFileSystem(fs)
+ .withBasePath(basePath)
+ .withLogFilePaths(allLogFiles)
+ .withReaderSchema(schema)
+ .withLatestInstantTime("101")
+ .withMaxMemorySizeInBytes(10240L)
+ .withReadBlocksLazily(readBlocksLazily)
+ .withReverseReader(false)
+ .withBufferSize(bufferSize)
+ .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+ .build();
scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
assertEquals(200, readKeys.size(), "Stream collect should return all 200
records after rollback of delete");
}
@@ -898,8 +959,18 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
.map(s -> s.getPath().toString()).collect(Collectors.toList());
// all data must be rolled back before merge
- HoodieMergedLogRecordScanner scanner = new
HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "100",
- 10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+ HoodieMergedLogRecordScanner scanner =
HoodieMergedLogRecordScanner.newBuilder()
+ .withFileSystem(fs)
+ .withBasePath(basePath)
+ .withLogFilePaths(allLogFiles)
+ .withReaderSchema(schema)
+ .withLatestInstantTime("100")
+ .withMaxMemorySizeInBytes(10240L)
+ .withReadBlocksLazily(readBlocksLazily)
+ .withReverseReader(false)
+ .withBufferSize(bufferSize)
+ .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+ .build();
assertEquals(0, scanner.getTotalLogRecords(), "We would have scanned 0
records because of rollback");
final List<String> readKeys = new ArrayList<>();
@@ -949,8 +1020,18 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1",
HoodieLogFile.DELTA_EXTENSION, "100")
.map(s -> s.getPath().toString()).collect(Collectors.toList());
- HoodieMergedLogRecordScanner scanner = new
HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "100",
- 10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+ HoodieMergedLogRecordScanner scanner =
HoodieMergedLogRecordScanner.newBuilder()
+ .withFileSystem(fs)
+ .withBasePath(basePath)
+ .withLogFilePaths(allLogFiles)
+ .withReaderSchema(schema)
+ .withLatestInstantTime("100")
+ .withMaxMemorySizeInBytes(10240L)
+ .withReadBlocksLazily(readBlocksLazily)
+ .withReverseReader(false)
+ .withBufferSize(bufferSize)
+ .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+ .build();
assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records");
}
@@ -983,8 +1064,18 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1",
HoodieLogFile.DELTA_EXTENSION, "100")
.map(s -> s.getPath().toString()).collect(Collectors.toList());
- HoodieMergedLogRecordScanner scanner = new
HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "100",
- 10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+ HoodieMergedLogRecordScanner scanner =
HoodieMergedLogRecordScanner.newBuilder()
+ .withFileSystem(fs)
+ .withBasePath(basePath)
+ .withLogFilePaths(allLogFiles)
+ .withReaderSchema(schema)
+ .withLatestInstantTime("100")
+ .withMaxMemorySizeInBytes(10240L)
+ .withReadBlocksLazily(readBlocksLazily)
+ .withReverseReader(false)
+ .withBufferSize(bufferSize)
+ .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+ .build();
assertEquals(100, scanner.getTotalLogRecords(), "We still would read 100
records");
final List<String> readKeys = new ArrayList<>(100);
scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
@@ -1036,8 +1127,18 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1",
HoodieLogFile.DELTA_EXTENSION, "100")
.map(s -> s.getPath().toString()).collect(Collectors.toList());
- HoodieMergedLogRecordScanner scanner = new
HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "101",
- 10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+ HoodieMergedLogRecordScanner scanner =
HoodieMergedLogRecordScanner.newBuilder()
+ .withFileSystem(fs)
+ .withBasePath(basePath)
+ .withLogFilePaths(allLogFiles)
+ .withReaderSchema(schema)
+ .withLatestInstantTime("101")
+ .withMaxMemorySizeInBytes(10240L)
+ .withReadBlocksLazily(readBlocksLazily)
+ .withReverseReader(false)
+ .withBufferSize(bufferSize)
+ .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+ .build();
assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records");
}
@@ -1126,8 +1227,18 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1",
HoodieLogFile.DELTA_EXTENSION, "100")
.map(s -> s.getPath().toString()).collect(Collectors.toList());
- HoodieMergedLogRecordScanner scanner = new
HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "101",
- 10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+ HoodieMergedLogRecordScanner scanner =
HoodieMergedLogRecordScanner.newBuilder()
+ .withFileSystem(fs)
+ .withBasePath(basePath)
+ .withLogFilePaths(allLogFiles)
+ .withReaderSchema(schema)
+ .withLatestInstantTime("101")
+ .withMaxMemorySizeInBytes(10240L)
+ .withReadBlocksLazily(readBlocksLazily)
+ .withReverseReader(false)
+ .withBufferSize(bufferSize)
+ .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+ .build();
assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records");
}
@@ -1183,8 +1294,18 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
List<String> allLogFiles = FSUtils.getAllLogFiles(fs, partitionPath,
"test-fileid1",
HoodieLogFile.DELTA_EXTENSION, "100").map(s ->
s.getPath().toString()).collect(Collectors.toList());
- HoodieMergedLogRecordScanner scanner = new
HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema,
- "100", 10240L, readBlocksLazily, false, bufferSize,
BASE_OUTPUT_PATH);
+ HoodieMergedLogRecordScanner scanner =
HoodieMergedLogRecordScanner.newBuilder()
+ .withFileSystem(fs)
+ .withBasePath(basePath)
+ .withLogFilePaths(allLogFiles)
+ .withReaderSchema(schema)
+ .withLatestInstantTime("100")
+ .withMaxMemorySizeInBytes(10240L)
+ .withReadBlocksLazily(readBlocksLazily)
+ .withReverseReader(false)
+ .withBufferSize(bufferSize)
+ .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+ .build();
assertEquals(Math.max(numRecordsInLog1, numRecordsInLog2),
scanner.getNumMergedRecordsInLog(),
"We would read 100 records");
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index b710b59..a139997 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -63,17 +63,18 @@ class RealtimeCompactedRecordReader extends
AbstractRealtimeRecordReader
// NOTE: HoodieCompactedLogRecordScanner will not return records for an
in-flight commit
// but can return records for completed commits > the commit we are trying
to read (if using
// readCommit() API)
- return new HoodieMergedLogRecordScanner(
- FSUtils.getFs(split.getPath().toString(), jobConf),
- split.getBasePath(),
- split.getDeltaLogPaths(),
- usesCustomPayload ? getWriterSchema() : getReaderSchema(),
- split.getMaxCommitTime(),
- HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf),
-
Boolean.parseBoolean(jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)),
- false,
- jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE),
- jobConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH));
+ return HoodieMergedLogRecordScanner.newBuilder()
+ .withFileSystem(FSUtils.getFs(split.getPath().toString(), jobConf))
+ .withBasePath(split.getBasePath())
+ .withLogFilePaths(split.getDeltaLogPaths())
+ .withReaderSchema(usesCustomPayload ? getWriterSchema() :
getReaderSchema())
+ .withLatestInstantTime(split.getMaxCommitTime())
+
.withMaxMemorySizeInBytes(HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf))
+
.withReadBlocksLazily(Boolean.parseBoolean(jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
+ .withReverseReader(false)
+
.withBufferSize(jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
+
.withSpillableMapBasePath(jobConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
+ .build();
}
@Override
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
index 76de84b..d209a5a 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
@@ -77,15 +77,22 @@ class RealtimeUnmergedRecordReader extends
AbstractRealtimeRecordReader
Option.empty(), x -> x, new DefaultSizeEstimator<>());
// Consumer of this record reader
this.iterator = this.executor.getQueue().iterator();
- this.logRecordScanner = new
HoodieUnMergedLogRecordScanner(FSUtils.getFs(split.getPath().toString(),
this.jobConf),
- split.getBasePath(), split.getDeltaLogPaths(), getReaderSchema(),
split.getMaxCommitTime(),
-
Boolean.parseBoolean(this.jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)),
- false,
this.jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE), record -> {
+ this.logRecordScanner = HoodieUnMergedLogRecordScanner.newBuilder()
+ .withFileSystem(FSUtils.getFs(split.getPath().toString(),
this.jobConf))
+ .withBasePath(split.getBasePath())
+ .withLogFilePaths(split.getDeltaLogPaths())
+ .withReaderSchema(getReaderSchema())
+ .withLatestInstantTime(split.getMaxCommitTime())
+
.withReadBlocksLazily(Boolean.parseBoolean(this.jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
+ .withReverseReader(false)
+
.withBufferSize(this.jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
+ .withLogRecordScannerCallback(record -> {
// convert Hoodie log record to Hadoop AvroWritable and buffer
GenericRecord rec = (GenericRecord)
record.getData().getInsertValue(getReaderSchema()).get();
ArrayWritable aWritable = (ArrayWritable)
HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema());
this.executor.getQueue().insertRecord(aWritable);
- });
+ })
+ .build();
// Start reading and buffering
this.executor.startProducers();
}
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index cfe7991..2bd507c 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -249,14 +249,21 @@ public class DFSHoodieDatasetInputReader extends
DFSDeltaInputReader {
return itr;
} else {
// If there is no data file, fall back to reading log files
- HoodieMergedLogRecordScanner scanner = new
HoodieMergedLogRecordScanner(metaClient.getFs(),
- metaClient.getBasePath(),
- fileSlice.getLogFiles().map(l ->
l.getPath().getName()).collect(Collectors.toList()),
- new Schema.Parser().parse(schemaStr),
metaClient.getActiveTimeline().getCommitsTimeline()
- .filterCompletedInstants().lastInstant().get().getTimestamp(),
- HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES,
true, false,
- HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE,
- HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH);
+ HoodieMergedLogRecordScanner scanner =
HoodieMergedLogRecordScanner.newBuilder()
+ .withFileSystem(metaClient.getFs())
+ .withBasePath(metaClient.getBasePath())
+ .withLogFilePaths(
+ fileSlice.getLogFiles().map(l ->
l.getPath().getName()).collect(Collectors.toList()))
+ .withReaderSchema(new Schema.Parser().parse(schemaStr))
+
.withLatestInstantTime(metaClient.getActiveTimeline().getCommitsTimeline()
+ .filterCompletedInstants().lastInstant().get().getTimestamp())
+ .withMaxMemorySizeInBytes(
+ HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
+ .withReadBlocksLazily(true)
+ .withReverseReader(false)
+
.withBufferSize(HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)
+
.withSpillableMapBasePath(HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)
+ .build();
// readAvro log files
Iterable<HoodieRecord<? extends HoodieRecordPayload>> iterable = () ->
scanner.iterator();
Schema schema = new Schema.Parser().parse(schemaStr);
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
index 1481024..e8caa63 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -255,19 +255,24 @@ private object HoodieMergeOnReadRDD {
def scanLog(split: HoodieMergeOnReadFileSplit, logSchema: Schema, config:
Configuration): HoodieMergedLogRecordScanner = {
val fs = FSUtils.getFs(split.tablePath, config)
- new HoodieMergedLogRecordScanner(
- fs,
- split.tablePath,
- split.logPaths.get.asJava,
- logSchema,
- split.latestCommit,
- split.maxCompactionMemoryInBytes,
-
Try(config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
-
HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean).getOrElse(false),
- false,
- config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
- HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE),
- config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
- HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
+ HoodieMergedLogRecordScanner.newBuilder()
+ .withFileSystem(fs)
+ .withBasePath(split.tablePath)
+ .withLogFilePaths(split.logPaths.get.asJava)
+ .withReaderSchema(logSchema)
+ .withLatestInstantTime(split.latestCommit)
+ .withReadBlocksLazily(
+
Try(config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
+
HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean)
+ .getOrElse(false))
+ .withReverseReader(false)
+ .withBufferSize(
+ config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
+ HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
+ .withMaxMemorySizeInBytes(split.maxCompactionMemoryInBytes)
+ .withSpillableMapBasePath(
+ config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
+ HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
+ .build()
}
}