This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b1af4d4c546 [HUDI-9514] Scanner resources not properly closed in HoodieCompact (#13415)
b1af4d4c546 is described below

commit b1af4d4c54636117348b6760b27ae3b7133cebb0
Author: senthh <[email protected]>
AuthorDate: Fri Jun 13 16:12:17 2025 +0530

    [HUDI-9514] Scanner resources not properly closed in HoodieCompact (#13415)
---
 .../hudi/table/action/compact/HoodieCompactor.java | 113 ++++++++++-----------
 1 file changed, 56 insertions(+), 57 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
index c768595096a..cbb81eb1068 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
@@ -209,38 +209,37 @@ public abstract class HoodieCompactor<T, I, K, O> 
implements Serializable {
             new StoragePath(FSUtils.constructAbsolutePath(
                 metaClient.getBasePath(), operation.getPartitionPath()), 
p).toString())
         .collect(toList());
-    HoodieMergedLogRecordScanner scanner = 
HoodieMergedLogRecordScanner.newBuilder()
-        .withStorage(storage)
-        .withBasePath(metaClient.getBasePath())
-        .withLogFilePaths(logFiles)
-        .withReaderSchema(readerSchema)
-        
.withLatestInstantTime(executionHelper.instantTimeToUseForScanning(instantTime, 
maxInstantTime))
-        .withInstantRange(instantRange)
-        
.withInternalSchema(internalSchemaOption.orElse(InternalSchema.getEmptyInternalSchema()))
-        .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
-        .withReverseReader(config.getCompactionReverseLogReadEnabled())
-        .withBufferSize(config.getMaxDFSStreamBufferSize())
-        .withSpillableMapBasePath(config.getSpillableMapBasePath())
-        .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
-        
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
-        .withOperationField(config.allowOperationMetadataField())
-        .withPartition(operation.getPartitionPath())
-        
.withOptimizedLogBlocksScan(executionHelper.enableOptimizedLogBlockScan(config))
-        .withRecordMerger(config.getRecordMerger())
-        .withTableMetaClient(metaClient)
-        .build();
+    try (HoodieMergedLogRecordScanner scanner = 
HoodieMergedLogRecordScanner.newBuilder()
+          .withStorage(storage)
+          .withBasePath(metaClient.getBasePath())
+          .withLogFilePaths(logFiles)
+          .withReaderSchema(readerSchema)
+          
.withLatestInstantTime(executionHelper.instantTimeToUseForScanning(instantTime, 
maxInstantTime))
+          .withInstantRange(instantRange)
+          
.withInternalSchema(internalSchemaOption.orElse(InternalSchema.getEmptyInternalSchema()))
+          .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
+          .withReverseReader(config.getCompactionReverseLogReadEnabled())
+          .withBufferSize(config.getMaxDFSStreamBufferSize())
+          .withSpillableMapBasePath(config.getSpillableMapBasePath())
+          .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
+          
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
+          .withOperationField(config.allowOperationMetadataField())
+          .withPartition(operation.getPartitionPath())
+          
.withOptimizedLogBlocksScan(executionHelper.enableOptimizedLogBlockScan(config))
+          .withRecordMerger(config.getRecordMerger())
+          .withTableMetaClient(metaClient)
+          .build()) {
 
-    Option<HoodieBaseFile> oldDataFileOpt =
-        operation.getBaseFile(metaClient.getBasePath().toString(), 
operation.getPartitionPath());
+      Option<HoodieBaseFile> oldDataFileOpt =
+          operation.getBaseFile(metaClient.getBasePath().toString(), 
operation.getPartitionPath());
 
-    // Considering following scenario: if all log blocks in this fileSlice is 
rollback, it returns an empty scanner.
-    // But in this case, we need to give it a base file. Otherwise, it will 
lose base file in following fileSlice.
-    if (!scanner.iterator().hasNext()) {
-      if (!oldDataFileOpt.isPresent()) {
-        scanner.close();
-        return new ArrayList<>();
-      } else {
-        // TODO: we may directly rename original parquet file if there is not 
evolution/devolution of schema
+      // Considering following scenario: if all log blocks in this fileSlice 
is rollback, it returns an empty scanner.
+      // But in this case, we need to give it a base file. Otherwise, it will 
lose base file in following fileSlice.
+      if (!scanner.iterator().hasNext()) {
+        if (!oldDataFileOpt.isPresent()) {
+          return new ArrayList<>();
+        } else {
+          // TODO: we may directly rename original parquet file if there is 
not evolution/devolution of schema
         /*
         TaskContextSupplier taskContextSupplier = 
hoodieCopyOnWriteTable.getTaskContextSupplier();
         String newFileName = FSUtils.makeDataFileName(instantTime,
@@ -250,36 +249,36 @@ public abstract class HoodieCompactor<T, I, K, O> 
implements Serializable {
         Path newFilePath = new Path(oldFilePath.getParent(), newFileName);
         FileUtil.copy(fs,oldFilePath, fs, newFilePath, false, fs.getConf());
         */
+        }
       }
-    }
 
-    // Compacting is very similar to applying updates to existing file
-    Iterator<List<WriteStatus>> result;
-    result = executionHelper.writeFileAndGetWriteStats(compactionHandler, 
operation, instantTime, scanner, oldDataFileOpt);
-    scanner.close();
+      // Compacting is very similar to applying updates to existing file
+      Iterator<List<WriteStatus>> result;
+      result = executionHelper.writeFileAndGetWriteStats(compactionHandler, 
operation, instantTime, scanner, oldDataFileOpt);
 
-    Iterable<List<WriteStatus>> resultIterable = () -> result;
-    return StreamSupport.stream(resultIterable.spliterator(), 
false).flatMap(Collection::stream).peek(s -> {
-      final HoodieWriteStat stat = s.getStat();
-      stat.setTotalUpdatedRecordsCompacted(scanner.getNumMergedRecordsInLog());
-      stat.setTotalLogFilesCompacted(scanner.getTotalLogFiles());
-      stat.setTotalLogRecords(scanner.getTotalLogRecords());
-      stat.setPartitionPath(operation.getPartitionPath());
-      stat
-          
.setTotalLogSizeCompacted(operation.getMetrics().get(CompactionStrategy.TOTAL_LOG_FILE_SIZE).longValue());
-      stat.setTotalLogBlocks(scanner.getTotalLogBlocks());
-      stat.setTotalCorruptLogBlock(scanner.getTotalCorruptBlocks());
-      stat.setTotalRollbackBlocks(scanner.getTotalRollbacks());
-      RuntimeStats runtimeStats = new RuntimeStats();
-      // scan time has to be obtained from scanner.
-      
runtimeStats.setTotalScanTime(scanner.getTotalTimeTakenToReadAndMergeBlocks());
-      // create and upsert time are obtained from the create or merge handle.
-      if (stat.getRuntimeStats() != null) {
-        
runtimeStats.setTotalCreateTime(stat.getRuntimeStats().getTotalCreateTime());
-        
runtimeStats.setTotalUpsertTime(stat.getRuntimeStats().getTotalUpsertTime());
-      }
-      stat.setRuntimeStats(runtimeStats);
-    }).collect(toList());
+      Iterable<List<WriteStatus>> resultIterable = () -> result;
+      return StreamSupport.stream(resultIterable.spliterator(), 
false).flatMap(Collection::stream).peek(s -> {
+        final HoodieWriteStat stat = s.getStat();
+        
stat.setTotalUpdatedRecordsCompacted(scanner.getNumMergedRecordsInLog());
+        stat.setTotalLogFilesCompacted(scanner.getTotalLogFiles());
+        stat.setTotalLogRecords(scanner.getTotalLogRecords());
+        stat.setPartitionPath(operation.getPartitionPath());
+        stat
+            
.setTotalLogSizeCompacted(operation.getMetrics().get(CompactionStrategy.TOTAL_LOG_FILE_SIZE).longValue());
+        stat.setTotalLogBlocks(scanner.getTotalLogBlocks());
+        stat.setTotalCorruptLogBlock(scanner.getTotalCorruptBlocks());
+        stat.setTotalRollbackBlocks(scanner.getTotalRollbacks());
+        RuntimeStats runtimeStats = new RuntimeStats();
+        // scan time has to be obtained from scanner.
+        
runtimeStats.setTotalScanTime(scanner.getTotalTimeTakenToReadAndMergeBlocks());
+        // create and upsert time are obtained from the create or merge handle.
+        if (stat.getRuntimeStats() != null) {
+          
runtimeStats.setTotalCreateTime(stat.getRuntimeStats().getTotalCreateTime());
+          
runtimeStats.setTotalUpsertTime(stat.getRuntimeStats().getTotalUpsertTime());
+        }
+        stat.setRuntimeStats(runtimeStats);
+      }).collect(toList());
+    }
   }
 
   /**

Reply via email to