This is an automated email from the ASF dual-hosted git repository.
jonvex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6768ee1e7057 [HUDI-9174] remove use of log scanner from cli (#13508)
6768ee1e7057 is described below
commit 6768ee1e7057e0117efeba2495380f428c8aee00
Author: Jon Vexler <[email protected]>
AuthorDate: Fri Jul 18 07:54:38 2025 -0400
[HUDI-9174] remove use of log scanner from cli (#13508)
---
.../hudi/cli/commands/HoodieLogFileCommand.java | 92 ++++++++++++++--------
1 file changed, 59 insertions(+), 33 deletions(-)
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index a33e03472bce..413c3744459b 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -18,15 +18,16 @@
package org.apache.hudi.cli.commands;
+import org.apache.hudi.avro.HoodieAvroReaderContext;
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.HoodiePrintHelper;
import org.apache.hudi.cli.HoodieTableHeaderFields;
import org.apache.hudi.cli.TableHeader;
-import org.apache.hudi.common.config.HoodieCommonConfig;
-import org.apache.hudi.common.config.HoodieMemoryConfig;
-import org.apache.hudi.common.config.HoodieReaderConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
@@ -34,13 +35,16 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
-import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.log.block.HoodieCorruptBlock;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import
org.apache.hudi.common.table.log.block.HoodieLogBlock.FooterMetadataType;
import
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import
org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
@@ -61,13 +65,16 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import scala.Tuple2;
import scala.Tuple3;
+import static
org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
+import static
org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
+import static
org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE;
+import static
org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
/**
@@ -189,7 +196,7 @@ public class HoodieLogFileCommand {
@ShellMethod(key = "show logfile records", value = "Read records from log
files")
public String showLogFileRecords(
- @ShellOption(value = {"--limit"}, help = "Limit commits",
+ @ShellOption(value = {"--limit"}, help = "Limit number of records",
defaultValue = "10") final Integer limit,
@ShellOption(value = "--logFilePathPattern",
help = "Fully qualified paths for the log files") final String
logFilePathPattern,
@@ -222,34 +229,33 @@ public class HoodieLogFileCommand {
}
Objects.requireNonNull(readerSchema);
List<IndexedRecord> allRecords = new ArrayList<>();
-
if (shouldMerge) {
System.out.println("===========================> MERGING RECORDS
<===================");
- HoodieMergedLogRecordScanner scanner =
- HoodieMergedLogRecordScanner.newBuilder()
- .withStorage(storage)
- .withBasePath(client.getBasePath())
- .withLogFilePaths(logFilePaths)
- .withReaderSchema(readerSchema)
- .withLatestInstantTime(
- client.getActiveTimeline()
-
.getCommitAndReplaceTimeline().lastInstant().get().requestedTime())
- .withReverseReader(
- Boolean.parseBoolean(
-
HoodieReaderConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue()))
-
.withBufferSize(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.defaultValue())
- .withMaxMemorySizeInBytes(
-
HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
-
.withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath())
-
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
-
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
-
.withOptimizedLogBlocksScan(Boolean.parseBoolean(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))
- .build();
- for (HoodieRecord hoodieRecord : scanner) {
- Option<HoodieAvroIndexedRecord> record =
hoodieRecord.toIndexedRecord(readerSchema, new Properties());
- if (allRecords.size() < limit) {
- allRecords.add(record.get().getData());
- }
+ HoodieReaderContext<IndexedRecord> readerContext = new
HoodieAvroReaderContext(
+ HoodieCLI.getTableMetaClient().getStorage().getConf(),
+ HoodieCLI.getTableMetaClient().getTableConfig(),
+ Option.empty(),
+ Option.empty());
+ StoragePath firstLogFile = new StoragePath(logFilePaths.get(0));
+ HoodieFileGroupId fileGroupId = new
HoodieFileGroupId(FSUtils.getRelativePartitionPath(HoodieCLI.getTableMetaClient().getBasePath(),
firstLogFile), FSUtils.getFileIdFromLogPath(firstLogFile));
+ FileSlice fileSlice = new FileSlice(fileGroupId,
HoodieTimeline.INIT_INSTANT_TS, null, logFilePaths.stream()
+ .map(l -> new HoodieLogFile(new
StoragePath(l))).collect(Collectors.toList()));
+ try (HoodieFileGroupReader<IndexedRecord> fileGroupReader =
HoodieFileGroupReader.<IndexedRecord>newBuilder()
+ .withReaderContext(readerContext)
+ .withHoodieTableMetaClient(HoodieCLI.getTableMetaClient())
+ .withFileSlice(fileSlice)
+ .withDataSchema(readerSchema)
+ .withRequestedSchema(readerSchema)
+
.withLatestCommitTime(client.getActiveTimeline().getCommitAndReplaceTimeline().lastInstant().map(HoodieInstant::requestedTime).orElse(HoodieInstantTimeGenerator.getCurrentInstantTimeStr()))
+ .withProps(buildFileGroupReaderProperties())
+ .withShouldUseRecordPosition(false)
+ .build();
+ ClosableIterator<IndexedRecord> recordIterator =
fileGroupReader.getClosableIterator()) {
+ recordIterator.forEachRemaining(record -> {
+ if (allRecords.size() < limit) {
+ allRecords.add(record);
+ }
+ });
}
} else {
for (String logFile : logFilePaths) {
@@ -287,4 +293,24 @@ public class HoodieLogFileCommand {
}
return HoodiePrintHelper.print(new String[]
{HoodieTableHeaderFields.HEADER_RECORDS}, rows);
}
+
+ /**
+ * Builds file-group-reader properties (merge memory, spillable map base path and disk map type, BitCask compression), each set to its default value; no table or CLI config is consulted.
+ */
+ private TypedProperties buildFileGroupReaderProperties() {
+ TypedProperties props = new TypedProperties();
+ props.setProperty(
+ MAX_MEMORY_FOR_MERGE.key(),
+ Long.toString(MAX_MEMORY_FOR_MERGE.defaultValue()));
+ props.setProperty(
+ SPILLABLE_MAP_BASE_PATH.key(),
+ FileIOUtils.getDefaultSpillableMapBasePath());
+ props.setProperty(
+ SPILLABLE_DISK_MAP_TYPE.key(),
+ SPILLABLE_DISK_MAP_TYPE.defaultValue().name());
+ props.setProperty(
+ DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
+ Boolean.toString(DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()));
+ return props;
+ }
}