This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b1af4d4c546 [HUDI-9514] Scanner resources not properly closed in
HoodieCompactor (#13415)
b1af4d4c546 is described below
commit b1af4d4c54636117348b6760b27ae3b7133cebb0
Author: senthh <[email protected]>
AuthorDate: Fri Jun 13 16:12:17 2025 +0530
[HUDI-9514] Scanner resources not properly closed in HoodieCompactor (#13415)
---
.../hudi/table/action/compact/HoodieCompactor.java | 113 ++++++++++-----------
1 file changed, 56 insertions(+), 57 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
index c768595096a..cbb81eb1068 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
@@ -209,38 +209,37 @@ public abstract class HoodieCompactor<T, I, K, O>
implements Serializable {
new StoragePath(FSUtils.constructAbsolutePath(
metaClient.getBasePath(), operation.getPartitionPath()),
p).toString())
.collect(toList());
- HoodieMergedLogRecordScanner scanner =
HoodieMergedLogRecordScanner.newBuilder()
- .withStorage(storage)
- .withBasePath(metaClient.getBasePath())
- .withLogFilePaths(logFiles)
- .withReaderSchema(readerSchema)
-
.withLatestInstantTime(executionHelper.instantTimeToUseForScanning(instantTime,
maxInstantTime))
- .withInstantRange(instantRange)
-
.withInternalSchema(internalSchemaOption.orElse(InternalSchema.getEmptyInternalSchema()))
- .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
- .withReverseReader(config.getCompactionReverseLogReadEnabled())
- .withBufferSize(config.getMaxDFSStreamBufferSize())
- .withSpillableMapBasePath(config.getSpillableMapBasePath())
- .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
-
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
- .withOperationField(config.allowOperationMetadataField())
- .withPartition(operation.getPartitionPath())
-
.withOptimizedLogBlocksScan(executionHelper.enableOptimizedLogBlockScan(config))
- .withRecordMerger(config.getRecordMerger())
- .withTableMetaClient(metaClient)
- .build();
+ try (HoodieMergedLogRecordScanner scanner =
HoodieMergedLogRecordScanner.newBuilder()
+ .withStorage(storage)
+ .withBasePath(metaClient.getBasePath())
+ .withLogFilePaths(logFiles)
+ .withReaderSchema(readerSchema)
+
.withLatestInstantTime(executionHelper.instantTimeToUseForScanning(instantTime,
maxInstantTime))
+ .withInstantRange(instantRange)
+
.withInternalSchema(internalSchemaOption.orElse(InternalSchema.getEmptyInternalSchema()))
+ .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
+ .withReverseReader(config.getCompactionReverseLogReadEnabled())
+ .withBufferSize(config.getMaxDFSStreamBufferSize())
+ .withSpillableMapBasePath(config.getSpillableMapBasePath())
+ .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
+
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
+ .withOperationField(config.allowOperationMetadataField())
+ .withPartition(operation.getPartitionPath())
+
.withOptimizedLogBlocksScan(executionHelper.enableOptimizedLogBlockScan(config))
+ .withRecordMerger(config.getRecordMerger())
+ .withTableMetaClient(metaClient)
+ .build()) {
- Option<HoodieBaseFile> oldDataFileOpt =
- operation.getBaseFile(metaClient.getBasePath().toString(),
operation.getPartitionPath());
+ Option<HoodieBaseFile> oldDataFileOpt =
+ operation.getBaseFile(metaClient.getBasePath().toString(),
operation.getPartitionPath());
- // Considering following scenario: if all log blocks in this fileSlice is
rollback, it returns an empty scanner.
- // But in this case, we need to give it a base file. Otherwise, it will
lose base file in following fileSlice.
- if (!scanner.iterator().hasNext()) {
- if (!oldDataFileOpt.isPresent()) {
- scanner.close();
- return new ArrayList<>();
- } else {
- // TODO: we may directly rename original parquet file if there is not
evolution/devolution of schema
+ // Considering following scenario: if all log blocks in this fileSlice
is rollback, it returns an empty scanner.
+ // But in this case, we need to give it a base file. Otherwise, it will
lose base file in following fileSlice.
+ if (!scanner.iterator().hasNext()) {
+ if (!oldDataFileOpt.isPresent()) {
+ return new ArrayList<>();
+ } else {
+ // TODO: we may directly rename original parquet file if there is
not evolution/devolution of schema
/*
TaskContextSupplier taskContextSupplier =
hoodieCopyOnWriteTable.getTaskContextSupplier();
String newFileName = FSUtils.makeDataFileName(instantTime,
@@ -250,36 +249,36 @@ public abstract class HoodieCompactor<T, I, K, O>
implements Serializable {
Path newFilePath = new Path(oldFilePath.getParent(), newFileName);
FileUtil.copy(fs,oldFilePath, fs, newFilePath, false, fs.getConf());
*/
+ }
}
- }
- // Compacting is very similar to applying updates to existing file
- Iterator<List<WriteStatus>> result;
- result = executionHelper.writeFileAndGetWriteStats(compactionHandler,
operation, instantTime, scanner, oldDataFileOpt);
- scanner.close();
+ // Compacting is very similar to applying updates to existing file
+ Iterator<List<WriteStatus>> result;
+ result = executionHelper.writeFileAndGetWriteStats(compactionHandler,
operation, instantTime, scanner, oldDataFileOpt);
- Iterable<List<WriteStatus>> resultIterable = () -> result;
- return StreamSupport.stream(resultIterable.spliterator(),
false).flatMap(Collection::stream).peek(s -> {
- final HoodieWriteStat stat = s.getStat();
- stat.setTotalUpdatedRecordsCompacted(scanner.getNumMergedRecordsInLog());
- stat.setTotalLogFilesCompacted(scanner.getTotalLogFiles());
- stat.setTotalLogRecords(scanner.getTotalLogRecords());
- stat.setPartitionPath(operation.getPartitionPath());
- stat
-
.setTotalLogSizeCompacted(operation.getMetrics().get(CompactionStrategy.TOTAL_LOG_FILE_SIZE).longValue());
- stat.setTotalLogBlocks(scanner.getTotalLogBlocks());
- stat.setTotalCorruptLogBlock(scanner.getTotalCorruptBlocks());
- stat.setTotalRollbackBlocks(scanner.getTotalRollbacks());
- RuntimeStats runtimeStats = new RuntimeStats();
- // scan time has to be obtained from scanner.
-
runtimeStats.setTotalScanTime(scanner.getTotalTimeTakenToReadAndMergeBlocks());
- // create and upsert time are obtained from the create or merge handle.
- if (stat.getRuntimeStats() != null) {
-
runtimeStats.setTotalCreateTime(stat.getRuntimeStats().getTotalCreateTime());
-
runtimeStats.setTotalUpsertTime(stat.getRuntimeStats().getTotalUpsertTime());
- }
- stat.setRuntimeStats(runtimeStats);
- }).collect(toList());
+ Iterable<List<WriteStatus>> resultIterable = () -> result;
+ return StreamSupport.stream(resultIterable.spliterator(),
false).flatMap(Collection::stream).peek(s -> {
+ final HoodieWriteStat stat = s.getStat();
+
stat.setTotalUpdatedRecordsCompacted(scanner.getNumMergedRecordsInLog());
+ stat.setTotalLogFilesCompacted(scanner.getTotalLogFiles());
+ stat.setTotalLogRecords(scanner.getTotalLogRecords());
+ stat.setPartitionPath(operation.getPartitionPath());
+ stat
+
.setTotalLogSizeCompacted(operation.getMetrics().get(CompactionStrategy.TOTAL_LOG_FILE_SIZE).longValue());
+ stat.setTotalLogBlocks(scanner.getTotalLogBlocks());
+ stat.setTotalCorruptLogBlock(scanner.getTotalCorruptBlocks());
+ stat.setTotalRollbackBlocks(scanner.getTotalRollbacks());
+ RuntimeStats runtimeStats = new RuntimeStats();
+ // scan time has to be obtained from scanner.
+
runtimeStats.setTotalScanTime(scanner.getTotalTimeTakenToReadAndMergeBlocks());
+ // create and upsert time are obtained from the create or merge handle.
+ if (stat.getRuntimeStats() != null) {
+
runtimeStats.setTotalCreateTime(stat.getRuntimeStats().getTotalCreateTime());
+
runtimeStats.setTotalUpsertTime(stat.getRuntimeStats().getTotalUpsertTime());
+ }
+ stat.setRuntimeStats(runtimeStats);
+ }).collect(toList());
+ }
}
/**