This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bc45a3  [HUDI-1445] Refactor AbstractHoodieLogRecordScanner to use 
Builder (#2313)
4bc45a3 is described below

commit 4bc45a391afc14738a42fc731fe958b2af5e3ee8
Author: Danny Chan <[email protected]>
AuthorDate: Thu Dec 10 20:02:02 2020 +0800

    [HUDI-1445] Refactor AbstractHoodieLogRecordScanner to use Builder (#2313)
---
 .../hudi/cli/commands/HoodieLogFileCommand.java    |  26 +++-
 .../cli/commands/TestHoodieLogFileCommand.java     |  24 ++-
 .../HoodieSparkMergeOnReadTableCompactor.java      |  16 +-
 .../table/log/AbstractHoodieLogRecordScanner.java  |  25 ++-
 .../table/log/HoodieMergedLogRecordScanner.java    |  81 ++++++++++
 .../table/log/HoodieUnMergedLogRecordScanner.java  |  74 +++++++++
 .../common/functional/TestHoodieLogFormat.java     | 171 ++++++++++++++++++---
 .../realtime/RealtimeCompactedRecordReader.java    |  23 +--
 .../realtime/RealtimeUnmergedRecordReader.java     |  17 +-
 .../reader/DFSHoodieDatasetInputReader.java        |  23 ++-
 .../org/apache/hudi/HoodieMergeOnReadRDD.scala     |  33 ++--
 11 files changed, 431 insertions(+), 82 deletions(-)

diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index f8e82ae..e53dd38 100644
--- 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -193,13 +193,25 @@ public class HoodieLogFileCommand implements 
CommandMarker {
     if (shouldMerge) {
       System.out.println("===========================> MERGING RECORDS 
<===================");
       HoodieMergedLogRecordScanner scanner =
-          new HoodieMergedLogRecordScanner(fs, client.getBasePath(), 
logFilePaths, readerSchema,
-              
client.getActiveTimeline().getCommitTimeline().lastInstant().get().getTimestamp(),
-                  
HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES,
-                  
Boolean.parseBoolean(HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED),
-                  
Boolean.parseBoolean(HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED),
-                  HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE,
-                  HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH);
+          HoodieMergedLogRecordScanner.newBuilder()
+              .withFileSystem(fs)
+              .withBasePath(client.getBasePath())
+              .withLogFilePaths(logFilePaths)
+              .withReaderSchema(readerSchema)
+              .withLatestInstantTime(
+                  client.getActiveTimeline()
+                      .getCommitTimeline().lastInstant().get().getTimestamp())
+              .withReadBlocksLazily(
+                  Boolean.parseBoolean(
+                      
HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED))
+              .withReverseReader(
+                  Boolean.parseBoolean(
+                      
HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED))
+              
.withBufferSize(HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)
+              .withMaxMemorySizeInBytes(
+                  
HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
+              
.withSpillableMapBasePath(HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)
+              .build();
       for (HoodieRecord<? extends HoodieRecordPayload> hoodieRecord : scanner) 
{
         Option<IndexedRecord> record = 
hoodieRecord.getData().getInsertValue(readerSchema);
         if (allRecords.size() < limit) {
diff --git 
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
 
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
index 0c52220..fbd2b92 100644
--- 
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
+++ 
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
@@ -197,13 +197,23 @@ public class TestHoodieLogFileCommand extends 
AbstractShellIntegrationTest {
     // get expected result of 10 records.
     List<String> logFilePaths = Arrays.stream(fs.globStatus(new 
Path(partitionPath + "/*")))
         .map(status -> 
status.getPath().toString()).collect(Collectors.toList());
-    HoodieMergedLogRecordScanner scanner =
-        new HoodieMergedLogRecordScanner(fs, tablePath, logFilePaths, schema, 
INSTANT_TIME,
-            HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES,
-            
Boolean.parseBoolean(HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED),
-            
Boolean.parseBoolean(HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED),
-            HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE,
-            HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH);
+    HoodieMergedLogRecordScanner scanner = 
HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(tablePath)
+        .withLogFilePaths(logFilePaths)
+        .withReaderSchema(schema)
+        .withLatestInstantTime(INSTANT_TIME)
+        .withMaxMemorySizeInBytes(
+            HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
+        .withReadBlocksLazily(
+            Boolean.parseBoolean(
+                
HoodieCompactionConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED))
+        .withReverseReader(
+            Boolean.parseBoolean(
+                
HoodieCompactionConfig.DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED))
+        .withBufferSize(HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)
+        
.withSpillableMapBasePath(HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)
+        .build();
 
     Iterator<HoodieRecord<? extends HoodieRecordPayload>> records = 
scanner.iterator();
     int num = 0;
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
index 505eabb..65cefc9 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
@@ -129,10 +129,18 @@ public class HoodieSparkMergeOnReadTableCompactor<T 
extends HoodieRecordPayload>
     List<String> logFiles = operation.getDeltaFileNames().stream().map(
         p -> new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), 
operation.getPartitionPath()), p).toString())
         .collect(toList());
-    HoodieMergedLogRecordScanner scanner = new 
HoodieMergedLogRecordScanner(fs, metaClient.getBasePath(), logFiles,
-        readerSchema, maxInstantTime, maxMemoryPerCompaction, 
config.getCompactionLazyBlockReadEnabled(),
-        config.getCompactionReverseLogReadEnabled(), 
config.getMaxDFSStreamBufferSize(),
-        config.getSpillableMapBasePath());
+    HoodieMergedLogRecordScanner scanner = 
HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(metaClient.getBasePath())
+        .withLogFilePaths(logFiles)
+        .withReaderSchema(readerSchema)
+        .withLatestInstantTime(maxInstantTime)
+        .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
+        .withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled())
+        .withReverseReader(config.getCompactionReverseLogReadEnabled())
+        .withBufferSize(config.getMaxDFSStreamBufferSize())
+        .withSpillableMapBasePath(config.getSpillableMapBasePath())
+        .build();
     if (!scanner.iterator().hasNext()) {
       return new ArrayList<>();
     }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
index 8d8ef56..4ae709e 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
@@ -105,7 +105,6 @@ public abstract class AbstractHoodieLogRecordScanner {
   // Progress
   private float progress = 0.0f;
 
-  // TODO (NA) - Change this to a builder, this constructor is too long
   public AbstractHoodieLogRecordScanner(FileSystem fs, String basePath, 
List<String> logFilePaths, Schema readerSchema,
       String latestInstantTime, boolean readBlocksLazily, boolean 
reverseReader, int bufferSize) {
     this.readerSchema = readerSchema;
@@ -358,4 +357,28 @@ public abstract class AbstractHoodieLogRecordScanner {
   public long getTotalCorruptBlocks() {
     return totalCorruptBlocks.get();
   }
+
+  /**
+   * Builder used to build {@code AbstractHoodieLogRecordScanner}.
+   */
+  public abstract static class Builder {
+
+    public abstract Builder withFileSystem(FileSystem fs);
+
+    public abstract Builder withBasePath(String basePath);
+
+    public abstract Builder withLogFilePaths(List<String> logFilePaths);
+
+    public abstract Builder withReaderSchema(Schema schema);
+
+    public abstract Builder withLatestInstantTime(String latestInstantTime);
+
+    public abstract Builder withReadBlocksLazily(boolean readBlocksLazily);
+
+    public abstract Builder withReverseReader(boolean reverseReader);
+
+    public abstract Builder withBufferSize(int bufferSize);
+
+    public abstract AbstractHoodieLogRecordScanner build();
+  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index 025ae91..18f2167 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -105,6 +105,13 @@ public class HoodieMergedLogRecordScanner extends 
AbstractHoodieLogRecordScanner
     return numMergedRecordsInLog;
   }
 
+  /**
+   * Returns the builder for {@code HoodieMergedLogRecordScanner}.
+   */
+  public static HoodieMergedLogRecordScanner.Builder newBuilder() {
+    return new Builder();
+  }
+
   @Override
   protected void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> 
hoodieRecord) throws IOException {
     String key = hoodieRecord.getRecordKey();
@@ -128,5 +135,79 @@ public class HoodieMergedLogRecordScanner extends 
AbstractHoodieLogRecordScanner
   public long getTotalTimeTakenToReadAndMergeBlocks() {
     return totalTimeTakenToReadAndMergeBlocks;
   }
+
+  /**
+   * Builder used to build {@code HoodieMergedLogRecordScanner}.
+   */
+  public static class Builder extends AbstractHoodieLogRecordScanner.Builder {
+    private FileSystem fs;
+    private String basePath;
+    private List<String> logFilePaths;
+    private Schema readerSchema;
+    private String latestInstantTime;
+    private boolean readBlocksLazily;
+    private boolean reverseReader;
+    private int bufferSize;
+    // specific configurations
+    private Long maxMemorySizeInBytes;
+    private String spillableMapBasePath;
+
+    public Builder withFileSystem(FileSystem fs) {
+      this.fs = fs;
+      return this;
+    }
+
+    public Builder withBasePath(String basePath) {
+      this.basePath = basePath;
+      return this;
+    }
+
+    public Builder withLogFilePaths(List<String> logFilePaths) {
+      this.logFilePaths = logFilePaths;
+      return this;
+    }
+
+    public Builder withReaderSchema(Schema schema) {
+      this.readerSchema = schema;
+      return this;
+    }
+
+    public Builder withLatestInstantTime(String latestInstantTime) {
+      this.latestInstantTime = latestInstantTime;
+      return this;
+    }
+
+    public Builder withReadBlocksLazily(boolean readBlocksLazily) {
+      this.readBlocksLazily = readBlocksLazily;
+      return this;
+    }
+
+    public Builder withReverseReader(boolean reverseReader) {
+      this.reverseReader = reverseReader;
+      return this;
+    }
+
+    public Builder withBufferSize(int bufferSize) {
+      this.bufferSize = bufferSize;
+      return this;
+    }
+
+    public Builder withMaxMemorySizeInBytes(Long maxMemorySizeInBytes) {
+      this.maxMemorySizeInBytes = maxMemorySizeInBytes;
+      return this;
+    }
+
+    public Builder withSpillableMapBasePath(String spillableMapBasePath) {
+      this.spillableMapBasePath = spillableMapBasePath;
+      return this;
+    }
+
+    @Override
+    public HoodieMergedLogRecordScanner build() {
+      return new HoodieMergedLogRecordScanner(fs, basePath, logFilePaths, 
readerSchema,
+          latestInstantTime, maxMemorySizeInBytes, readBlocksLazily, 
reverseReader,
+          bufferSize, spillableMapBasePath);
+    }
+  }
 }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
index 9c9df12..1aac633 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
@@ -41,6 +41,13 @@ public class HoodieUnMergedLogRecordScanner extends 
AbstractHoodieLogRecordScann
     this.callback = callback;
   }
 
+  /**
+   * Returns the builder for {@code HoodieUnMergedLogRecordScanner}.
+   */
+  public static HoodieUnMergedLogRecordScanner.Builder newBuilder() {
+    return new Builder();
+  }
+
   @Override
   protected void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> 
hoodieRecord) throws Exception {
     // Just call callback without merging
@@ -60,4 +67,71 @@ public class HoodieUnMergedLogRecordScanner extends 
AbstractHoodieLogRecordScann
 
     public void apply(HoodieRecord<? extends HoodieRecordPayload> record) 
throws Exception;
   }
+
+  /**
+   * Builder used to build {@code HoodieUnMergedLogRecordScanner}.
+   */
+  public static class Builder extends AbstractHoodieLogRecordScanner.Builder {
+    private FileSystem fs;
+    private String basePath;
+    private List<String> logFilePaths;
+    private Schema readerSchema;
+    private String latestInstantTime;
+    private boolean readBlocksLazily;
+    private boolean reverseReader;
+    private int bufferSize;
+    // specific configurations
+    private LogRecordScannerCallback callback;
+
+    public Builder withFileSystem(FileSystem fs) {
+      this.fs = fs;
+      return this;
+    }
+
+    public Builder withBasePath(String basePath) {
+      this.basePath = basePath;
+      return this;
+    }
+
+    public Builder withLogFilePaths(List<String> logFilePaths) {
+      this.logFilePaths = logFilePaths;
+      return this;
+    }
+
+    public Builder withReaderSchema(Schema schema) {
+      this.readerSchema = schema;
+      return this;
+    }
+
+    public Builder withLatestInstantTime(String latestInstantTime) {
+      this.latestInstantTime = latestInstantTime;
+      return this;
+    }
+
+    public Builder withReadBlocksLazily(boolean readBlocksLazily) {
+      this.readBlocksLazily = readBlocksLazily;
+      return this;
+    }
+
+    public Builder withReverseReader(boolean reverseReader) {
+      this.reverseReader = reverseReader;
+      return this;
+    }
+
+    public Builder withBufferSize(int bufferSize) {
+      this.bufferSize = bufferSize;
+      return this;
+    }
+
+    public Builder withLogRecordScannerCallback(LogRecordScannerCallback 
callback) {
+      this.callback = callback;
+      return this;
+    }
+
+    @Override
+    public HoodieUnMergedLogRecordScanner build() {
+      return new HoodieUnMergedLogRecordScanner(fs, basePath, logFilePaths, 
readerSchema,
+          latestInstantTime, readBlocksLazily, reverseReader, bufferSize, 
callback);
+    }
+  }
 }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index fa25e17..98ece73 100755
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -460,9 +460,20 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     writer.close();
 
     // scan all log blocks (across multiple log files)
-    HoodieMergedLogRecordScanner scanner = new 
HoodieMergedLogRecordScanner(fs, basePath,
-        logFiles.stream().map(logFile -> 
logFile.getPath().toString()).collect(Collectors.toList()), schema, "100",
-        10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+    HoodieMergedLogRecordScanner scanner = 
HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(
+            logFiles.stream()
+                .map(logFile -> 
logFile.getPath().toString()).collect(Collectors.toList()))
+        .withReaderSchema(schema)
+        .withLatestInstantTime("100")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .build();
 
     List<IndexedRecord> scannedRecords = new ArrayList<>();
     for (HoodieRecord record : scanner) {
@@ -601,8 +612,18 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
         FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", 
HoodieLogFile.DELTA_EXTENSION, "100")
             .map(s -> s.getPath().toString()).collect(Collectors.toList());
 
-    HoodieMergedLogRecordScanner scanner = new 
HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "100",
-        10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+    HoodieMergedLogRecordScanner scanner = 
HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(allLogFiles)
+        .withReaderSchema(schema)
+        .withLatestInstantTime("100")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .build();
     assertEquals(200, scanner.getTotalLogRecords());
     Set<String> readKeys = new HashSet<>(200);
     scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
@@ -663,8 +684,18 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
         FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", 
HoodieLogFile.DELTA_EXTENSION, "100")
             .map(s -> s.getPath().toString()).collect(Collectors.toList());
 
-    HoodieMergedLogRecordScanner scanner = new 
HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "102",
-        10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+    HoodieMergedLogRecordScanner scanner = 
HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(allLogFiles)
+        .withReaderSchema(schema)
+        .withLatestInstantTime("102")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .build();
     assertEquals(200, scanner.getTotalLogRecords(), "We read 200 records from 
2 write batches");
     Set<String> readKeys = new HashSet<>(200);
     scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
@@ -742,8 +773,18 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
         FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", 
HoodieLogFile.DELTA_EXTENSION, "100")
             .map(s -> s.getPath().toString()).collect(Collectors.toList());
 
-    HoodieMergedLogRecordScanner scanner = new 
HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "103",
-        10240L, true, false, bufferSize, BASE_OUTPUT_PATH);
+    HoodieMergedLogRecordScanner scanner = 
HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(allLogFiles)
+        .withReaderSchema(schema)
+        .withLatestInstantTime("103")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(true)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .build();
     assertEquals(200, scanner.getTotalLogRecords(), "We would read 200 
records");
     Set<String> readKeys = new HashSet<>(200);
     scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
@@ -802,8 +843,18 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
         FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", 
HoodieLogFile.DELTA_EXTENSION, "100")
             .map(s -> s.getPath().toString()).collect(Collectors.toList());
 
-    HoodieMergedLogRecordScanner scanner = new 
HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "102",
-        10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+    HoodieMergedLogRecordScanner scanner = 
HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(allLogFiles)
+        .withReaderSchema(schema)
+        .withLatestInstantTime("102")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .build();
     assertEquals(200, scanner.getTotalLogRecords(), "We still would read 200 
records");
     final List<String> readKeys = new ArrayList<>(200);
     final List<Boolean> emptyPayloads = new ArrayList<>();
@@ -833,8 +884,18 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     writer.appendBlock(commandBlock);
 
     readKeys.clear();
-    scanner = new HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, 
schema, "101", 10240L, readBlocksLazily,
-        false, bufferSize, BASE_OUTPUT_PATH);
+    scanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(allLogFiles)
+        .withReaderSchema(schema)
+        .withLatestInstantTime("101")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .build();
     scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
     assertEquals(200, readKeys.size(), "Stream collect should return all 200 
records after rollback of delete");
   }
@@ -898,8 +959,18 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
             .map(s -> s.getPath().toString()).collect(Collectors.toList());
 
     // all data must be rolled back before merge
-    HoodieMergedLogRecordScanner scanner = new 
HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "100",
-        10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+    HoodieMergedLogRecordScanner scanner = 
HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(allLogFiles)
+        .withReaderSchema(schema)
+        .withLatestInstantTime("100")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .build();
     assertEquals(0, scanner.getTotalLogRecords(), "We would have scanned 0 
records because of rollback");
 
     final List<String> readKeys = new ArrayList<>();
@@ -949,8 +1020,18 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
         FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", 
HoodieLogFile.DELTA_EXTENSION, "100")
             .map(s -> s.getPath().toString()).collect(Collectors.toList());
 
-    HoodieMergedLogRecordScanner scanner = new 
HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "100",
-        10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+    HoodieMergedLogRecordScanner scanner = 
HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(allLogFiles)
+        .withReaderSchema(schema)
+        .withLatestInstantTime("100")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .build();
     assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records");
   }
 
@@ -983,8 +1064,18 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
         FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", 
HoodieLogFile.DELTA_EXTENSION, "100")
             .map(s -> s.getPath().toString()).collect(Collectors.toList());
 
-    HoodieMergedLogRecordScanner scanner = new 
HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "100",
-        10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+    HoodieMergedLogRecordScanner scanner = 
HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(allLogFiles)
+        .withReaderSchema(schema)
+        .withLatestInstantTime("100")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .build();
     assertEquals(100, scanner.getTotalLogRecords(), "We still would read 100 
records");
     final List<String> readKeys = new ArrayList<>(100);
     scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
@@ -1036,8 +1127,18 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
         FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", 
HoodieLogFile.DELTA_EXTENSION, "100")
             .map(s -> s.getPath().toString()).collect(Collectors.toList());
 
-    HoodieMergedLogRecordScanner scanner = new 
HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "101",
-        10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+    HoodieMergedLogRecordScanner scanner = 
HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(allLogFiles)
+        .withReaderSchema(schema)
+        .withLatestInstantTime("101")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .build();
     assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records");
   }
 
@@ -1126,8 +1227,18 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
         FSUtils.getAllLogFiles(fs, partitionPath, "test-fileid1", 
HoodieLogFile.DELTA_EXTENSION, "100")
             .map(s -> s.getPath().toString()).collect(Collectors.toList());
 
-    HoodieMergedLogRecordScanner scanner = new 
HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema, "101",
-        10240L, readBlocksLazily, false, bufferSize, BASE_OUTPUT_PATH);
+    HoodieMergedLogRecordScanner scanner = 
HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(basePath)
+        .withLogFilePaths(allLogFiles)
+        .withReaderSchema(schema)
+        .withLatestInstantTime("101")
+        .withMaxMemorySizeInBytes(10240L)
+        .withReadBlocksLazily(readBlocksLazily)
+        .withReverseReader(false)
+        .withBufferSize(bufferSize)
+        .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+        .build();
     assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records");
   }
 
