This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new c9069fe57948 fix: Allow HFile Cache reader configuration to be configurable (#13927)
c9069fe57948 is described below
commit c9069fe5794840971c1710034b08a0f36f53518e
Author: voonhous <[email protected]>
AuthorDate: Tue Sep 30 00:40:39 2025 +0800
fix: Allow HFile Cache reader configuration to be configurable (#13927)
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../hudi/cli/commands/HoodieLogFileCommand.java | 11 ++--
.../org/apache/hudi/io/HoodieAppendHandle.java | 4 +-
.../hudi/metadata/HoodieMetadataWriteUtils.java | 8 +++
.../hudi/table/action/compact/HoodieCompactor.java | 12 ++++-
.../client/common/HoodieFlinkEngineContext.java | 2 +-
.../client/common/HoodieJavaEngineContext.java | 2 +-
.../hudi/client/TestJavaHoodieBackedMetadata.java | 3 +-
.../client/common/HoodieSparkEngineContext.java | 2 +-
.../apache/hudi/avro/HoodieAvroReaderContext.java | 33 ++++++++----
.../hudi/common/config/HoodieReaderConfig.java | 6 ---
.../common/engine/AvroReaderContextFactory.java | 11 ++--
.../hudi/common/engine/HoodieEngineContext.java | 2 +-
.../common/engine/HoodieLocalEngineContext.java | 2 +-
.../hudi/common/engine/HoodieReaderContext.java | 18 ++++++-
.../hudi/common/table/log/HoodieLogFileReader.java | 5 +-
.../table/log/block/HoodieHFileDataBlock.java | 25 +++------
.../org/apache/hudi/common/util/ConfigUtils.java | 60 ++++++++++++++++++++--
.../hudi/io/storage/HoodieFileReaderFactory.java | 4 --
.../hudi/metadata/HoodieBackedTableMetadata.java | 60 ++++------------------
.../testutils/reader/HoodieFileSliceTestUtils.java | 4 +-
.../common/testutils/HoodieCommonTestHarness.java | 3 +-
.../hudi/hadoop/testutils/InputFormatTestUtil.java | 3 +-
.../scala/org/apache/hudi/HoodieBaseRelation.scala | 6 +--
.../hudi/functional/RecordLevelIndexTestBase.scala | 6 +--
.../hudi/functional/TestRecordLevelIndex.scala | 1 +
.../utilities/HoodieMetadataTableValidator.java | 7 +--
26 files changed, 163 insertions(+), 137 deletions(-)
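At a glance, this change removes the internal `_hoodie.hfile.use.native.reader` flag and instead threads the HFile block cache settings through reader properties. A minimal caller-side sketch, assuming only the three HoodieReaderConfig properties referenced in this diff (the values are example values only):

    import org.apache.hudi.common.config.HoodieReaderConfig;
    import org.apache.hudi.common.config.TypedProperties;

    // Sketch: supply the HFile block cache settings as ordinary reader properties.
    TypedProperties props = new TypedProperties();
    props.setProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_ENABLED.key(), "true");
    props.setProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_SIZE.key(), "200");
    props.setProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_TTL_MINUTES.key(), "60");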
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index 9dea9b8fa2ba..4549050d9a3e 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -231,16 +231,17 @@ public class HoodieLogFileCommand {
     List<IndexedRecord> allRecords = new ArrayList<>();
     if (shouldMerge) {
       System.out.println("===========================> MERGING RECORDS <===================");
-      HoodieReaderContext<IndexedRecord> readerContext = new HoodieAvroReaderContext(
-          HoodieCLI.getTableMetaClient().getStorage().getConf(),
-          HoodieCLI.getTableMetaClient().getTableConfig(),
-          Option.empty(),
-          Option.empty());
       StoragePath firstLogFile = new StoragePath(logFilePaths.get(0));
       HoodieFileGroupId fileGroupId = new HoodieFileGroupId(FSUtils.getRelativePartitionPath(HoodieCLI.getTableMetaClient().getBasePath(), firstLogFile), FSUtils.getFileIdFromLogPath(firstLogFile));
       FileSlice fileSlice = new FileSlice(fileGroupId, HoodieTimeline.INIT_INSTANT_TS, null, logFilePaths.stream()
           .map(l -> new HoodieLogFile(new StoragePath(l))).collect(Collectors.toList()));
       TypedProperties fileGroupReaderProperties = buildFileGroupReaderProperties();
+      HoodieReaderContext<IndexedRecord> readerContext = new HoodieAvroReaderContext(
+          HoodieCLI.getTableMetaClient().getStorage().getConf(),
+          HoodieCLI.getTableMetaClient().getTableConfig(),
+          Option.empty(),
+          Option.empty(),
+          fileGroupReaderProperties);
       try (HoodieFileGroupReader<IndexedRecord> fileGroupReader = HoodieFileGroupReader.<IndexedRecord>newBuilder()
           .withReaderContext(readerContext)
           .withHoodieTableMetaClient(HoodieCLI.getTableMetaClient())
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index eff2954ac05c..5b49cc4c8f8e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -20,7 +20,6 @@ package org.apache.hudi.io;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.fs.FSUtils;
@@ -720,8 +719,7 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
         // Not supporting positions in HFile data blocks
         header.remove(HeaderMetadataType.BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS);
         return new HoodieHFileDataBlock(
-            records, header, writeConfig.getHFileCompressionAlgorithm(), new StoragePath(writeConfig.getBasePath()),
-            writeConfig.getBooleanOrDefault(HoodieReaderConfig.USE_NATIVE_HFILE_READER));
+            records, header, writeConfig.getHFileCompressionAlgorithm(), new StoragePath(writeConfig.getBasePath()));
       case PARQUET_DATA_BLOCK:
         return new HoodieParquetDataBlock(
             records,
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
index 7895a5e4ced1..72594892d2d8 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.client.FailOnFirstErrorWriteStatus;
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
@@ -199,6 +200,13 @@ public class HoodieMetadataWriteUtils {
       properties.put(HoodieMetricsConfig.METRICS_REPORTER_PREFIX.key(),
           writeConfig.getMetricReporterMetricsNamePrefix() + METADATA_TABLE_NAME_SUFFIX);
     }
+    // HFile caching properties
+    properties.put(HoodieReaderConfig.HFILE_BLOCK_CACHE_ENABLED.key(),
+        writeConfig.getBooleanOrDefault(HoodieReaderConfig.HFILE_BLOCK_CACHE_ENABLED));
+    properties.put(HoodieReaderConfig.HFILE_BLOCK_CACHE_SIZE.key(),
+        writeConfig.getIntOrDefault(HoodieReaderConfig.HFILE_BLOCK_CACHE_SIZE));
+    properties.put(HoodieReaderConfig.HFILE_BLOCK_CACHE_TTL_MINUTES.key(),
+        writeConfig.getIntOrDefault(HoodieReaderConfig.HFILE_BLOCK_CACHE_TTL_MINUTES));
     builder.withProperties(properties);
 
     if (writeConfig.isMetricsOn()) {
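For orientation, a sketch of the caller side this hunk serves (the builder chain, `basePath`, and `props` are assumed names, not part of this diff): HFile cache values set on the data-table write config now carry over into the metadata table writer config built here.

    import org.apache.hudi.config.HoodieWriteConfig;

    // Sketch: cache settings on the data-table write config propagate to the
    // metadata table writer via the properties copied above.
    HoodieWriteConfig dataTableConfig = HoodieWriteConfig.newBuilder()
        .withPath(basePath)      // assumed table base path
        .withProperties(props)   // props carrying hoodie.hfile.block.cache.* keys
        .build();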
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
index 6e72397d541a..932841c98d48 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.engine.ReaderContextFactory;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.CompactionOperation;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
@@ -135,7 +136,13 @@ public abstract class HoodieCompactor<T, I, K, O> implements Serializable {
           operation -> logCompact(config, operation, compactionInstantTime, instantRange, table, taskContextSupplier))
           .flatMap(List::iterator);
     } else {
-      ReaderContextFactory<T> readerContextFactory = context.getReaderContextFactory(metaClient);
+      ReaderContextFactory<T> readerContextFactory;
+      if (!metaClient.isMetadataTable()) {
+        readerContextFactory = context.getReaderContextFactory(metaClient);
+      } else {
+        // Payload and HFile caching props are required here
+        readerContextFactory = (ReaderContextFactory<T>) context.getReaderContextFactoryForWrite(metaClient, HoodieRecordType.AVRO, config.getProps());
+      }
       return context.parallelize(operations).map(
           operation -> compact(config, operation, compactionInstantTime, readerContextFactory.getContext(), table, maxInstantTime, taskContextSupplier))
           .flatMap(List::iterator);
@@ -164,7 +171,8 @@ public abstract class HoodieCompactor<T, I, K, O> implements Serializable {
                                               Option<InstantRange> instantRange,
                                               HoodieTable table,
                                               TaskContextSupplier taskContextSupplier) throws IOException {
-    HoodieReaderContext<IndexedRecord> readerContext = new HoodieAvroReaderContext(table.getStorageConf(), table.getMetaClient().getTableConfig(), instantRange, Option.empty());
+    HoodieReaderContext<IndexedRecord> readerContext = new HoodieAvroReaderContext(
+        table.getStorageConf(), table.getMetaClient().getTableConfig(), instantRange, Option.empty(), writeConfig.getProps());
     FileGroupReaderBasedAppendHandle<IndexedRecord, ?, ?, ?> appendHandle =
         new FileGroupReaderBasedAppendHandle<>(writeConfig, instantTime, table, operation, taskContextSupplier, readerContext);
     appendHandle.doAppend();
     return appendHandle.close();
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
index 8c45c9c35f9d..edbdc1ef0703 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
@@ -248,7 +248,7 @@ public class HoodieFlinkEngineContext extends HoodieEngineContext {
   public ReaderContextFactory<?> getReaderContextFactory(HoodieTableMetaClient metaClient) {
     // metadata table reads are only supported by the AvroReaderContext.
     if (metaClient.isMetadataTable()) {
-      return new AvroReaderContextFactory(metaClient);
+      return new AvroReaderContextFactory(metaClient, new TypedProperties());
     }
     return getEngineReaderContextFactory(metaClient);
   }
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
index a96caf311328..18383a3e4688 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/common/HoodieJavaEngineContext.java
@@ -199,7 +199,7 @@ public class HoodieJavaEngineContext extends HoodieEngineContext {
 
   @Override
   public ReaderContextFactory<IndexedRecord> getEngineReaderContextFactory(HoodieTableMetaClient metaClient) {
-    return new AvroReaderContextFactory(metaClient);
+    return new AvroReaderContextFactory(metaClient, new TypedProperties());
   }
 
   @Override
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index cc2e5803a809..f263ab7a7827 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -949,7 +949,8 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase {
     if (enableMetaFields) {
       schema = HoodieAvroUtils.addMetadataFields(schema);
     }
-    HoodieAvroReaderContext readerContext = new HoodieAvroReaderContext(metadataMetaClient.getStorageConf(), metadataMetaClient.getTableConfig(), Option.empty(), Option.empty());
+    HoodieAvroReaderContext readerContext = new HoodieAvroReaderContext(metadataMetaClient.getStorageConf(), metadataMetaClient.getTableConfig(), Option.empty(), Option.empty(),
+        new TypedProperties());
     HoodieFileGroupReader<IndexedRecord> fileGroupReader = HoodieFileGroupReader.<IndexedRecord>newBuilder()
         .withReaderContext(readerContext)
         .withHoodieTableMetaClient(metadataMetaClient)
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
index 09d371d63eae..fca42be1a97e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/HoodieSparkEngineContext.java
@@ -260,7 +260,7 @@ public class HoodieSparkEngineContext extends HoodieEngineContext {
   public ReaderContextFactory<?> getReaderContextFactory(HoodieTableMetaClient metaClient) {
     // metadata table reads are only supported by the AvroReaderContext.
     if (metaClient.isMetadataTable()) {
-      return new AvroReaderContextFactory(metaClient);
+      return new AvroReaderContextFactory(metaClient, new TypedProperties());
     }
     return getEngineReaderContextFactory(metaClient);
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
index c659c6beecaf..5af1aadd4156 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
@@ -21,6 +21,7 @@ package org.apache.hudi.avro;
 
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.fs.FSUtils;
@@ -34,6 +35,7 @@ import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.table.read.BufferedRecord;
 import org.apache.hudi.common.table.read.BufferedRecordSerializer;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SizeEstimator;
@@ -77,12 +79,21 @@ public class HoodieAvroReaderContext extends HoodieReaderContext<IndexedRecord>
    * @param instantRangeOpt the set of valid instants for this read
    * @param filterOpt an optional filter to apply on the record keys
    */
+  public HoodieAvroReaderContext(
+      StorageConfiguration<?> storageConfiguration,
+      HoodieTableConfig tableConfig,
+      Option<InstantRange> instantRangeOpt,
+      Option<Predicate> filterOpt,
+      TypedProperties props) {
+    this(storageConfiguration, tableConfig, instantRangeOpt, filterOpt, Collections.emptyMap(), tableConfig.getPayloadClass(), new HoodieConfig(props));
+  }
+
   public HoodieAvroReaderContext(
       StorageConfiguration<?> storageConfiguration,
       HoodieTableConfig tableConfig,
       Option<InstantRange> instantRangeOpt,
       Option<Predicate> filterOpt) {
-    this(storageConfiguration, tableConfig, instantRangeOpt, filterOpt, Collections.emptyMap());
+    this(storageConfiguration, tableConfig, instantRangeOpt, filterOpt, Collections.emptyMap(), tableConfig.getPayloadClass(), ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER);
   }
 
   /**
@@ -100,8 +111,9 @@ public class HoodieAvroReaderContext extends HoodieReaderContext<IndexedRecord>
       HoodieTableConfig tableConfig,
       Option<InstantRange> instantRangeOpt,
       Option<Predicate> filterOpt,
-      Map<StoragePath, HoodieAvroFileReader> reusableFileReaders) {
-    this(storageConfiguration, tableConfig, instantRangeOpt, filterOpt, reusableFileReaders, tableConfig.getPayloadClass());
+      Map<StoragePath, HoodieAvroFileReader> reusableFileReaders,
+      TypedProperties props) {
+    this(storageConfiguration, tableConfig, instantRangeOpt, filterOpt, reusableFileReaders, tableConfig.getPayloadClass(), new HoodieConfig(props));
   }
 
   /**
@@ -110,12 +122,14 @@ public class HoodieAvroReaderContext extends HoodieReaderContext<IndexedRecord>
    * @param storageConfiguration the storage configuration to use for reading files
    * @param tableConfig          the configuration of the Hudi table being read
    * @param payloadClassName     the payload class for the writer
+   * @param props                the reader configurations that should be used when performing reads
    */
   public HoodieAvroReaderContext(
       StorageConfiguration<?> storageConfiguration,
       HoodieTableConfig tableConfig,
-      String payloadClassName) {
-    this(storageConfiguration, tableConfig, Option.empty(), Option.empty(), Collections.emptyMap(), payloadClassName);
+      String payloadClassName,
+      TypedProperties props) {
+    this(storageConfiguration, tableConfig, Option.empty(), Option.empty(), Collections.emptyMap(), payloadClassName, new HoodieConfig(props));
   }
 
   private HoodieAvroReaderContext(
@@ -124,8 +138,9 @@ public class HoodieAvroReaderContext extends HoodieReaderContext<IndexedRecord>
       Option<InstantRange> instantRangeOpt,
       Option<Predicate> filterOpt,
       Map<StoragePath, HoodieAvroFileReader> reusableFileReaders,
-      String payloadClassName) {
-    super(storageConfiguration, tableConfig, instantRangeOpt, filterOpt, new AvroRecordContext(tableConfig, payloadClassName));
+      String payloadClassName,
+      HoodieConfig hoodieReaderConfig) {
+    super(storageConfiguration, tableConfig, instantRangeOpt, filterOpt, new AvroRecordContext(tableConfig, payloadClassName), hoodieReaderConfig);
     this.reusableFileReaders = reusableFileReaders;
     this.isMultiFormat = tableConfig.isMultipleBaseFileFormatsEnabled();
   }
@@ -138,7 +153,7 @@ public class HoodieAvroReaderContext extends HoodieReaderContext<IndexedRecord>
     HoodieAvroFileReader reader = getOrCreateFileReader(storagePathInfo.getPath(), isLogFile, format -> {
       try {
         return (HoodieAvroFileReader) HoodieIOFactory.getIOFactory(storage)
-            .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(new HoodieConfig(),
+            .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(hoodieReaderConfig,
                 storagePathInfo, format, Option.empty());
       } catch (IOException e) {
         throw new HoodieIOException("Failed to create avro records iterator from file path " + storagePathInfo.getPath(), e);
@@ -160,7 +175,7 @@ public class HoodieAvroReaderContext extends HoodieReaderContext<IndexedRecord>
     HoodieAvroFileReader reader = getOrCreateFileReader(filePath, isLogFile, format -> {
       try {
         return (HoodieAvroFileReader) HoodieIOFactory.getIOFactory(storage)
-            .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(new HoodieConfig(),
+            .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(hoodieReaderConfig,
                 filePath, format, Option.empty());
       } catch (IOException e) {
         throw new HoodieIOException("Failed to create avro records iterator from file path " + filePath, e);
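Taken together, the new constructor chain lets callers attach per-read configuration to the context. A usage sketch, with `metaClient` (a HoodieTableMetaClient) and `props` assumed:

    // Sketch: reader properties, e.g. the HFile cache keys, travel with the context.
    HoodieAvroReaderContext readerContext = new HoodieAvroReaderContext(
        metaClient.getStorageConf(),
        metaClient.getTableConfig(),
        Option.empty(),   // no instant range filter
        Option.empty(),   // no record key filter
        props);           // TypedProperties carrying hoodie.hfile.block.cache.* keys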
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
index fa7a1346b1a0..702309874261 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
@@ -29,12 +29,6 @@ import javax.annotation.concurrent.Immutable;
     groupName = ConfigGroups.Names.READER,
     description = "Configurations that control file group reading.")
 public class HoodieReaderConfig extends HoodieConfig {
-  public static final ConfigProperty<Boolean> USE_NATIVE_HFILE_READER = ConfigProperty
-      .key("_hoodie.hfile.use.native.reader")
-      .defaultValue(true)
-      .markAdvanced()
-      .sinceVersion("1.0.0")
-      .withDocumentation("When enabled, the native HFile reader is used to read HFiles. This is an internal config.");
 
   public static final ConfigProperty<String> COMPACTION_LAZY_BLOCK_READ_ENABLE = ConfigProperty
       .key("hoodie.compaction.lazy.block.read")
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/AvroReaderContextFactory.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/AvroReaderContextFactory.java
index 0a8eec241fce..590f76a8cc9d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/AvroReaderContextFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/AvroReaderContextFactory.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.common.engine;
 
 import org.apache.hudi.avro.HoodieAvroReaderContext;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 
 import org.apache.avro.generic.IndexedRecord;
@@ -29,18 +30,20 @@ import org.apache.avro.generic.IndexedRecord;
 public class AvroReaderContextFactory implements ReaderContextFactory<IndexedRecord> {
   private final HoodieTableMetaClient metaClient;
   private final String payloadClassName;
+  private final TypedProperties props;
 
-  public AvroReaderContextFactory(HoodieTableMetaClient metaClient) {
-    this(metaClient, metaClient.getTableConfig().getPayloadClass());
+  public AvroReaderContextFactory(HoodieTableMetaClient metaClient, TypedProperties props) {
+    this(metaClient, metaClient.getTableConfig().getPayloadClass(), props);
   }
 
-  public AvroReaderContextFactory(HoodieTableMetaClient metaClient, String payloadClassName) {
+  public AvroReaderContextFactory(HoodieTableMetaClient metaClient, String payloadClassName, TypedProperties props) {
     this.metaClient = metaClient;
     this.payloadClassName = payloadClassName;
+    this.props = props;
   }
 
   @Override
   public HoodieReaderContext<IndexedRecord> getContext() {
-    return new HoodieAvroReaderContext(metaClient.getStorageConf(), metaClient.getTableConfig(), payloadClassName);
+    return new HoodieAvroReaderContext(metaClient.getStorageConf(), metaClient.getTableConfig(), payloadClassName, props);
   }
 }
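Correspondingly, a short sketch of the updated factory contract (`metaClient` and `props` assumed as above): every context the factory hands out now carries the same reader properties.

    // Sketch: props supplied once at factory construction reach every context.
    AvroReaderContextFactory factory = new AvroReaderContextFactory(metaClient, props);
    HoodieReaderContext<IndexedRecord> context = factory.getContext();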
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
index f288baae2c97..2f929b5f106e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java
@@ -147,7 +147,7 @@ public abstract class HoodieEngineContext {
       TypedProperties properties) {
     if (recordType == HoodieRecord.HoodieRecordType.AVRO) {
       String payloadClass = ConfigUtils.getPayloadClass(properties);
-      return new AvroReaderContextFactory(metaClient, payloadClass);
+      return new AvroReaderContextFactory(metaClient, payloadClass, properties);
     }
     return getEngineReaderContextFactory(metaClient);
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
index 469225cf3c87..9abf88875f1d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java
@@ -195,7 +195,7 @@ public final class HoodieLocalEngineContext extends HoodieEngineContext {
 
   @Override
   public ReaderContextFactory<IndexedRecord> getEngineReaderContextFactory(HoodieTableMetaClient metaClient) {
-    return new AvroReaderContextFactory(metaClient);
+    return new AvroReaderContextFactory(metaClient, new TypedProperties());
   }
 
   @Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index c64e7fbee7a7..fb6fcf612999 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.common.engine;
 
+import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -94,18 +95,29 @@ public abstract class HoodieReaderContext<T> {
   private FileGroupReaderSchemaHandler<T> schemaHandler = null;
   // the default iterator mode is engine-specific record mode
   private IteratorMode iteratorMode = IteratorMode.ENGINE_RECORD;
+  protected final HoodieConfig hoodieReaderConfig;
+
+  protected HoodieReaderContext(StorageConfiguration<?> storageConfiguration,
+                                HoodieTableConfig tableConfig,
+                                Option<InstantRange> instantRangeOpt,
+                                Option<Predicate> keyFilterOpt,
+                                RecordContext<T> recordContext) {
+    this(storageConfiguration, tableConfig, instantRangeOpt, keyFilterOpt, recordContext, ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER);
+  }
 
   protected HoodieReaderContext(StorageConfiguration<?> storageConfiguration,
                                 HoodieTableConfig tableConfig,
                                 Option<InstantRange> instantRangeOpt,
                                 Option<Predicate> keyFilterOpt,
-                                RecordContext<T> recordContext) {
+                                RecordContext<T> recordContext,
+                                HoodieConfig hoodieReaderConfig) {
     this.tableConfig = tableConfig;
     this.storageConfiguration = storageConfiguration;
     this.baseFileFormat = tableConfig.getBaseFileFormat();
     this.instantRangeOpt = instantRangeOpt;
     this.keyFilterOpt = keyFilterOpt;
     this.recordContext = recordContext;
+    this.hoodieReaderConfig = hoodieReaderConfig;
   }
 
   // Getter and Setter for schemaHandler
@@ -213,6 +225,10 @@ public abstract class HoodieReaderContext<T> {
     return recordContext;
   }
 
+  public HoodieConfig getHoodieReaderConfig() {
+    return hoodieReaderConfig;
+  }
+
   /**
    * Gets the record iterator based on the type of engine-specific record representation from the
    * file.
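Downstream code can now read the per-context config instead of rebuilding one, as HoodieHFileDataBlock does below. A minimal sketch, assuming `readerContext` is an instance of any HoodieReaderContext subclass:

    // Sketch: pull the read-time config off the context.
    HoodieConfig readerConfig = readerContext.getHoodieReaderConfig();
    boolean hfileCacheEnabled = readerConfig.getBooleanOrDefault(
        HoodieReaderConfig.HFILE_BLOCK_CACHE_ENABLED);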
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index 5202f68e0279..72aabb410526 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.common.table.log;
 
-import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -198,9 +197,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
             String.format("HFile block could not be of version (%d)", HoodieLogFormatVersion.DEFAULT_VERSION));
         return new HoodieHFileDataBlock(
             () -> getDataInputStream(storage, this.logFile, bufferSize), content, true, logBlockContentLoc,
-            Option.ofNullable(readerSchema), header, footer, enableRecordLookups, logFile.getPath(),
-            storage.getConf().getBoolean(HoodieReaderConfig.USE_NATIVE_HFILE_READER.key(),
-                HoodieReaderConfig.USE_NATIVE_HFILE_READER.defaultValue()));
+            Option.ofNullable(readerSchema), header, footer, enableRecordLookups, logFile.getPath());
       case PARQUET_DATA_BLOCK:
         checkState(nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION,
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index 6480ce83eff5..0ee914c32dc5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -18,12 +18,11 @@
 
 package org.apache.hudi.common.table.log.block;
 
-import org.apache.hudi.common.config.HoodieConfig;
-import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
@@ -64,7 +63,6 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
   // This path is used for constructing HFile reader context, which should not be
   // interpreted as the actual file path for the HFile data blocks
   private final StoragePath pathForReader;
-  private final HoodieConfig hFileReaderConfig;
 
   public HoodieHFileDataBlock(Supplier<SeekableDataInputStream> inputStreamSupplier,
                               Option<byte[]> content,
@@ -74,24 +72,20 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
                               Map<HeaderMetadataType, String> header,
                               Map<FooterMetadataType, String> footer,
                               boolean enablePointLookups,
-                              StoragePath pathForReader,
-                              boolean useNativeHFileReader) {
+                              StoragePath pathForReader) {
     super(content, inputStreamSupplier, readBlockLazily, Option.of(logBlockContentLocation), readerSchema,
         header, footer, HoodieAvroHFileReaderImplBase.KEY_FIELD_NAME, enablePointLookups);
     this.compressionCodec = Option.empty();
     this.pathForReader = pathForReader;
-    this.hFileReaderConfig = getHFileReaderConfig(useNativeHFileReader);
   }
 
   public HoodieHFileDataBlock(List<HoodieRecord> records,
                               Map<HeaderMetadataType, String> header,
                               String compressionCodec,
-                              StoragePath pathForReader,
-                              boolean useNativeHFileReader) {
+                              StoragePath pathForReader) {
     super(records, header, new HashMap<>(), HoodieAvroHFileReaderImplBase.KEY_FIELD_NAME);
     this.compressionCodec = Option.of(compressionCodec);
     this.pathForReader = pathForReader;
-    this.hFileReaderConfig = getHFileReaderConfig(useNativeHFileReader);
   }
 
   @Override
@@ -118,7 +112,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
     // Read the content
     try (HoodieFileReader reader = HoodieIOFactory.getIOFactory(inlineStorage)
         .getReaderFactory(HoodieRecordType.AVRO)
-        .getContentReader(hFileReaderConfig, pathForReader, HoodieFileFormat.HFILE,
+        .getContentReader(ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER, pathForReader, HoodieFileFormat.HFILE,
             inlineStorage, content, Option.of(getSchemaFromHeader()))) {
       return unsafeCast(reader.getRecordIterator(readerSchema));
     }
@@ -151,7 +145,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
     // Read the content
     try (HoodieAvroHFileReaderImplBase reader = (HoodieAvroHFileReaderImplBase)
         HoodieIOFactory.getIOFactory(inlineStorage).getReaderFactory(HoodieRecordType.AVRO).getContentReader(
-            hFileReaderConfig, pathForReader, HoodieFileFormat.HFILE, inlineStorage, content,
+            readerContext.getHoodieReaderConfig(), pathForReader, HoodieFileFormat.HFILE, inlineStorage, content,
             Option.of(getSchemaFromHeader()))) {
       return unsafeCast(reader.getIndexedRecordIterator(readerSchema, readerSchema));
     }
@@ -175,7 +169,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
     try (final HoodieAvroHFileReaderImplBase reader = (HoodieAvroHFileReaderImplBase) HoodieIOFactory
         .getIOFactory(inlineStorage)
         .getReaderFactory(HoodieRecordType.AVRO)
-        .getFileReader(hFileReaderConfig, inlinePath, HoodieFileFormat.HFILE, Option.of(getSchemaFromHeader()))) {
+        .getFileReader(ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER, inlinePath, HoodieFileFormat.HFILE, Option.of(getSchemaFromHeader()))) {
       // Get writer's schema from the header
       final ClosableIterator<HoodieRecord<IndexedRecord>> recordIterator =
           fullKey ? reader.getRecordsByKeysIterator(sortedKeys, readerSchema)
              : reader.getRecordsByKeyPrefixIterator(sortedKeys, readerSchema);
@@ -183,11 +177,4 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
       return new CloseableMappingIterator<>(recordIterator, data -> (HoodieRecord<T>) data);
     }
   }
-
-  private HoodieConfig getHFileReaderConfig(boolean useNativeHFileReader) {
-    HoodieConfig config = new HoodieConfig();
-    config.setValue(
-        HoodieReaderConfig.USE_NATIVE_HFILE_READER, Boolean.toString(useNativeHFileReader));
-    return config;
-  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
index 63ff1428b708..816394a207e4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
@@ -19,7 +19,10 @@
 package org.apache.hudi.common.util;
 
 import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.PropertiesConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodiePayloadProps;
@@ -52,7 +55,10 @@ import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import static org.apache.hudi.common.config.HoodieReaderConfig.USE_NATIVE_HFILE_READER;
+import static org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
+import static org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
+import static org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE;
+import static org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH;
 import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_CHECKSUM;
 import static org.apache.hudi.keygen.constant.KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED;
@@ -748,8 +754,25 @@ public class ConfigUtils {
   public static HoodieConfig getReaderConfigs(StorageConfiguration<?> storageConf) {
     HoodieConfig config = new HoodieConfig();
     config.setAll(DEFAULT_HUDI_CONFIG_FOR_READER.getProps());
-    config.setValue(USE_NATIVE_HFILE_READER,
-        Boolean.toString(storageConf.getBoolean(USE_NATIVE_HFILE_READER.key(), USE_NATIVE_HFILE_READER.defaultValue())));
+    return config;
+  }
+
+  /**
+   * Applies HFile cache configurations from the given options to a new reader config.
+   * This method extracts the HFile cache-related settings from the provided options map
+   * and applies them to the returned HoodieReaderConfig instance.
+   *
+   * @param options map of options containing HFile cache configurations
+   * @return HoodieReaderConfig carrying the HFile reader configurations
+   */
+  public static HoodieReaderConfig getHFileCacheConfigs(Map<String, String> options) {
+    HoodieReaderConfig config = new HoodieReaderConfig();
+    config.setValue(HoodieReaderConfig.HFILE_BLOCK_CACHE_ENABLED,
+        getStringWithAltKeys(options, HoodieReaderConfig.HFILE_BLOCK_CACHE_ENABLED));
+    config.setValue(HoodieReaderConfig.HFILE_BLOCK_CACHE_SIZE,
+        getStringWithAltKeys(options, HoodieReaderConfig.HFILE_BLOCK_CACHE_SIZE));
+    config.setValue(HoodieReaderConfig.HFILE_BLOCK_CACHE_TTL_MINUTES,
+        getStringWithAltKeys(options, HoodieReaderConfig.HFILE_BLOCK_CACHE_TTL_MINUTES));
     return config;
   }
 
@@ -789,4 +812,35 @@ public class ConfigUtils {
     }
     return mergeProperties;
   }
+
+  /**
+   * Derives the necessary properties for the file group reader.
+   */
+  public static TypedProperties buildFileGroupReaderProperties(HoodieMetadataConfig metadataConfig,
+                                                               boolean shouldReuse) {
+    HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder()
+        .fromProperties(metadataConfig.getProps()).build();
+    TypedProperties props = new TypedProperties();
+    props.setProperty(
+        MAX_MEMORY_FOR_MERGE.key(),
+        Long.toString(metadataConfig.getMaxReaderMemory()));
+    props.setProperty(
+        SPILLABLE_MAP_BASE_PATH.key(),
+        metadataConfig.getSplliableMapDir());
+    props.setProperty(
+        SPILLABLE_DISK_MAP_TYPE.key(),
+        commonConfig.getSpillableDiskMapType().name());
+    props.setProperty(
+        DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
+        Boolean.toString(commonConfig.isBitCaskDiskMapCompressionEnabled()));
+    props.setProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_ENABLED.key(),
+        shouldReuse ? "true" : metadataConfig.getStringOrDefault(HoodieReaderConfig.HFILE_BLOCK_CACHE_ENABLED));
+    props.setProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_SIZE.key(),
+        metadataConfig.getStringOrDefault(HoodieReaderConfig.HFILE_BLOCK_CACHE_SIZE));
+    props.setProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_TTL_MINUTES.key(),
+        metadataConfig.getStringOrDefault(HoodieReaderConfig.HFILE_BLOCK_CACHE_TTL_MINUTES));
+    return props;
+  }
 }
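These two helpers centralize property derivation that HoodieBackedTableMetadata and HoodieBaseRelation previously assembled inline (see the hunks below). A usage sketch, with `metadataConfig` (a HoodieMetadataConfig) and `options` (a Map<String, String>) assumed:

    // Sketch: derive file group reader properties, forcing the HFile block
    // cache on when readers are reused.
    TypedProperties fgReaderProps =
        ConfigUtils.buildFileGroupReaderProperties(metadataConfig, true);
    // Sketch: lift the hoodie.hfile.block.cache.* options into a reader config.
    HoodieReaderConfig hfileCacheConfig = ConfigUtils.getHFileCacheConfigs(options);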
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
index 912fc6a76a0c..60d9afe199e5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReaderFactory.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.io.storage;
 
 import org.apache.hudi.common.config.HoodieConfig;
-import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.util.Option;
@@ -134,7 +133,4 @@ public class HoodieFileReaderFactory {
     throw new UnsupportedOperationException();
   }
 
-  protected static boolean isUseNativeHFileReaderEnabled(HoodieConfig hoodieConfig) {
-    return hoodieConfig.getBooleanOrDefault(HoodieReaderConfig.USE_NATIVE_HFILE_READER);
-  }
 }
\ No newline at end of file
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 9fb9a6c93cb4..63c3bcf5b102 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -21,10 +21,8 @@ package org.apache.hudi.metadata;
 import org.apache.hudi.avro.HoodieAvroReaderContext;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
-import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodieListData;
@@ -49,6 +47,7 @@ import org.apache.hudi.common.table.read.buffer.FileGroupRecordBufferLoader;
 import org.apache.hudi.common.table.read.buffer.ReusableFileGroupRecordBufferLoader;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.HoodieDataUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
@@ -89,7 +88,6 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
@@ -97,10 +95,6 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import static org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
-import static org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
-import static org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE;
-import static org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH;
 import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FULL_SCAN_LOG_FILES;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
 import static org.apache.hudi.metadata.HoodieMetadataPayload.KEY_FIELD_NAME;
@@ -510,17 +504,16 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     Map<StoragePath, HoodieAvroFileReader> baseFileReaders = Collections.emptyMap();
     ReusableFileGroupRecordBufferLoader<IndexedRecord> recordBufferLoader = null;
     boolean shouldReuse = reuse && isFullScanAllowedForPartition(fileSlice.getPartitionPath());
+    TypedProperties fileGroupReaderProps = ConfigUtils.buildFileGroupReaderProperties(metadataConfig, shouldReuse);
     if (shouldReuse) {
       Pair<HoodieAvroFileReader, ReusableFileGroupRecordBufferLoader<IndexedRecord>> readers =
           reusableFileReaders.computeIfAbsent(fileSlice.getFileGroupId(), fgId -> {
            try {
              HoodieAvroFileReader baseFileReader = null;
              if (fileSlice.getBaseFile().isPresent()) {
-                TypedProperties props = TypedProperties.copy(metadataConfig.getProps());
-                setHFileBlockCacheProps(props);
-                HoodieConfig newConfig = new HoodieConfig(props);
+                HoodieConfig fileGroupReaderConfig = new HoodieConfig(fileGroupReaderProps);
                baseFileReader = (HoodieAvroFileReader) HoodieIOFactory.getIOFactory(storage).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
-                    .getFileReader(newConfig, fileSlice.getBaseFile().get().getStoragePath(), metadataMetaClient.getTableConfig().getBaseFileFormat(), Option.empty());
+                    .getFileReader(fileGroupReaderConfig, fileSlice.getBaseFile().get().getStoragePath(), metadataMetaClient.getTableConfig().getBaseFileFormat(), Option.empty());
              }
              return Pair.of(baseFileReader, buildReusableRecordBufferLoader(fileSlice, latestMetadataInstantTime, instantRange));
            } catch (IOException ex) {
@@ -540,7 +533,8 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
         metadataMetaClient.getTableConfig(),
         instantRange,
         Option.of(predicate),
-        baseFileReaders);
+        baseFileReaders,
+        fileGroupReaderProps);
     HoodieFileGroupReader<IndexedRecord> fileGroupReader = HoodieFileGroupReader.<IndexedRecord>newBuilder()
         .withReaderContext(readerContext)
@@ -549,7 +543,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
         .withFileSlice(fileSlice)
         .withDataSchema(SCHEMA)
         .withRequestedSchema(SCHEMA)
-        .withProps(buildFileGroupReaderProperties(metadataConfig, shouldReuse))
+        .withProps(fileGroupReaderProps)
         .withRecordBufferLoader(recordBufferLoader)
         .withEnableOptimizedLogBlockScan(metadataConfig.isOptimizedLogBlocksScanEnabled())
         .build();
@@ -564,7 +558,8 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
         storageConf,
         metadataMetaClient.getTableConfig(),
         instantRangeOption,
-        Option.empty());
+        Option.empty(),
+        ConfigUtils.buildFileGroupReaderProperties(metadataConfig, true));
     readerContext.initRecordMerger(metadataConfig.getProps());
     readerContext.setHasBootstrapBaseFile(false);
     readerContext.setHasLogFiles(fileSlice.hasLogFiles());
@@ -858,41 +853,4 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     return readIndexRecords(rawKeys, partitionName, Option.empty())
         .mapToPair(hoodieRecord -> SecondaryIndexKeyUtils.getSecondaryKeyRecordKeyPair(hoodieRecord.getRecordKey()));
   }
-
-  /**
-   * Derives the necessary properties for the file group reader.
-   */
-  TypedProperties buildFileGroupReaderProperties(HoodieMetadataConfig metadataConfig, boolean shouldReuse) {
-    HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder()
-        .fromProperties(metadataConfig.getProps()).build();
-    TypedProperties props = new TypedProperties();
-    props.setProperty(
-        MAX_MEMORY_FOR_MERGE.key(),
-        Long.toString(metadataConfig.getMaxReaderMemory()));
-    props.setProperty(
-        SPILLABLE_MAP_BASE_PATH.key(),
-        metadataConfig.getSplliableMapDir());
-    props.setProperty(
-        SPILLABLE_DISK_MAP_TYPE.key(),
-        commonConfig.getSpillableDiskMapType().name());
-    props.setProperty(
-        DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
-        Boolean.toString(commonConfig.isBitCaskDiskMapCompressionEnabled()));
-    if (shouldReuse) {
-      setHFileBlockCacheProps(props);
-    } else {
-      props.setProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_ENABLED.key(),
-          metadataConfig.getStringOrDefault(HoodieReaderConfig.HFILE_BLOCK_CACHE_ENABLED));
-    }
-    props.setProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_SIZE.key(),
-        metadataConfig.getStringOrDefault(HoodieReaderConfig.HFILE_BLOCK_CACHE_SIZE));
-    props.setProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_TTL_MINUTES.key(),
-        metadataConfig.getStringOrDefault(HoodieReaderConfig.HFILE_BLOCK_CACHE_TTL_MINUTES));
-    return props;
-  }
-
-  private void setHFileBlockCacheProps(Properties props) {
-    // Enable HFile block caching for reuse and full scan usage
-    props.setProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_ENABLED.key(), "true");
-  }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
index 739cd4bb55ea..c3b42c278778 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
@@ -21,7 +21,6 @@ package org.apache.hudi.common.testutils.reader;
 
 import org.apache.hudi.common.bloom.BloomFilterTypeCode;
 import org.apache.hudi.common.config.HoodieConfig;
-import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.LocalTaskContextSupplier;
@@ -198,8 +197,7 @@ public class HoodieFileSliceTestUtils {
             records,
             header,
             HFILE_COMPRESSION_ALGORITHM_NAME.defaultValue(),
-            pathForReader,
-            HoodieReaderConfig.USE_NATIVE_HFILE_READER.defaultValue());
+            pathForReader);
       case PARQUET_DATA_BLOCK:
         return new HoodieParquetDataBlock(
             records,
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
index e5f4140cdce8..55643d5b239d 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.common.testutils;
 
-import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
@@ -411,7 +410,7 @@ public class HoodieCommonTestHarness {
       case AVRO_DATA_BLOCK:
         return new HoodieAvroDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
       case HFILE_DATA_BLOCK:
-        return new HoodieHFileDataBlock(records, header, HFILE_COMPRESSION_ALGORITHM_NAME.defaultValue(), pathForReader, HoodieReaderConfig.USE_NATIVE_HFILE_READER.defaultValue());
+        return new HoodieHFileDataBlock(records, header, HFILE_COMPRESSION_ALGORITHM_NAME.defaultValue(), pathForReader);
       case PARQUET_DATA_BLOCK:
         return new HoodieParquetDataBlock(records, header, HoodieRecord.RECORD_KEY_METADATA_FIELD, PARQUET_COMPRESSION_CODEC_NAME.defaultValue(), 0.1, true);
       default:
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
index c766076d66a2..7468a5868106 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.hadoop.testutils;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -421,7 +420,7 @@ public class InputFormatTestUtil {
     List<HoodieRecord> hoodieRecords = records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
     if (logBlockType == HoodieLogBlock.HoodieLogBlockType.HFILE_DATA_BLOCK) {
       dataBlock = new HoodieHFileDataBlock(
-          hoodieRecords, header, HFILE_COMPRESSION_ALGORITHM_NAME.defaultValue(), writer.getLogFile().getPath(), HoodieReaderConfig.USE_NATIVE_HFILE_READER.defaultValue());
+          hoodieRecords, header, HFILE_COMPRESSION_ALGORITHM_NAME.defaultValue(), writer.getLogFile().getPath());
     } else if (logBlockType == HoodieLogBlock.HoodieLogBlockType.PARQUET_DATA_BLOCK) {
       dataBlock = new HoodieParquetDataBlock(hoodieRecords, header, HoodieRecord.RECORD_KEY_METADATA_FIELD, PARQUET_COMPRESSION_CODEC_NAME.defaultValue(), 0.1, true);
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index e0a02bd3b84c..2ee6c6c92fb5 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -24,7 +24,6 @@ import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.avro.HoodieAvroUtils.createNewSchemaField
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
 import org.apache.hudi.common.config.{ConfigProperty, HoodieConfig, HoodieMetadataConfig}
-import org.apache.hudi.common.config.HoodieReaderConfig.USE_NATIVE_HFILE_READER
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
 import org.apache.hudi.common.model.{FileSlice, HoodieFileFormat, HoodieRecord}
@@ -840,9 +839,8 @@ object HoodieBaseRelation extends SparkAdapterSupport {
     partitionedFile => {
       val hadoopConf = hadoopConfBroadcast.value.value
       val filePath = sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(partitionedFile)
-      val hoodieConfig = new HoodieConfig()
-      hoodieConfig.setValue(USE_NATIVE_HFILE_READER,
-        options.getOrElse(USE_NATIVE_HFILE_READER.key(), USE_NATIVE_HFILE_READER.defaultValue().toString))
+      val hoodieConfig = ConfigUtils.getHFileCacheConfigs(options.asJava)
+
       val reader = new HoodieSparkIOFactory(
           new HoodieHadoopStorage(filePath, HadoopFSUtils.getStorageConf(hadoopConf)))
         .getReaderFactory(HoodieRecordType.AVRO)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
index 943126be87f5..3dcbce2186af 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
@@ -19,7 +19,7 @@ package org.apache.hudi.functional
 
 import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieReaderConfig}
 import org.apache.hudi.common.data.HoodieListData
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
@@ -45,7 +45,7 @@ class RecordLevelIndexTestBase extends HoodieStatsIndexTestBase {
     PARTITIONPATH_FIELD.key -> "partition",
     HoodieTableConfig.POPULATE_META_FIELDS.key -> "true",
     HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "15",
-    "hoodie.hfile.block.cache.enabled" -> "true"
+    HoodieReaderConfig.HFILE_BLOCK_CACHE_SIZE.key() -> "200"
   ) ++ baseOpts ++ metadataOpts
 
   val secondaryIndexOpts: Map[String, String] = Map(
@@ -128,7 +128,7 @@ class RecordLevelIndexTestBase extends HoodieStatsIndexTestBase {
                                 deletedDf: DataFrame = sparkSession.emptyDataFrame): Unit = {
     val writeConfig = getWriteConfig(hudiOpts)
     val metadata = metadataWriter(writeConfig).getTableMetadata
-    val readDf = spark.read.format("hudi").load(basePath)
+    val readDf = spark.read.options(hudiOpts).format("hudi").load(basePath)
     readDf.cache()
     val rowArr = readDf.collect()
     val recordIndexMap = HoodieDataUtils.dedupeAndCollectAsMap(metadata.readRecordIndexLocationsWithKeys(
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index ef6e2f6a978a..a6b857952437 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -363,6 +363,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase {
       saveMode = SaveMode.Overwrite)
     insertDf.cache()
 
+    spark.sql(s"SET hoodie.hfile.block.cache.size = 200")
     spark.sql(s"CREATE TABLE IF NOT EXISTS hudi_indexed_table USING hudi OPTIONS (hoodie.metadata.enable = 'true', hoodie.metadata.record.index.enable = 'true', hoodie.write.merge.handle.class = 'org.apache.hudi.io.FileGroupReaderBasedMergeHandle') LOCATION '$basePath'")
     val existingKeys = dataGen.getExistingKeys
     spark.sql(s"DELETE FROM hudi_indexed_table WHERE _row_key IN ('${existingKeys.get(0)}', '${existingKeys.get(1)}')")
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index b9f3fd5e0004..90e3cddce15c 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -31,7 +31,6 @@ import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodieListData;
@@ -65,7 +64,6 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.FileSystemViewStorageType;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.CleanerUtils;
-import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.HoodieDataUtils;
@@ -1982,12 +1980,9 @@ public class HoodieMetadataTableValidator implements Serializable {
     StoragePath path = new StoragePath(
         FSUtils.constructAbsolutePath(metaClient.getBasePath(), partitionPath).toString(), filename);
     BloomFilter bloomFilter;
-    HoodieConfig hoodieConfig = new HoodieConfig();
-    hoodieConfig.setValue(HoodieReaderConfig.USE_NATIVE_HFILE_READER,
-        Boolean.toString(ConfigUtils.getBooleanWithAltKeys(props, HoodieReaderConfig.USE_NATIVE_HFILE_READER)));
     try (HoodieFileReader fileReader = getHoodieSparkIOFactory(metaClient.getStorage())
         .getReaderFactory(HoodieRecordType.AVRO)
-        .getFileReader(hoodieConfig, path)) {
+        .getFileReader(new HoodieConfig(), path)) {
       bloomFilter = fileReader.readBloomFilter();
       if (bloomFilter == null) {
         LOG.error("Failed to read bloom filter for {}", path);