This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f8950f01983 [HUDI-7316] AbstractHoodieLogRecordReader should accept 
HoodieTableMetaClient in order to reduce occurrences of executors making file 
listing calls when reloading active timeline (#10540)
f8950f01983 is described below

commit f8950f01983c5ee180288ccd2a3396b02db4b1e1
Author: Krishen <[email protected]>
AuthorDate: Tue Jan 23 19:58:20 2024 -0800

    [HUDI-7316] AbstractHoodieLogRecordReader should accept 
HoodieTableMetaClient in order to reduce occurrences of executors making file 
listing calls when reloading active timeline (#10540)
    
    Summary:
    Currently some implementors of AbstractHoodieLogRecordReader create a 
HoodieTableMetaClient on construction, which implicitly reloads the active 
timeline, causing a `listStatus` HDFS call. Since these are created in 
executors, each of the hundreds to thousands of executors will make a 
`listStatus` call at the same time during a stage. To avoid these redundant 
calls to the HDFS NameNode, AbstractHoodieLogRecordReader and the following 
implementations have been updated to allow an existing Hoo [...]
    - HoodieUnMergedLogRecordScanner
    - HoodieMergedLogRecordScanner
    - HoodieMetadataMergedLogRecordReader
    As long as the caller passes in a HoodieTableMetaClient with the active 
timeline already loaded, and the implementation doesn't need to re-load the 
timeline (such as in order to get a more "fresh" timeline), then `listStatus` 
calls can be avoided in the executor, without causing the logic to be incorrect.
    
    Co-authored-by: Krishen Bhan <[email protected]>
---
 .../java/org/apache/hudi/io/HoodieMergedReadHandle.java |  1 +
 .../hudi/table/action/compact/HoodieCompactor.java      |  1 +
 .../generators/HoodieLogCompactionPlanGenerator.java    |  1 +
 .../run/strategy/MultipleSparkJobExecutionStrategy.java |  1 +
 .../TestHoodieClientOnMergeOnReadStorage.java           |  2 ++
 .../common/table/log/AbstractHoodieLogRecordReader.java |  9 +++++++--
 .../common/table/log/HoodieMergedLogRecordScanner.java  | 17 ++++++++++++++---
 .../table/log/HoodieUnMergedLogRecordScanner.java       | 17 ++++++++++++++---
 .../apache/hudi/metadata/HoodieBackedTableMetadata.java |  1 +
 .../hudi/metadata/HoodieMetadataLogRecordReader.java    |  6 ++++++
 .../apache/hudi/metadata/HoodieTableMetadataUtil.java   |  1 +
 11 files changed, 49 insertions(+), 8 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
index dbadf4fd798..63b576cf454 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
@@ -132,6 +132,7 @@ public class HoodieMergedReadHandle<T, I, K, O> extends 
HoodieReadHandle<T, I, K
         .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
         
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
         .withRecordMerger(config.getRecordMerger())
+        .withTableMetaClient(hoodieTable.getMetaClient())
         .build();
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
index 906ea6473a4..d1d69be16dc 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
@@ -207,6 +207,7 @@ public abstract class HoodieCompactor<T, I, K, O> 
implements Serializable {
         
.withOptimizedLogBlocksScan(executionHelper.enableOptimizedLogBlockScan(config))
         .withRecordMerger(config.getRecordMerger())
         .withInstantRange(instantRange)
+        .withTableMetaClient(metaClient)
         .build();
 
     Option<HoodieBaseFile> oldDataFileOpt =
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
index 2b704726580..7cc0e338bcf 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
@@ -98,6 +98,7 @@ public class HoodieLogCompactionPlanGenerator<T extends 
HoodieRecordPayload, I,
         .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
         .withOptimizedLogBlocksScan(true)
         .withRecordMerger(writeConfig.getRecordMerger())
+        .withTableMetaClient(metaClient)
         .build();
     scanner.scan(true);
     int totalBlocks = scanner.getCurrentInstantLogBlocks().size();
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 20fc7e9f479..0dd7f5aa124 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -321,6 +321,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
               
.withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
               
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
               .withRecordMerger(config.getRecordMerger())
+              .withTableMetaClient(table.getMetaClient())
               .build();
 
           Option<HoodieFileReader> baseFileReader = 
StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
index bc12e6771b1..8195a6b4084 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
@@ -444,6 +444,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends 
HoodieClientTestBase {
             .withLatestInstantTime(instant)
             .withBufferSize(config.getMaxDFSStreamBufferSize())
             .withOptimizedLogBlocksScan(true)
+            .withTableMetaClient(metaClient)
             .build();
         scanner.scan(true);
         List<String> prevInstants = scanner.getValidBlockInstants();
@@ -457,6 +458,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends 
HoodieClientTestBase {
             .withLatestInstantTime(currentInstant)
             .withBufferSize(config.getMaxDFSStreamBufferSize())
             .withOptimizedLogBlocksScan(true)
+            .withTableMetaClient(table.getMetaClient())
             .build();
         scanner2.scan(true);
         List<String> currentInstants = scanner2.getValidBlockInstants();
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 4fc7996c873..49f7ac7ca70 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -157,10 +157,11 @@ public abstract class AbstractHoodieLogRecordReader {
                                           InternalSchema internalSchema,
                                           Option<String> keyFieldOverride,
                                           boolean enableOptimizedLogBlocksScan,
-                                          HoodieRecordMerger recordMerger) {
+                                          HoodieRecordMerger recordMerger,
+                                          Option<HoodieTableMetaClient> 
hoodieTableMetaClientOption) {
     this.readerSchema = readerSchema;
     this.latestInstantTime = latestInstantTime;
-    this.hoodieTableMetaClient = 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).build();
+    this.hoodieTableMetaClient = hoodieTableMetaClientOption.orElseGet(() -> 
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).build());
     // load class from the payload fully qualified class name
     HoodieTableConfig tableConfig = 
this.hoodieTableMetaClient.getTableConfig();
     this.payloadClassFQN = tableConfig.getPayloadClass();
@@ -1035,6 +1036,10 @@ public abstract class AbstractHoodieLogRecordReader {
       throw new UnsupportedOperationException();
     }
 
+    public Builder withTableMetaClient(HoodieTableMetaClient 
hoodieTableMetaClient) {
+      throw new UnsupportedOperationException();
+    }
+
     public abstract AbstractHoodieLogRecordReader build();
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index 85008a03e13..9062641f1a7 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -26,6 +26,7 @@ import 
org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.DefaultSizeEstimator;
@@ -100,9 +101,11 @@ public class HoodieMergedLogRecordScanner extends 
AbstractHoodieLogRecordReader
                                        Option<String> partitionName,
                                        InternalSchema internalSchema,
                                        Option<String> keyFieldOverride,
-                                       boolean enableOptimizedLogBlocksScan, 
HoodieRecordMerger recordMerger) {
+                                       boolean enableOptimizedLogBlocksScan, 
HoodieRecordMerger recordMerger,
+                                      Option<HoodieTableMetaClient> 
hoodieTableMetaClientOption) {
     super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, 
readBlocksLazily, reverseReader, bufferSize,
-        instantRange, withOperationField, forceFullScan, partitionName, 
internalSchema, keyFieldOverride, enableOptimizedLogBlocksScan, recordMerger);
+        instantRange, withOperationField, forceFullScan, partitionName, 
internalSchema, keyFieldOverride, enableOptimizedLogBlocksScan, recordMerger,
+        hoodieTableMetaClientOption);
     try {
       this.maxMemorySizeInBytes = maxMemorySizeInBytes;
       // Store merged records for all versions for this log file, set the 
in-memory footprint to maxInMemoryMapSize
@@ -336,6 +339,7 @@ public class HoodieMergedLogRecordScanner extends 
AbstractHoodieLogRecordReader
     private boolean forceFullScan = true;
     private boolean enableOptimizedLogBlocksScan = false;
     private HoodieRecordMerger recordMerger = 
HoodiePreCombineAvroRecordMerger.INSTANCE;
+    protected HoodieTableMetaClient hoodieTableMetaClient;
 
     @Override
     public Builder withFileSystem(FileSystem fs) {
@@ -452,6 +456,12 @@ public class HoodieMergedLogRecordScanner extends 
AbstractHoodieLogRecordReader
       return this;
     }
 
+    @Override
+    public Builder withTableMetaClient(HoodieTableMetaClient 
hoodieTableMetaClient) {
+      this.hoodieTableMetaClient = hoodieTableMetaClient;
+      return this;
+    }
+
     @Override
     public HoodieMergedLogRecordScanner build() {
       if (this.partitionName == null && 
CollectionUtils.nonEmpty(this.logFilePaths)) {
@@ -463,7 +473,8 @@ public class HoodieMergedLogRecordScanner extends 
AbstractHoodieLogRecordReader
           latestInstantTime, maxMemorySizeInBytes, readBlocksLazily, 
reverseReader,
           bufferSize, spillableMapBasePath, instantRange,
           diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField, 
forceFullScan,
-          Option.ofNullable(partitionName), internalSchema, 
Option.ofNullable(keyFieldOverride), enableOptimizedLogBlocksScan, 
recordMerger);
+          Option.ofNullable(partitionName), internalSchema, 
Option.ofNullable(keyFieldOverride), enableOptimizedLogBlocksScan, recordMerger,
+          Option.ofNullable(hoodieTableMetaClient));
     }
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
index f62ec0febd5..4d870618e7b 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.model.DeleteRecord;
 import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
 import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
@@ -44,9 +45,11 @@ public class HoodieUnMergedLogRecordScanner extends 
AbstractHoodieLogRecordReade
   private HoodieUnMergedLogRecordScanner(FileSystem fs, String basePath, 
List<String> logFilePaths, Schema readerSchema,
                                          String latestInstantTime, boolean 
readBlocksLazily, boolean reverseReader, int bufferSize,
                                          LogRecordScannerCallback callback, 
Option<InstantRange> instantRange, InternalSchema internalSchema,
-                                         boolean enableOptimizedLogBlocksScan, 
HoodieRecordMerger recordMerger) {
+                                         boolean enableOptimizedLogBlocksScan, 
HoodieRecordMerger recordMerger,
+                                         Option<HoodieTableMetaClient> 
hoodieTableMetaClientOption) {
     super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, 
readBlocksLazily, reverseReader, bufferSize, instantRange,
-        false, true, Option.empty(), internalSchema, Option.empty(), 
enableOptimizedLogBlocksScan, recordMerger);
+        false, true, Option.empty(), internalSchema, Option.empty(), 
enableOptimizedLogBlocksScan, recordMerger,
+         hoodieTableMetaClientOption);
     this.callback = callback;
   }
 