@@ -1183,8 +1294,18 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
       List<String> allLogFiles = FSUtils.getAllLogFiles(fs, partitionPath, 
"test-fileid1",
           HoodieLogFile.DELTA_EXTENSION, "100").map(s -> 
s.getPath().toString()).collect(Collectors.toList());
 
-      HoodieMergedLogRecordScanner scanner = new 
HoodieMergedLogRecordScanner(fs, basePath, allLogFiles, schema,
-          "100", 10240L, readBlocksLazily, false, bufferSize, 
BASE_OUTPUT_PATH);
+      HoodieMergedLogRecordScanner scanner = 
HoodieMergedLogRecordScanner.newBuilder()
+          .withFileSystem(fs)
+          .withBasePath(basePath)
+          .withLogFilePaths(allLogFiles)
+          .withReaderSchema(schema)
+          .withLatestInstantTime("100")
+          .withMaxMemorySizeInBytes(10240L)
+          .withReadBlocksLazily(readBlocksLazily)
+          .withReverseReader(false)
+          .withBufferSize(bufferSize)
+          .withSpillableMapBasePath(BASE_OUTPUT_PATH)
+          .build();
 
       assertEquals(Math.max(numRecordsInLog1, numRecordsInLog2), 
scanner.getNumMergedRecordsInLog(),
           "We would read 100 records");
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index b710b59..a139997 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -63,17 +63,18 @@ class RealtimeCompactedRecordReader extends 
AbstractRealtimeRecordReader
     // NOTE: HoodieCompactedLogRecordScanner will not return records for an 
