This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f8950f01983 [HUDI-7316] AbstractHoodieLogRecordReader should accept
HoodieTableMetaClient in order to reduce occurrences of executors making file
listing calls when reloading active timeline (#10540)
f8950f01983 is described below
commit f8950f01983c5ee180288ccd2a3396b02db4b1e1
Author: Krishen <[email protected]>
AuthorDate: Tue Jan 23 19:58:20 2024 -0800
[HUDI-7316] AbstractHoodieLogRecordReader should accept
HoodieTableMetaClient in order to reduce occurrences of executors making file
listing calls when reloading active timeline (#10540)
Summary:
Currently some implementors of AbstractHoodieLogRecordReader create a
HoodieTableMetaClient on construction, which implicitly reloads active
timeline, causing a `listStatus` HDFS call. Since these are created in
executors, each of the hundreds to thousands of executors will make a
`listStatus` call at the same time during a stage. To avoid these redundant
calls to HDFS NameNode, AbstractHoodieLogRecordReader and the following
implementations have been updated to allow an existing Hoo [...]
- HoodieUnMergedLogRecordScanner
- HoodieMergedLogRecordScanner
- HoodieMetadataMergedLogRecordReader
As long as the caller passes in a HoodieTableMetaClient with the active
timeline already loaded, and the implementation doesn't need to re-load the
timeline (such as in order to get a fresher timeline), then `listStatus`
calls can be avoided in the executor, without causing the logic to be incorrect.
Co-authored-by: Krishen Bhan <[email protected]>
---
.../java/org/apache/hudi/io/HoodieMergedReadHandle.java | 1 +
.../hudi/table/action/compact/HoodieCompactor.java | 1 +
.../generators/HoodieLogCompactionPlanGenerator.java | 1 +
.../run/strategy/MultipleSparkJobExecutionStrategy.java | 1 +
.../TestHoodieClientOnMergeOnReadStorage.java | 2 ++
.../common/table/log/AbstractHoodieLogRecordReader.java | 9 +++++++--
.../common/table/log/HoodieMergedLogRecordScanner.java | 17 ++++++++++++++---
.../table/log/HoodieUnMergedLogRecordScanner.java | 17 ++++++++++++++---
.../apache/hudi/metadata/HoodieBackedTableMetadata.java | 1 +
.../hudi/metadata/HoodieMetadataLogRecordReader.java | 6 ++++++
.../apache/hudi/metadata/HoodieTableMetadataUtil.java | 1 +
11 files changed, 49 insertions(+), 8 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
index dbadf4fd798..63b576cf454 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java
@@ -132,6 +132,7 @@ public class HoodieMergedReadHandle<T, I, K, O> extends
HoodieReadHandle<T, I, K
.withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
.withRecordMerger(config.getRecordMerger())
+ .withTableMetaClient(hoodieTable.getMetaClient())
.build();
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
index 906ea6473a4..d1d69be16dc 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
@@ -207,6 +207,7 @@ public abstract class HoodieCompactor<T, I, K, O>
implements Serializable {
.withOptimizedLogBlocksScan(executionHelper.enableOptimizedLogBlockScan(config))
.withRecordMerger(config.getRecordMerger())
.withInstantRange(instantRange)
+ .withTableMetaClient(metaClient)
.build();
Option<HoodieBaseFile> oldDataFileOpt =
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
index 2b704726580..7cc0e338bcf 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
@@ -98,6 +98,7 @@ public class HoodieLogCompactionPlanGenerator<T extends
HoodieRecordPayload, I,
.withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
.withOptimizedLogBlocksScan(true)
.withRecordMerger(writeConfig.getRecordMerger())
+ .withTableMetaClient(metaClient)
.build();
scanner.scan(true);
int totalBlocks = scanner.getCurrentInstantLogBlocks().size();
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 20fc7e9f479..0dd7f5aa124 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -321,6 +321,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
.withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
.withRecordMerger(config.getRecordMerger())
+ .withTableMetaClient(table.getMetaClient())
.build();
Option<HoodieFileReader> baseFileReader =
StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
index bc12e6771b1..8195a6b4084 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
@@ -444,6 +444,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends
HoodieClientTestBase {
.withLatestInstantTime(instant)
.withBufferSize(config.getMaxDFSStreamBufferSize())
.withOptimizedLogBlocksScan(true)
+ .withTableMetaClient(metaClient)
.build();
scanner.scan(true);
List<String> prevInstants = scanner.getValidBlockInstants();
@@ -457,6 +458,7 @@ public class TestHoodieClientOnMergeOnReadStorage extends
HoodieClientTestBase {
.withLatestInstantTime(currentInstant)
.withBufferSize(config.getMaxDFSStreamBufferSize())
.withOptimizedLogBlocksScan(true)
+ .withTableMetaClient(table.getMetaClient())
.build();
scanner2.scan(true);
List<String> currentInstants = scanner2.getValidBlockInstants();
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 4fc7996c873..49f7ac7ca70 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -157,10 +157,11 @@ public abstract class AbstractHoodieLogRecordReader {
InternalSchema internalSchema,
Option<String> keyFieldOverride,
boolean enableOptimizedLogBlocksScan,
- HoodieRecordMerger recordMerger) {
+ HoodieRecordMerger recordMerger,
+ Option<HoodieTableMetaClient>
hoodieTableMetaClientOption) {
this.readerSchema = readerSchema;
this.latestInstantTime = latestInstantTime;
- this.hoodieTableMetaClient =
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).build();
+ this.hoodieTableMetaClient = hoodieTableMetaClientOption.orElseGet(() ->
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).build());
// load class from the payload fully qualified class name
HoodieTableConfig tableConfig =
this.hoodieTableMetaClient.getTableConfig();
this.payloadClassFQN = tableConfig.getPayloadClass();
@@ -1035,6 +1036,10 @@ public abstract class AbstractHoodieLogRecordReader {
throw new UnsupportedOperationException();
}
+ public Builder withTableMetaClient(HoodieTableMetaClient
hoodieTableMetaClient) {
+ throw new UnsupportedOperationException();
+ }
+
public abstract AbstractHoodieLogRecordReader build();
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index 85008a03e13..9062641f1a7 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -26,6 +26,7 @@ import
org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.DefaultSizeEstimator;
@@ -100,9 +101,11 @@ public class HoodieMergedLogRecordScanner extends
AbstractHoodieLogRecordReader
Option<String> partitionName,
InternalSchema internalSchema,
Option<String> keyFieldOverride,
- boolean enableOptimizedLogBlocksScan,
HoodieRecordMerger recordMerger) {
+ boolean enableOptimizedLogBlocksScan,
HoodieRecordMerger recordMerger,
+ Option<HoodieTableMetaClient>
hoodieTableMetaClientOption) {
super(fs, basePath, logFilePaths, readerSchema, latestInstantTime,
readBlocksLazily, reverseReader, bufferSize,
- instantRange, withOperationField, forceFullScan, partitionName,
internalSchema, keyFieldOverride, enableOptimizedLogBlocksScan, recordMerger);
+ instantRange, withOperationField, forceFullScan, partitionName,
internalSchema, keyFieldOverride, enableOptimizedLogBlocksScan, recordMerger,
+ hoodieTableMetaClientOption);
try {
this.maxMemorySizeInBytes = maxMemorySizeInBytes;
// Store merged records for all versions for this log file, set the
in-memory footprint to maxInMemoryMapSize
@@ -336,6 +339,7 @@ public class HoodieMergedLogRecordScanner extends
AbstractHoodieLogRecordReader
private boolean forceFullScan = true;
private boolean enableOptimizedLogBlocksScan = false;
private HoodieRecordMerger recordMerger =
HoodiePreCombineAvroRecordMerger.INSTANCE;
+ protected HoodieTableMetaClient hoodieTableMetaClient;
@Override
public Builder withFileSystem(FileSystem fs) {
@@ -452,6 +456,12 @@ public class HoodieMergedLogRecordScanner extends
AbstractHoodieLogRecordReader
return this;
}
+ @Override
+ public Builder withTableMetaClient(HoodieTableMetaClient
hoodieTableMetaClient) {
+ this.hoodieTableMetaClient = hoodieTableMetaClient;
+ return this;
+ }
+
@Override
public HoodieMergedLogRecordScanner build() {
if (this.partitionName == null &&
CollectionUtils.nonEmpty(this.logFilePaths)) {
@@ -463,7 +473,8 @@ public class HoodieMergedLogRecordScanner extends
AbstractHoodieLogRecordReader
latestInstantTime, maxMemorySizeInBytes, readBlocksLazily,
reverseReader,
bufferSize, spillableMapBasePath, instantRange,
diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField,
forceFullScan,
- Option.ofNullable(partitionName), internalSchema,
Option.ofNullable(keyFieldOverride), enableOptimizedLogBlocksScan,
recordMerger);
+ Option.ofNullable(partitionName), internalSchema,
Option.ofNullable(keyFieldOverride), enableOptimizedLogBlocksScan, recordMerger,
+ Option.ofNullable(hoodieTableMetaClient));
}
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
index f62ec0febd5..4d870618e7b 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
@@ -44,9 +45,11 @@ public class HoodieUnMergedLogRecordScanner extends
AbstractHoodieLogRecordReade
private HoodieUnMergedLogRecordScanner(FileSystem fs, String basePath,
List<String> logFilePaths, Schema readerSchema,
String latestInstantTime, boolean
readBlocksLazily, boolean reverseReader, int bufferSize,
LogRecordScannerCallback callback,
Option<InstantRange> instantRange, InternalSchema internalSchema,
- boolean enableOptimizedLogBlocksScan,
HoodieRecordMerger recordMerger) {
+ boolean enableOptimizedLogBlocksScan,
HoodieRecordMerger recordMerger,
+ Option<HoodieTableMetaClient>
hoodieTableMetaClientOption) {
super(fs, basePath, logFilePaths, readerSchema, latestInstantTime,
readBlocksLazily, reverseReader, bufferSize, instantRange,
- false, true, Option.empty(), internalSchema, Option.empty(),
enableOptimizedLogBlocksScan, recordMerger);
+ false, true, Option.empty(), internalSchema, Option.empty(),
enableOptimizedLogBlocksScan, recordMerger,
+ hoodieTableMetaClientOption);
this.callback = callback;
}
@@ -109,6 +112,7 @@ public class HoodieUnMergedLogRecordScanner extends
AbstractHoodieLogRecordReade
private LogRecordScannerCallback callback;
private boolean enableOptimizedLogBlocksScan;
private HoodieRecordMerger recordMerger =
HoodiePreCombineAvroRecordMerger.INSTANCE;
+ private HoodieTableMetaClient hoodieTableMetaClient;
public Builder withFileSystem(FileSystem fs) {
this.fs = fs;
@@ -180,13 +184,20 @@ public class HoodieUnMergedLogRecordScanner extends
AbstractHoodieLogRecordReade
return this;
}
+ @Override
+ public HoodieUnMergedLogRecordScanner.Builder withTableMetaClient(
+ HoodieTableMetaClient hoodieTableMetaClient) {
+ this.hoodieTableMetaClient = hoodieTableMetaClient;
+ return this;
+ }
+
@Override
public HoodieUnMergedLogRecordScanner build() {
ValidationUtils.checkArgument(recordMerger != null);
return new HoodieUnMergedLogRecordScanner(fs, basePath, logFilePaths,
readerSchema,
latestInstantTime, readBlocksLazily, reverseReader, bufferSize,
callback, instantRange,
- internalSchema, enableOptimizedLogBlocksScan, recordMerger);
+ internalSchema, enableOptimizedLogBlocksScan, recordMerger,
Option.ofNullable(hoodieTableMetaClient));
}
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 57cc08ab59f..105bf77b909 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -498,6 +498,7 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
.enableFullScan(allowFullScan)
.withPartition(partitionName)
.withEnableOptimizedLogBlocksScan(metadataConfig.doEnableOptimizedLogBlocksScan())
+ .withTableMetaClient(metadataMetaClient)
.build();
Long logScannerOpenMs = timer.endTimer();
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
index fc071790e47..900260b9413 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataLogRecordReader.java
@@ -22,6 +22,7 @@ import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.util.Option;
@@ -215,6 +216,11 @@ public class HoodieMetadataLogRecordReader implements
Closeable {
return this;
}
+ public Builder withTableMetaClient(HoodieTableMetaClient
hoodieTableMetaClient) {
+ scannerBuilder.withTableMetaClient(hoodieTableMetaClient);
+ return this;
+ }
+
public HoodieMetadataLogRecordReader build() {
return new HoodieMetadataLogRecordReader(scannerBuilder.build());
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 076aac0454f..5a168892b15 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -1837,6 +1837,7 @@ public class HoodieTableMetadataUtil {
engineType,
Collections.emptyList(), // TODO: support different merger
classes, which is currently only known to write config
metaClient.getTableConfig().getRecordMergerStrategy()))
+ .withTableMetaClient(metaClient)
.build();
ClosableIterator<String> recordKeyIterator =
ClosableIterator.wrap(mergedLogRecordScanner.getRecords().keySet().iterator());
return new ClosableIterator<HoodieRecord>() {