@@ -109,6 +112,7 @@ public class HoodieUnMergedLogRecordScanner extends 
AbstractHoodieLogRecordReade
     private LogRecordScannerCallback callback;
     private boolean enableOptimizedLogBlocksScan;
     private HoodieRecordMerger recordMerger = 
HoodiePreCombineAvroRecordMerger.INSTANCE;
+    private HoodieTableMetaClient hoodieTableMetaClient;
 
     public Builder withFileSystem(FileSystem fs) {
       this.fs = fs;
@@ -180,13 +184,20 @@ public class HoodieUnMergedLogRecordScanner extends 
AbstractHoodieLogRecordReade
       return this;
     }
 
+    @Override
+    public HoodieUnMergedLogRecordScanner.Builder withTableMetaClient(
+        HoodieTableMetaClient hoodieTableMetaClient) {
+      this.hoodieTableMetaClient = hoodieTableMetaClient;
+      return this;
+    }
+
     @Override
     public HoodieUnMergedLogRecordScanner build() {
       ValidationUtils.checkArgument(recordMerger != null);
 
       return new HoodieUnMergedLogRecordScanner(fs, basePath, logFilePaths, 
readerSchema,
           latestInstantTime, readBlocksLazily, reverseReader, bufferSize, 
callback, instantRange,
-          internalSchema, enableOptimizedLogBlocksScan, recordMerger);
+          internalSchema, enableOptimizedLogBlocksScan, recordMerger, 
Option.ofNullable(hoodieTableMetaClient));
     }
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 57cc08ab59f..105bf77b909 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -498,6 +498,7 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
         .enableFullScan(allowFullScan)
         .withPartition(partitionName)
         
