This is an automated email from the ASF dual-hosted git repository.
jonvex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new bf1df335442 [HUDI-7876] use properties to store log spill map configs
for fg reader (#11455)
bf1df335442 is described below
commit bf1df335442d38932cf7f8c6ef4228c316278569
Author: Jon Vexler <[email protected]>
AuthorDate: Tue Jun 18 12:30:56 2024 -0400
[HUDI-7876] use properties to store log spill map configs for fg reader
(#11455)
* Use properties to store the log spillable map configs for the file group reader
* Use a constant for the max buffer size
* Rename payloadProps to props
---------
Co-authored-by: Jonathan Vexler <=>
---
.../read/HoodieBaseFileGroupRecordBuffer.java | 46 +++++++++++++---------
.../common/table/read/HoodieFileGroupReader.java | 13 ++----
.../read/HoodieKeyBasedFileGroupRecordBuffer.java | 10 +----
.../HoodiePositionBasedFileGroupRecordBuffer.java | 10 +----
.../read/HoodieUnmergedFileGroupRecordBuffer.java | 10 +----
.../table/read/TestHoodieFileGroupReaderBase.java | 9 +++--
.../reader/HoodieFileGroupReaderTestUtils.java | 12 +++---
.../HoodieFileGroupReaderBasedRecordReader.java | 24 +++++------
...odieFileGroupReaderBasedParquetFileFormat.scala | 15 +++----
...stHoodiePositionBasedFileGroupRecordBuffer.java | 11 +++---
10 files changed, 68 insertions(+), 92 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
index 88ec42673ac..aea50e44fbe 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.KeySpec;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.InternalSchemaCache;
import org.apache.hudi.common.util.Option;
@@ -50,9 +51,14 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.Iterator;
+import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
+import static
org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
+import static
org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
+import static
org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE;
+import static
org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH;
import static
org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_SCHEMA;
import static
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
import static
org.apache.hudi.common.table.read.HoodieFileGroupReader.getRecordMergeMode;
@@ -64,7 +70,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T>
implements HoodieFileGr
protected final Option<String[]> partitionPathFieldOpt;
protected final RecordMergeMode recordMergeMode;
protected final HoodieRecordMerger recordMerger;
- protected final TypedProperties payloadProps;
+ protected final TypedProperties props;
protected final ExternalSpillableMap<Serializable, Pair<Option<T>,
Map<String, Object>>> records;
protected ClosableIterator<T> baseFileIterator;
protected Iterator<Pair<Option<T>, Map<String, Object>>> logRecordIterator;
@@ -78,24 +84,26 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T>
implements HoodieFileGr
Option<String>
partitionNameOverrideOpt,
Option<String[]>
partitionPathFieldOpt,
HoodieRecordMerger recordMerger,
- TypedProperties payloadProps,
- long maxMemorySizeInBytes,
- String spillableMapBasePath,
- ExternalSpillableMap.DiskMapType
diskMapType,
- boolean
isBitCaskDiskMapCompressionEnabled) {
+ TypedProperties props) {
this.readerContext = readerContext;
this.readerSchema = readerContext.getSchemaHandler().getRequiredSchema();
this.partitionNameOverrideOpt = partitionNameOverrideOpt;
this.partitionPathFieldOpt = partitionPathFieldOpt;
- this.recordMergeMode = getRecordMergeMode(payloadProps);
+ this.recordMergeMode = getRecordMergeMode(props);
this.recordMerger = recordMerger;
//Custom merge mode should produce the same results for any merger so we
won't fail if there is a mismatch
if (recordMerger.getRecordMergeMode() != this.recordMergeMode &&
this.recordMergeMode != RecordMergeMode.CUSTOM) {
throw new IllegalStateException("Record merger is " +
recordMerger.getClass().getName() + " but merge mode is " +
this.recordMergeMode);
}
- this.payloadProps = payloadProps;
+ this.props = props;
this.internalSchema = readerContext.getSchemaHandler().getInternalSchema();
this.hoodieTableMetaClient = hoodieTableMetaClient;
+ long maxMemorySizeInBytes = props.getLong(MAX_MEMORY_FOR_MERGE.key(),
MAX_MEMORY_FOR_MERGE.defaultValue());
+ String spillableMapBasePath =
props.getString(SPILLABLE_MAP_BASE_PATH.key(),
FileIOUtils.getDefaultSpillableMapBasePath());
+ ExternalSpillableMap.DiskMapType diskMapType =
ExternalSpillableMap.DiskMapType.valueOf(props.getString(SPILLABLE_DISK_MAP_TYPE.key(),
+
SPILLABLE_DISK_MAP_TYPE.defaultValue().name()).toUpperCase(Locale.ROOT));
+ boolean isBitCaskDiskMapCompressionEnabled =
props.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
+ DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue());
try {
// Store merged records for all versions for this log file, set the
in-memory footprint to maxInMemoryMapSize
this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes,
spillableMapBasePath, new DefaultSizeEstimator<>(),
@@ -213,7 +221,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T>
implements HoodieFileGr
existingRecordMetadataPair.getLeft(),
existingRecordMetadataPair.getRight()),
(Schema)
existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA),
readerSchema,
- payloadProps);
+ props);
if (!combinedRecordAndSchemaOpt.isPresent()) {
return Option.empty();
}
@@ -233,12 +241,12 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T>
implements HoodieFileGr
return Option.empty();
case EVENT_TIME_ORDERING:
Comparable existingOrderingValue = readerContext.getOrderingValue(
- existingRecordMetadataPair.getLeft(),
existingRecordMetadataPair.getRight(), readerSchema, payloadProps);
+ existingRecordMetadataPair.getLeft(),
existingRecordMetadataPair.getRight(), readerSchema, props);
if
(isDeleteRecordWithNaturalOrder(existingRecordMetadataPair.getLeft(),
existingOrderingValue)) {
return Option.empty();
}
Comparable incomingOrderingValue = readerContext.getOrderingValue(
- Option.of(record), metadata, readerSchema, payloadProps);
+ Option.of(record), metadata, readerSchema, props);
if (compareTo(readerContext, incomingOrderingValue,
existingOrderingValue) > 0) {
return Option.of(Pair.of(record, metadata));
}
@@ -254,7 +262,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T>
implements HoodieFileGr
readerContext.constructHoodieRecord(
existingRecordMetadataPair.getLeft(),
existingRecordMetadataPair.getRight()),
(Schema)
existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA),
- payloadProps);
+ props);
if (!combinedRecordAndSchemaOpt.isPresent()) {
return Option.empty();
@@ -297,7 +305,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T>
implements HoodieFileGr
default:
Comparable existingOrderingVal = readerContext.getOrderingValue(
existingRecordMetadataPair.getLeft(),
existingRecordMetadataPair.getRight(), readerSchema,
- payloadProps);
+ props);
if
(isDeleteRecordWithNaturalOrder(existingRecordMetadataPair.getLeft(),
existingOrderingVal)) {
return Option.empty();
}
@@ -383,10 +391,10 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T>
implements HoodieFileGr
Option<Pair<HoodieRecord, Schema>> mergedRecord =
recordMerger.partialMerge(
readerContext.constructHoodieRecord(older, olderInfoMap), (Schema)
olderInfoMap.get(INTERNAL_META_SCHEMA),
readerContext.constructHoodieRecord(newer, newerInfoMap), (Schema)
newerInfoMap.get(INTERNAL_META_SCHEMA),
- readerSchema, payloadProps);
+ readerSchema, props);
if (mergedRecord.isPresent()
- &&
!mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(),
payloadProps)) {
+ &&
!mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), props)) {
if (!mergedRecord.get().getRight().equals(readerSchema)) {
return Option.ofNullable((T)
mergedRecord.get().getLeft().rewriteRecordWithNewSchema(mergedRecord.get().getRight(),
null, readerSchema).getData());
}
@@ -399,12 +407,12 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T>
implements HoodieFileGr
return newer;
case EVENT_TIME_ORDERING:
Comparable oldOrderingValue = readerContext.getOrderingValue(
- older, olderInfoMap, readerSchema, payloadProps);
+ older, olderInfoMap, readerSchema, props);
if (isDeleteRecordWithNaturalOrder(older, oldOrderingValue)) {
return newer;
}
Comparable newOrderingValue = readerContext.getOrderingValue(
- newer, newerInfoMap, readerSchema, payloadProps);
+ newer, newerInfoMap, readerSchema, props);
if (isDeleteRecordWithNaturalOrder(newer, newOrderingValue)) {
return Option.empty();
}
@@ -416,10 +424,10 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T>
implements HoodieFileGr
default:
Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.merge(
readerContext.constructHoodieRecord(older, olderInfoMap),
(Schema) olderInfoMap.get(INTERNAL_META_SCHEMA),
- readerContext.constructHoodieRecord(newer, newerInfoMap),
(Schema) newerInfoMap.get(INTERNAL_META_SCHEMA), payloadProps);
+ readerContext.constructHoodieRecord(newer, newerInfoMap),
(Schema) newerInfoMap.get(INTERNAL_META_SCHEMA), props);
if (mergedRecord.isPresent()
- &&
!mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(),
payloadProps)) {
+ &&
!mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), props)) {
if (!mergedRecord.get().getRight().equals(readerSchema)) {
return Option.ofNullable((T)
mergedRecord.get().getLeft().rewriteRecordWithNewSchema(mergedRecord.get().getRight(),
null, readerSchema).getData());
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index 97d655a9e47..1c59e8f0ba2 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -36,7 +36,6 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.CachingIterator;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.EmptyIterator;
-import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.internal.schema.InternalSchema;
@@ -94,11 +93,7 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
TypedProperties props,
long start,
long length,
- boolean shouldUseRecordPosition,
- long maxMemorySizeInBytes,
- String spillableMapBasePath,
- ExternalSpillableMap.DiskMapType diskMapType,
- boolean isBitCaskDiskMapCompressionEnabled) {
+ boolean shouldUseRecordPosition) {
this.readerContext = readerContext;
this.storage = storage;
this.hoodieBaseFileOption = fileSlice.getBaseFile();
@@ -122,10 +117,8 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
this.recordBuffer = this.logFiles.isEmpty()
? null
: shouldUseRecordPosition
- ? new HoodiePositionBasedFileGroupRecordBuffer<>(readerContext,
hoodieTableMetaClient, Option.empty(),
- Option.empty(), recordMerger, props, maxMemorySizeInBytes,
spillableMapBasePath, diskMapType, isBitCaskDiskMapCompressionEnabled)
- : new HoodieKeyBasedFileGroupRecordBuffer<>(readerContext,
hoodieTableMetaClient, Option.empty(),
- Option.empty(), recordMerger, props, maxMemorySizeInBytes,
spillableMapBasePath, diskMapType, isBitCaskDiskMapCompressionEnabled);
+ ? new HoodiePositionBasedFileGroupRecordBuffer<>(readerContext,
hoodieTableMetaClient, Option.empty(), Option.empty(), recordMerger, props)
+ : new HoodieKeyBasedFileGroupRecordBuffer<>(readerContext,
hoodieTableMetaClient, Option.empty(), Option.empty(), recordMerger, props);
}
/**
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
index d311923c625..1c53bfedf3d 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
@@ -30,7 +30,6 @@ import
org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.avro.Schema;
@@ -54,13 +53,8 @@ public class HoodieKeyBasedFileGroupRecordBuffer<T> extends
HoodieBaseFileGroupR
Option<String>
partitionNameOverrideOpt,
Option<String[]>
partitionPathFieldOpt,
HoodieRecordMerger recordMerger,
- TypedProperties payloadProps,
- long maxMemorySizeInBytes,
- String spillableMapBasePath,
- ExternalSpillableMap.DiskMapType
diskMapType,
- boolean
isBitCaskDiskMapCompressionEnabled) {
- super(readerContext, hoodieTableMetaClient, partitionNameOverrideOpt,
partitionPathFieldOpt,
- recordMerger, payloadProps, maxMemorySizeInBytes,
spillableMapBasePath, diskMapType, isBitCaskDiskMapCompressionEnabled);
+ TypedProperties props) {
+ super(readerContext, hoodieTableMetaClient, partitionNameOverrideOpt,
partitionPathFieldOpt, recordMerger, props);
}
@Override
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
index 79011a81d6b..03ee23b91d5 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
@@ -30,7 +30,6 @@ import
org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieKeyException;
@@ -72,13 +71,8 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T>
extends HoodieKeyBasedF
Option<String>
partitionNameOverrideOpt,
Option<String[]>
partitionPathFieldOpt,
HoodieRecordMerger
recordMerger,
- TypedProperties payloadProps,
- long maxMemorySizeInBytes,
- String spillableMapBasePath,
-
ExternalSpillableMap.DiskMapType diskMapType,
- boolean
isBitCaskDiskMapCompressionEnabled) {
- super(readerContext, hoodieTableMetaClient, partitionNameOverrideOpt,
partitionPathFieldOpt,
- recordMerger, payloadProps, maxMemorySizeInBytes,
spillableMapBasePath, diskMapType, isBitCaskDiskMapCompressionEnabled);
+ TypedProperties props) {
+ super(readerContext, hoodieTableMetaClient, partitionNameOverrideOpt,
partitionPathFieldOpt, recordMerger, props);
}
@Override
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieUnmergedFileGroupRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieUnmergedFileGroupRecordBuffer.java
index 8338c96f5a0..f766645999d 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieUnmergedFileGroupRecordBuffer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieUnmergedFileGroupRecordBuffer.java
@@ -30,7 +30,6 @@ import
org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
-import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
@@ -53,13 +52,8 @@ public class HoodieUnmergedFileGroupRecordBuffer<T> extends
HoodieBaseFileGroupR
Option<String> partitionNameOverrideOpt,
Option<String[]> partitionPathFieldOpt,
HoodieRecordMerger recordMerger,
- TypedProperties payloadProps,
- long maxMemorySizeInBytes,
- String spillableMapBasePath,
- ExternalSpillableMap.DiskMapType diskMapType,
- boolean isBitCaskDiskMapCompressionEnabled) {
- super(readerContext, hoodieTableMetaClient, partitionNameOverrideOpt,
partitionPathFieldOpt, recordMerger,
- payloadProps, maxMemorySizeInBytes, spillableMapBasePath, diskMapType,
isBitCaskDiskMapCompressionEnabled);
+ TypedProperties props) {
+ super(readerContext, hoodieTableMetaClient, partitionNameOverrideOpt,
partitionPathFieldOpt, recordMerger, props);
}
@Override
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index b1bb2dfe545..6c7b3400427 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -20,6 +20,7 @@
package org.apache.hudi.common.table.read;
import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieMemoryConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.RecordMergeMode;
@@ -276,6 +277,10 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
props.setProperty("hoodie.payload.ordering.field", "timestamp");
props.setProperty(RECORD_MERGER_STRATEGY.key(),
RECORD_MERGER_STRATEGY.defaultValue());
props.setProperty(RECORD_MERGE_MODE.key(), recordMergeMode.name());
+ props.setProperty(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(),
String.valueOf(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.defaultValue()));
+ props.setProperty(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(),
metaClient.getTempFolderPath());
+ props.setProperty(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(),
ExternalSpillableMap.DiskMapType.ROCKS_DB.name());
+
props.setProperty(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
"false");
if (metaClient.getTableConfig().contains(PARTITION_FIELDS)) {
props.setProperty(PARTITION_FIELDS.key(),
metaClient.getTableConfig().getString(PARTITION_FIELDS));
}
@@ -293,10 +298,6 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
props,
0,
fileSlice.getTotalFileSize(),
- false,
- 1024 * 1024 * 1000,
- metaClient.getTempFolderPath(),
- ExternalSpillableMap.DiskMapType.ROCKS_DB,
false);
fileGroupReader.initRecordIterators();
while (fileGroupReader.hasNext()) {
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestUtils.java
index 998bd4ab079..e3faae07fa0 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestUtils.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestUtils.java
@@ -19,6 +19,8 @@
package org.apache.hudi.common.testutils.reader;
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieMemoryConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.FileSlice;
@@ -102,6 +104,10 @@ public class HoodieFileGroupReaderTestUtils {
boolean shouldUseRecordPosition,
HoodieTableMetaClient metaClient
) {
+
props.setProperty(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(),String.valueOf(1024
* 1024 * 1000));
+ props.setProperty(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(),
basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME);
+ props.setProperty(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(),
ExternalSpillableMap.DiskMapType.ROCKS_DB.name());
+
props.setProperty(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
"false");
return new HoodieFileGroupReader<>(
readerContext,
storage,
@@ -115,11 +121,7 @@ public class HoodieFileGroupReaderTestUtils {
props,
start,
length,
- shouldUseRecordPosition,
- 1024 * 1024 * 1000,
- basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME,
- ExternalSpillableMap.DiskMapType.ROCKS_DB,
- false);
+ shouldUseRecordPosition);
}
}
}
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
index acd499a8c60..e26e4b0efa9 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
@@ -20,6 +20,7 @@
package org.apache.hudi.hadoop;
import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
@@ -28,10 +29,8 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.read.HoodieFileGroupReader;
import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.hadoop.realtime.RealtimeSplit;
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
@@ -64,10 +63,6 @@ import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import static
org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
-import static
org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
-import static
org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE;
-import static
org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH;
import static org.apache.hudi.common.fs.FSUtils.getCommitTime;
import static org.apache.hudi.common.fs.FSUtils.getFileId;
import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
@@ -125,18 +120,17 @@ public class HoodieFileGroupReaderBasedRecordReader
implements RecordReader<Null
Map<String, String[]> hosts = new HashMap<>();
this.readerContext = new HiveHoodieReaderContext(readerCreator, split,
jobConfCopy, reporter, tableSchema, hosts, metaClient);
this.arrayWritable = new ArrayWritable(Writable.class, new
Writable[requestedSchema.getFields().size()]);
- // get some config values
- long maxMemoryForMerge = jobConf.getLong(MAX_MEMORY_FOR_MERGE.key(),
MAX_MEMORY_FOR_MERGE.defaultValue());
- String spillableMapPath = jobConf.get(SPILLABLE_MAP_BASE_PATH.key(),
FileIOUtils.getDefaultSpillableMapBasePath());
- ExternalSpillableMap.DiskMapType spillMapType =
ExternalSpillableMap.DiskMapType.valueOf(jobConf.get(SPILLABLE_DISK_MAP_TYPE.key(),
-
SPILLABLE_DISK_MAP_TYPE.defaultValue().name()).toUpperCase(Locale.ROOT));
- boolean bitmaskCompressEnabled =
jobConf.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
- DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue());
+ TypedProperties props = metaClient.getTableConfig().getProps();
+ jobConf.forEach(e -> {
+ if (e.getKey().startsWith("hoodie")) {
+ props.setProperty(e.getKey(), e.getValue());
+ }
+ });
LOG.debug("Creating HoodieFileGroupReaderRecordReader with
tableBasePath={}, latestCommitTime={}, fileSplit={}", tableBasePath,
latestCommitTime, fileSplit.getPath());
this.fileGroupReader = new HoodieFileGroupReader<>(readerContext,
metaClient.getStorage(), tableBasePath,
latestCommitTime, getFileSliceFromSplit(fileSplit, hosts,
getFs(tableBasePath, jobConfCopy), tableBasePath),
- tableSchema, requestedSchema, Option.empty(), metaClient,
metaClient.getTableConfig().getProps(), fileSplit.getStart(),
- fileSplit.getLength(), false, maxMemoryForMerge, spillableMapPath,
spillMapType, bitmaskCompressEnabled);
+ tableSchema, requestedSchema, Option.empty(), metaClient, props,
fileSplit.getStart(),
+ fileSplit.getLength(), false);
this.fileGroupReader.initRecordIterators();
// it expects the partition columns to be at the end
Schema outputSchema = HoodieAvroUtils.generateProjectionSchema(tableSchema,
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 318aa286525..71dbf9ae017 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -23,12 +23,10 @@ import
org.apache.hudi.MergeOnReadSnapshotRelation.createPartitionedFile
import org.apache.hudi.avro.AvroSchemaUtils
import org.apache.hudi.cdc.{CDCFileGroupIterator, CDCRelation,
HoodieCDCFileGroupSplit}
import org.apache.hudi.client.utils.SparkInternalSchemaConverter
-import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMemoryConfig,
TypedProperties}
+import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.read.HoodieFileGroupReader
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
-import org.apache.hudi.common.util.FileIOUtils
-import org.apache.hudi.common.util.collection.ExternalSpillableMap.DiskMapType
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.utils.SerDeHelper
import org.apache.hudi.storage.StorageConfiguration
@@ -44,7 +42,6 @@ import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import java.io.Closeable
-import java.util.Locale
trait HoodieFormatTrait {
@@ -153,6 +150,8 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
val storageConf = broadcastedStorageConf.value
val metaClient: HoodieTableMetaClient = HoodieTableMetaClient
.builder().setConf(storageConf).setBasePath(tableState.tablePath).build
+ val props = metaClient.getTableConfig.getProps
+ options.foreach(kv => props.setProperty(kv._1, kv._2))
val reader = new HoodieFileGroupReader[InternalRow](
readerContext,
new HoodieHadoopStorage(metaClient.getBasePath, storageConf),
@@ -163,14 +162,10 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
broadcastedRequestedSchema.value,
internalSchemaOpt,
metaClient,
- metaClient.getTableConfig.getProps,
+ props,
file.start,
file.length,
- shouldUseRecordPosition,
-
options.getOrElse(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(),
HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.defaultValue() + "").toLong,
-
options.getOrElse(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(),
FileIOUtils.getDefaultSpillableMapBasePath),
-
DiskMapType.valueOf(options.getOrElse(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(),
HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue().name()).toUpperCase(Locale.ROOT)),
-
options.getOrElse(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue().toString).toBoolean)
+ shouldUseRecordPosition)
reader.initRecordIterators()
// Append partition values to rows and project to output schema
appendPartitionAndProject(
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
index 62a84bb6f8b..edcc1f6dad2 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
@@ -20,6 +20,7 @@
package org.apache.hudi;
import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.HoodieMemoryConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
@@ -125,17 +126,17 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer
extends TestHoodieFile
Option.empty(), metaClient.getTableConfig()));
TypedProperties props = new TypedProperties();
props.put(HoodieCommonConfig.RECORD_MERGE_MODE.key(), mergeMode.name());
+
props.setProperty(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(),String.valueOf(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.defaultValue()));
+ props.setProperty(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(),
metaClient.getTempFolderPath());
+ props.setProperty(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(),
ExternalSpillableMap.DiskMapType.ROCKS_DB.name());
+
props.setProperty(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
"false");
buffer = new HoodiePositionBasedFileGroupRecordBuffer<>(
ctx,
metaClient,
partitionNameOpt,
partitionFields,
ctx.getRecordMerger(),
- props,
- 1024 * 1024 * 1000,
- metaClient.getTempFolderPath(),
- ExternalSpillableMap.DiskMapType.ROCKS_DB,
- false);
+ props);
}
public Map<HoodieLogBlock.HeaderMetadataType, String> getHeader() {