in-flight commit
     // but can return records for completed commits > the commit we are trying 
to read (if using
     // readCommit() API)
-    return new HoodieMergedLogRecordScanner(
-        FSUtils.getFs(split.getPath().toString(), jobConf),
-        split.getBasePath(),
-        split.getDeltaLogPaths(),
-        usesCustomPayload ? getWriterSchema() : getReaderSchema(),
-        split.getMaxCommitTime(),
-        HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf),
-        
Boolean.parseBoolean(jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
 HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)),
-        false,
-        jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, 
HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE),
-        jobConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, 
HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH));
+    return HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(FSUtils.getFs(split.getPath().toString(), jobConf))
+        .withBasePath(split.getBasePath())
+        .withLogFilePaths(split.getDeltaLogPaths())
+        .withReaderSchema(usesCustomPayload ? getWriterSchema() : 
getReaderSchema())
+        .withLatestInstantTime(split.getMaxCommitTime())
+        
.withMaxMemorySizeInBytes(HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf))
+        
.withReadBlocksLazily(Boolean.parseBoolean(jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
 HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
+        .withReverseReader(false)
+        
.withBufferSize(jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
 HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
+        
.withSpillableMapBasePath(jobConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
 HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
+        .build();
   }
 
   @Override
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
index 76de84b..d209a5a 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
@@ -77,15 +77,22 @@ class RealtimeUnmergedRecordReader extends 
AbstractRealtimeRecordReader
         Option.empty(), x -> x, new DefaultSizeEstimator<>());
     // Consumer of this record reader
     this.iterator = this.executor.getQueue().iterator();
-    this.logRecordScanner = new 
HoodieUnMergedLogRecordScanner(FSUtils.getFs(split.getPath().toString(), 
this.jobConf),
-        split.getBasePath(), split.getDeltaLogPaths(), getReaderSchema(), 
split.getMaxCommitTime(),
-        
Boolean.parseBoolean(this.jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
 HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)),
-        false, 
this.jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, 
HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE), record -> {
+    this.logRecordScanner = HoodieUnMergedLogRecordScanner.newBuilder()
+        .withFileSystem(FSUtils.getFs(split.getPath().toString(), 
this.jobConf))
+        .withBasePath(split.getBasePath())
+        .withLogFilePaths(split.getDeltaLogPaths())
+        .withReaderSchema(getReaderSchema())
+        .withLatestInstantTime(split.getMaxCommitTime())
+        
.withReadBlocksLazily(Boolean.parseBoolean(this.jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
 HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
+        .withReverseReader(false)
+        
.withBufferSize(this.jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
 HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
+        .withLogRecordScannerCallback(record -> {
           // convert Hoodie log record to Hadoop AvroWritable and buffer
           GenericRecord rec = (GenericRecord) 
record.getData().getInsertValue(getReaderSchema()).get();
           ArrayWritable aWritable = (ArrayWritable) 
HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema());
           this.executor.getQueue().insertRecord(aWritable);
-    });
+        })
+        .build();
     // Start reading and buffering
     this.executor.startProducers();
   }
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index cfe7991..2bd507c 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -249,14 +249,21 @@ public class DFSHoodieDatasetInputReader extends 
DFSDeltaInputReader {
       return itr;
     } else {
       // If there is no data file, fall back to reading log files
-      HoodieMergedLogRecordScanner scanner = new 
HoodieMergedLogRecordScanner(metaClient.getFs(),
-          metaClient.getBasePath(),
-          fileSlice.getLogFiles().map(l -> 
l.getPath().getName()).collect(Collectors.toList()),
-          new Schema.Parser().parse(schemaStr), 
metaClient.getActiveTimeline().getCommitsTimeline()
-          .filterCompletedInstants().lastInstant().get().getTimestamp(),
-          HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES, 
true, false,
-          HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE,
-          HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH);
+      HoodieMergedLogRecordScanner scanner = 
HoodieMergedLogRecordScanner.newBuilder()
+          .withFileSystem(metaClient.getFs())
+          .withBasePath(metaClient.getBasePath())
+          .withLogFilePaths(
+              fileSlice.getLogFiles().map(l -> 
l.getPath().getName()).collect(Collectors.toList()))
+          .withReaderSchema(new Schema.Parser().parse(schemaStr))
+          
.withLatestInstantTime(metaClient.getActiveTimeline().getCommitsTimeline()
+              .filterCompletedInstants().lastInstant().get().getTimestamp())
+          .withMaxMemorySizeInBytes(
+              HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
+          .withReadBlocksLazily(true)
+          .withReverseReader(false)
+          
.withBufferSize(HoodieMemoryConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)
+          
.withSpillableMapBasePath(HoodieMemoryConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)
+          .build();
       // readAvro log files
       Iterable<HoodieRecord<? extends HoodieRecordPayload>> iterable = () -> 
scanner.iterator();
       Schema schema = new Schema.Parser().parse(schemaStr);
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
index 1481024..e8caa63 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -255,19 +255,24 @@ private object HoodieMergeOnReadRDD {
 
   def scanLog(split: HoodieMergeOnReadFileSplit, logSchema: Schema, config: 
Configuration): HoodieMergedLogRecordScanner = {
     val fs = FSUtils.getFs(split.tablePath, config)
-    new HoodieMergedLogRecordScanner(
-      fs,
-      split.tablePath,
-      split.logPaths.get.asJava,
-      logSchema,
-      split.latestCommit,
-      split.maxCompactionMemoryInBytes,
-      
Try(config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
-        
HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean).getOrElse(false),
-      false,
-      config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
-        HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE),
-      config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
-        HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
+    HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(fs)
+        .withBasePath(split.tablePath)
+        .withLogFilePaths(split.logPaths.get.asJava)
+        .withReaderSchema(logSchema)
+        .withLatestInstantTime(split.latestCommit)
+        .withReadBlocksLazily(
+          
Try(config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
+            
HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean)
+              .getOrElse(false))
+        .withReverseReader(false)
+        .withBufferSize(
+          config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
+            HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
+        .withMaxMemorySizeInBytes(split.maxCompactionMemoryInBytes)
+        .withSpillableMapBasePath(
+          config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
+            HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
+        .build()
   }
 }

Reply via email to