.withEnableOptimizedLogBlocksScan(metadataConfig.doEnableOptimizedLogBlocksScan())
+        .withTableMetaClient(metadataMetaClient)
         .build();
 
     Long logScannerOpenMs = timer.endTimer();
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
index fc071790e47..900260b9413 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
@@ -22,6 +22,7 @@ import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FileSystem;
 
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.util.Option;
@@ -215,6 +216,11 @@ public class HoodieMetadataLogRecordReader implements 
Closeable {
       return this;
     }
 
+    public Builder withTableMetaClient(HoodieTableMetaClient 
hoodieTableMetaClient) {
+      scannerBuilder.withTableMetaClient(hoodieTableMetaClient);
+      return this;
+    }
+
     public HoodieMetadataLogRecordReader build() {
       return new HoodieMetadataLogRecordReader(scannerBuilder.build());
     }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 076aac0454f..5a168892b15 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -1837,6 +1837,7 @@ public class HoodieTableMetadataUtil {
                 engineType,
                 Collections.emptyList(), // TODO: support different merger 
classes, which is currently only known to write config
                 metaClient.getTableConfig().getRecordMergerStrategy()))
+            .withTableMetaClient(metaClient)
             .build();
         ClosableIterator<String> recordKeyIterator = 
ClosableIterator.wrap(mergedLogRecordScanner.getRecords().keySet().iterator());
         return new ClosableIterator<HoodieRecord>() {

Reply via email to