the-other-tim-brown commented on code in PR #13508:
URL: https://github.com/apache/hudi/pull/13508#discussion_r2195051965
##########
hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java:
##########
@@ -222,36 +226,38 @@ storage, new StoragePath(logFilePathPattern)).stream()
     }
     Objects.requireNonNull(readerSchema);
     List<IndexedRecord> allRecords = new ArrayList<>();
-
     if (shouldMerge) {
       System.out.println("===========================> MERGING RECORDS <===================");
-      HoodieMergedLogRecordScanner scanner =
-          HoodieMergedLogRecordScanner.newBuilder()
-              .withStorage(storage)
-              .withBasePath(client.getBasePath())
-              .withLogFilePaths(logFilePaths)
-              .withReaderSchema(readerSchema)
-              .withLatestInstantTime(
-                  client.getActiveTimeline()
-                      .getCommitAndReplaceTimeline().lastInstant().get().requestedTime())
-              .withReverseReader(
-                  Boolean.parseBoolean(
-                      HoodieReaderConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue()))
-              .withBufferSize(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.defaultValue())
-              .withMaxMemorySizeInBytes(
-                  HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
-              .withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath())
-              .withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
-              .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
-              .withOptimizedLogBlocksScan(Boolean.parseBoolean(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))
-              .build();
-      for (HoodieRecord hoodieRecord : scanner) {
-        Option<HoodieAvroIndexedRecord> record = hoodieRecord.toIndexedRecord(readerSchema, new Properties());
-        if (allRecords.size() < limit) {
-          allRecords.add(record.get().getData());
-        }
+      HoodieReaderContext<IndexedRecord> readerContext = new HoodieAvroReaderContext(
+          HoodieCLI.getTableMetaClient().getStorage().getConf(),
+          HoodieCLI.getTableMetaClient().getTableConfig(),
+          Option.empty(),
+          Option.empty());
+      StoragePath firstLogFile = new StoragePath(logFilePaths.get(0));
+      HoodieFileGroupId fileGroupId = new HoodieFileGroupId(FSUtils.getRelativePartitionPath(HoodieCLI.getTableMetaClient().getBasePath(), firstLogFile), FSUtils.getFileIdFromLogPath(firstLogFile));
+      FileSlice fileSlice = new FileSlice(fileGroupId, "000", null, logFilePaths.stream()
+          .map(l -> new HoodieLogFile(new StoragePath(l))).collect(Collectors.toList()));
+      try (HoodieFileGroupReader<IndexedRecord> fileGroupReader = HoodieFileGroupReader.<IndexedRecord>newBuilder()
+          .withReaderContext(readerContext)
+          .withHoodieTableMetaClient(HoodieCLI.getTableMetaClient())
+          .withFileSlice(fileSlice)
+          .withDataSchema(readerSchema)
+          .withRequestedSchema(readerSchema)
+          .withLatestCommitTime("99999999999999")
Review Comment:
Should we have this line up with the previous code, which used `client.getActiveTimeline().getCommitAndReplaceTimeline().lastInstant().get().requestedTime()`?
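
For illustration, a minimal sketch of what that could look like, reusing the lookup from the removed scanner code (it assumes, as that code did, that a completed instant exists on the timeline):

```java
// Sketch: derive the latest commit time from the active timeline rather than hardcoding it.
// Assumes, as the removed scanner code did, that a completed instant exists on the timeline.
String latestCommitTime = HoodieCLI.getTableMetaClient().getActiveTimeline()
    .getCommitAndReplaceTimeline().lastInstant().get().requestedTime();
// ... then in the builder:
//     .withLatestCommitTime(latestCommitTime)
```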
##########
hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java:
##########
@@ -222,36 +226,38 @@ storage, new StoragePath(logFilePathPattern)).stream()
     }
     Objects.requireNonNull(readerSchema);
    List<IndexedRecord> allRecords = new ArrayList<>();
-
     if (shouldMerge) {
       System.out.println("===========================> MERGING RECORDS <===================");
-      HoodieMergedLogRecordScanner scanner =
-          HoodieMergedLogRecordScanner.newBuilder()
-              .withStorage(storage)
-              .withBasePath(client.getBasePath())
-              .withLogFilePaths(logFilePaths)
-              .withReaderSchema(readerSchema)
-              .withLatestInstantTime(
-                  client.getActiveTimeline()
-                      .getCommitAndReplaceTimeline().lastInstant().get().requestedTime())
-              .withReverseReader(
-                  Boolean.parseBoolean(
-                      HoodieReaderConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue()))
-              .withBufferSize(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.defaultValue())
-              .withMaxMemorySizeInBytes(
-                  HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
-              .withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath())
-              .withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
-              .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
-              .withOptimizedLogBlocksScan(Boolean.parseBoolean(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))
-              .build();
-      for (HoodieRecord hoodieRecord : scanner) {
-        Option<HoodieAvroIndexedRecord> record = hoodieRecord.toIndexedRecord(readerSchema, new Properties());
-        if (allRecords.size() < limit) {
-          allRecords.add(record.get().getData());
-        }
+      HoodieReaderContext<IndexedRecord> readerContext = new HoodieAvroReaderContext(
+          HoodieCLI.getTableMetaClient().getStorage().getConf(),
+          HoodieCLI.getTableMetaClient().getTableConfig(),
+          Option.empty(),
+          Option.empty());
+      StoragePath firstLogFile = new StoragePath(logFilePaths.get(0));
+      HoodieFileGroupId fileGroupId = new HoodieFileGroupId(FSUtils.getRelativePartitionPath(HoodieCLI.getTableMetaClient().getBasePath(), firstLogFile), FSUtils.getFileIdFromLogPath(firstLogFile));
+      FileSlice fileSlice = new FileSlice(fileGroupId, "000", null, logFilePaths.stream()
+          .map(l -> new HoodieLogFile(new StoragePath(l))).collect(Collectors.toList()));
+      try (HoodieFileGroupReader<IndexedRecord> fileGroupReader = HoodieFileGroupReader.<IndexedRecord>newBuilder()
+          .withReaderContext(readerContext)
+          .withHoodieTableMetaClient(HoodieCLI.getTableMetaClient())
+          .withFileSlice(fileSlice)
+          .withDataSchema(readerSchema)
+          .withRequestedSchema(readerSchema)
+          .withLatestCommitTime("99999999999999")
+          .withProps(buildFileGroupReaderProperties())
+          .withStart(0)
+          .withLength(Long.MAX_VALUE)
Review Comment:
The start and length already default to these values, so you don't need to set them in the builder.
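
For reference, the same builder chain from the diff with the two defaulted calls dropped (assuming the builder's defaults really are 0 and Long.MAX_VALUE as stated above):

```java
try (HoodieFileGroupReader<IndexedRecord> fileGroupReader = HoodieFileGroupReader.<IndexedRecord>newBuilder()
    .withReaderContext(readerContext)
    .withHoodieTableMetaClient(HoodieCLI.getTableMetaClient())
    .withFileSlice(fileSlice)
    .withDataSchema(readerSchema)
    .withRequestedSchema(readerSchema)
    .withLatestCommitTime("99999999999999")
    .withProps(buildFileGroupReaderProperties())
    // .withStart(0) and .withLength(Long.MAX_VALUE) omitted: the builder already defaults to them
    .withShouldUseRecordPosition(false)
    .build()) {
  // read records as in the diff above
}
```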
##########
hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java:
##########
@@ -222,36 +226,38 @@ storage, new StoragePath(logFilePathPattern)).stream()
     }
     Objects.requireNonNull(readerSchema);
     List<IndexedRecord> allRecords = new ArrayList<>();
-
     if (shouldMerge) {
       System.out.println("===========================> MERGING RECORDS <===================");
-      HoodieMergedLogRecordScanner scanner =
-          HoodieMergedLogRecordScanner.newBuilder()
-              .withStorage(storage)
-              .withBasePath(client.getBasePath())
-              .withLogFilePaths(logFilePaths)
-              .withReaderSchema(readerSchema)
-              .withLatestInstantTime(
-                  client.getActiveTimeline()
-                      .getCommitAndReplaceTimeline().lastInstant().get().requestedTime())
-              .withReverseReader(
-                  Boolean.parseBoolean(
-                      HoodieReaderConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue()))
-              .withBufferSize(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.defaultValue())
-              .withMaxMemorySizeInBytes(
-                  HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
-              .withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath())
-              .withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
-              .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
-              .withOptimizedLogBlocksScan(Boolean.parseBoolean(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))
-              .build();
-      for (HoodieRecord hoodieRecord : scanner) {
-        Option<HoodieAvroIndexedRecord> record = hoodieRecord.toIndexedRecord(readerSchema, new Properties());
-        if (allRecords.size() < limit) {
-          allRecords.add(record.get().getData());
-        }
+      HoodieReaderContext<IndexedRecord> readerContext = new HoodieAvroReaderContext(
+          HoodieCLI.getTableMetaClient().getStorage().getConf(),
+          HoodieCLI.getTableMetaClient().getTableConfig(),
+          Option.empty(),
+          Option.empty());
+      StoragePath firstLogFile = new StoragePath(logFilePaths.get(0));
+      HoodieFileGroupId fileGroupId = new HoodieFileGroupId(FSUtils.getRelativePartitionPath(HoodieCLI.getTableMetaClient().getBasePath(), firstLogFile), FSUtils.getFileIdFromLogPath(firstLogFile));
+      FileSlice fileSlice = new FileSlice(fileGroupId, "000", null, logFilePaths.stream()
Review Comment:
Is there a constant for some default start commit time to use here?
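
If a constant such as `HoodieTimeline.INIT_INSTANT_TS` is available here (an assumption; the exact constant may differ), the slice construction could read:

```java
// Sketch: replace the magic "000" base instant with a named constant.
// HoodieTimeline.INIT_INSTANT_TS is an assumption; substitute whatever constant the project exposes.
FileSlice fileSlice = new FileSlice(fileGroupId, HoodieTimeline.INIT_INSTANT_TS, null,
    logFilePaths.stream()
        .map(l -> new HoodieLogFile(new StoragePath(l)))
        .collect(Collectors.toList()));
```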
##########
hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java:
##########
@@ -222,36 +226,38 @@ storage, new StoragePath(logFilePathPattern)).stream()
     }
     Objects.requireNonNull(readerSchema);
     List<IndexedRecord> allRecords = new ArrayList<>();
-
     if (shouldMerge) {
       System.out.println("===========================> MERGING RECORDS <===================");
-      HoodieMergedLogRecordScanner scanner =
-          HoodieMergedLogRecordScanner.newBuilder()
-              .withStorage(storage)
-              .withBasePath(client.getBasePath())
-              .withLogFilePaths(logFilePaths)
-              .withReaderSchema(readerSchema)
-              .withLatestInstantTime(
-                  client.getActiveTimeline()
-                      .getCommitAndReplaceTimeline().lastInstant().get().requestedTime())
-              .withReverseReader(
-                  Boolean.parseBoolean(
-                      HoodieReaderConfig.COMPACTION_REVERSE_LOG_READ_ENABLE.defaultValue()))
-              .withBufferSize(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.defaultValue())
-              .withMaxMemorySizeInBytes(
-                  HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
-              .withSpillableMapBasePath(FileIOUtils.getDefaultSpillableMapBasePath())
-              .withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
-              .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
-              .withOptimizedLogBlocksScan(Boolean.parseBoolean(HoodieReaderConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.defaultValue()))
-              .build();
-      for (HoodieRecord hoodieRecord : scanner) {
-        Option<HoodieAvroIndexedRecord> record = hoodieRecord.toIndexedRecord(readerSchema, new Properties());
-        if (allRecords.size() < limit) {
-          allRecords.add(record.get().getData());
-        }
+      HoodieReaderContext<IndexedRecord> readerContext = new HoodieAvroReaderContext(
+          HoodieCLI.getTableMetaClient().getStorage().getConf(),
+          HoodieCLI.getTableMetaClient().getTableConfig(),
+          Option.empty(),
+          Option.empty());
+      StoragePath firstLogFile = new StoragePath(logFilePaths.get(0));
+      HoodieFileGroupId fileGroupId = new HoodieFileGroupId(FSUtils.getRelativePartitionPath(HoodieCLI.getTableMetaClient().getBasePath(), firstLogFile), FSUtils.getFileIdFromLogPath(firstLogFile));
+      FileSlice fileSlice = new FileSlice(fileGroupId, "000", null, logFilePaths.stream()
+          .map(l -> new HoodieLogFile(new StoragePath(l))).collect(Collectors.toList()));
+      try (HoodieFileGroupReader<IndexedRecord> fileGroupReader = HoodieFileGroupReader.<IndexedRecord>newBuilder()
+          .withReaderContext(readerContext)
+          .withHoodieTableMetaClient(HoodieCLI.getTableMetaClient())
+          .withFileSlice(fileSlice)
+          .withDataSchema(readerSchema)
+          .withRequestedSchema(readerSchema)
+          .withLatestCommitTime("99999999999999")
+          .withProps(buildFileGroupReaderProperties())
+          .withStart(0)
+          .withLength(Long.MAX_VALUE)
+          .withShouldUseRecordPosition(false)
+          .build()) {
+
+        fileGroupReader.getClosableIterator().forEachRemaining(record -> {
Review Comment:
Can you use `try-with-resources` here to ensure the resources are closed?
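
Something along these lines, assuming `getClosableIterator()` returns Hudi's `ClosableIterator`, which is `AutoCloseable`:

```java
// Sketch: scope the iterator in try-with-resources so it is closed even if the loop throws.
try (ClosableIterator<IndexedRecord> iterator = fileGroupReader.getClosableIterator()) {
  iterator.forEachRemaining(record -> {
    // same per-record handling as in the diff (collect up to `limit` records)
  });
}
```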
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]