This is an automated email from the ASF dual-hosted git repository.

jonvex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6768ee1e7057 [HUDI-9174] remove use of log scanner from cli (#13508)
6768ee1e7057 is described below

commit 6768ee1e7057e0117efeba2495380f428c8aee00
Author: Jon Vexler <[email protected]>
AuthorDate: Fri Jul 18 07:54:38 2025 -0400

    [HUDI-9174] remove use of log scanner from cli (#13508)
---
 .../hudi/cli/commands/HoodieLogFileCommand.java    | 92 ++++++++++++++--------
 1 file changed, 59 insertions(+), 33 deletions(-)

diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index a33e03472bce..413c3744459b 100644
--- 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -18,15 +18,16 @@
 
 package org.apache.hudi.cli.commands;
 
+import org.apache.hudi.avro.HoodieAvroReaderContext;
 import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.HoodiePrintHelper;
 import org.apache.hudi.cli.HoodieTableHeaderFields;
 import org.apache.hudi.cli.TableHeader;
-import org.apache.hudi.common.config.HoodieCommonConfig;
-import org.apache.hudi.common.config.HoodieMemoryConfig;
-import org.apache.hudi.common.config.HoodieReaderConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
@@ -34,13 +35,16 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
-import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.table.log.block.HoodieCorruptBlock;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import 
org.apache.hudi.common.table.log.block.HoodieLogBlock.FooterMetadataType;
 import 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
 import 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
@@ -61,13 +65,16 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Properties;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import scala.Tuple2;
 import scala.Tuple3;
 
+import static 
org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
+import static 
org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
+import static 
org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE;
+import static 
org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH;
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
 
 /**
@@ -189,7 +196,7 @@ public class HoodieLogFileCommand {
 
   @ShellMethod(key = "show logfile records", value = "Read records from log 
files")
   public String showLogFileRecords(
-      @ShellOption(value = {"--limit"}, help = "Limit commits",
+      @ShellOption(value = {"--limit"}, help = "Limit number of records",
           defaultValue = "10") final Integer limit,
       @ShellOption(value = "--logFilePathPattern",
           help = "Fully qualified paths for the log files") final String 
logFilePathPattern,
@@ -222,34 +229,33 @@ public class HoodieLogFileCommand {
     }
     Objects.requireNonNull(readerSchema);
     List<IndexedRecord> allRecords = new ArrayList<>();
-
     if (shouldMerge) {
       System.out.println("===========================> MERGING RECORDS 
<===================");
-      HoodieMergedLogRecordScanner scanner =
-          HoodieMergedLogRecordScanner.newBuilder()
-              .withStorage(storage)
-              .withBasePath(client.getBasePath())
-              .withLogFilePaths(logFilePaths)
-              .withReaderSchema(readerSchema)
-              .withLatestInstantTime(
-                  client.getActiveTimeline()
-                      
.getCommitAndReplaceTimeline().lastInstant().get().requestedTime())
-              .withReverseReader(
-                  Boolean.parseBoolean(
-                      
HoodieReaderConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue()))
-              
.withBufferSize(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.defaultValue())
-              .withMaxMemorySizeInBytes(
-                  
HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
-              
.withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath())
-              
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
-              
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
-              
.withOptimizedLogBlocksScan(Boolean.parseBoolean(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))
-              .build();
-      for (HoodieRecord hoodieRecord : scanner) {
-        Option<HoodieAvroIndexedRecord> record = 
hoodieRecord.toIndexedRecord(readerSchema, new Properties());
-        if (allRecords.size() < limit) {
-          allRecords.add(record.get().getData());
-        }
+      HoodieReaderContext<IndexedRecord> readerContext = new 
HoodieAvroReaderContext(
+          HoodieCLI.getTableMetaClient().getStorage().getConf(),
+          HoodieCLI.getTableMetaClient().getTableConfig(),
+          Option.empty(),
+          Option.empty());
+      StoragePath firstLogFile = new StoragePath(logFilePaths.get(0));
+      HoodieFileGroupId fileGroupId = new 
HoodieFileGroupId(FSUtils.getRelativePartitionPath(HoodieCLI.getTableMetaClient().getBasePath(),
 firstLogFile), FSUtils.getFileIdFromLogPath(firstLogFile));
+      FileSlice fileSlice = new FileSlice(fileGroupId, 
HoodieTimeline.INIT_INSTANT_TS, null, logFilePaths.stream()
+          .map(l -> new HoodieLogFile(new 
StoragePath(l))).collect(Collectors.toList()));
+      try (HoodieFileGroupReader<IndexedRecord> fileGroupReader = 
HoodieFileGroupReader.<IndexedRecord>newBuilder()
+          .withReaderContext(readerContext)
+          .withHoodieTableMetaClient(HoodieCLI.getTableMetaClient())
+          .withFileSlice(fileSlice)
+          .withDataSchema(readerSchema)
+          .withRequestedSchema(readerSchema)
+          
.withLatestCommitTime(client.getActiveTimeline().getCommitAndReplaceTimeline().lastInstant().map(HoodieInstant::requestedTime).orElse(HoodieInstantTimeGenerator.getCurrentInstantTimeStr()))
+          .withProps(buildFileGroupReaderProperties())
+          .withShouldUseRecordPosition(false)
+          .build();
+           ClosableIterator<IndexedRecord> recordIterator = 
fileGroupReader.getClosableIterator()) {
+        recordIterator.forEachRemaining(record -> {
+          if (allRecords.size() < limit) {
+            allRecords.add(record);
+          }
+        });
       }
     } else {
       for (String logFile : logFilePaths) {
@@ -287,4 +293,24 @@ public class HoodieLogFileCommand {
     }
     return HoodiePrintHelper.print(new String[] 
{HoodieTableHeaderFields.HEADER_RECORDS}, rows);
   }
+
+  /**
+   * Builds file group reader properties from default merge-memory and spillable-map settings.
+   */
+  private TypedProperties buildFileGroupReaderProperties() {
+    TypedProperties props = new TypedProperties();
+    props.setProperty(
+        MAX_MEMORY_FOR_MERGE.key(),
+        Long.toString(MAX_MEMORY_FOR_MERGE.defaultValue()));
+    props.setProperty(
+        SPILLABLE_MAP_BASE_PATH.key(),
+        FileIOUtils.getDefaultSpillableMapBasePath());
+    props.setProperty(
+        SPILLABLE_DISK_MAP_TYPE.key(),
+        SPILLABLE_DISK_MAP_TYPE.defaultValue().name());
+    props.setProperty(
+        DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
+        Boolean.toString(DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()));
+    return props;
+  }
 }

Reply via email to