This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 39d66455fee [HUDI-9316] Add support for creating iterator of HoodieRecord from FGReader (#13314)
39d66455fee is described below
commit 39d66455feea921016b77e4208c5c6ff33a37df0
Author: Tim Brown <[email protected]>
AuthorDate: Tue May 20 11:19:47 2025 -0500
[HUDI-9316] Add support for creating iterator of HoodieRecord from FGReader (#13314)
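
For reference, the new iterator can be consumed as in the sketch below. This is a minimal usage sketch, not part of the patch: the class and method names (FileGroupReaderExample, readAll) are illustrative only, and the reader is assumed to be fully built beforehand via the HoodieFileGroupReader builder chain shown in the merge handles in this diff.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.table.read.HoodieFileGroupReader;
    import org.apache.hudi.common.util.collection.ClosableIterator;

    public class FileGroupReaderExample {
      // Hypothetical helper: drains a pre-built file group reader into
      // engine-agnostic HoodieRecords using the iterator added by this commit.
      static <T> List<HoodieRecord<T>> readAll(HoodieFileGroupReader<T> reader) throws IOException {
        List<HoodieRecord<T>> records = new ArrayList<>();
        try (ClosableIterator<HoodieRecord<T>> iter = reader.getClosableHoodieRecordIterator()) {
          while (iter.hasNext()) {
            records.add(iter.next());
          }
        }
        return records;
      }
    }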
---
.../v2/FlinkFileGroupReaderBasedMergeHandle.java | 4 +-
...HoodieSparkFileGroupReaderBasedMergeHandle.java | 5 +-
.../hudi/BaseSparkInternalRowReaderContext.java | 5 +-
.../apache/hudi/avro/HoodieAvroReaderContext.java | 6 +-
.../hudi/common/engine/HoodieReaderContext.java | 5 ++
.../common/table/read/FileGroupRecordBuffer.java | 22 +-----
.../common/table/read/HoodieFileGroupReader.java | 52 ++++++++++----
.../table/read/KeyBasedFileGroupRecordBuffer.java | 7 +-
.../read/PositionBasedFileGroupRecordBuffer.java | 7 +-
.../table/read/UnmergedFileGroupRecordBuffer.java | 4 +-
.../table/read/TestFileGroupRecordBuffer.java | 17 ++---
.../table/read/TestHoodieFileGroupReaderBase.java | 83 ++++++++++++++++++----
.../table/format/FlinkRowDataReaderContext.java | 2 +-
.../hudi/hadoop/HiveHoodieReaderContext.java | 5 +-
...odieFileGroupReaderBasedParquetFileFormat.scala | 7 +-
.../TestPositionBasedFileGroupRecordBuffer.java | 13 ++--
16 files changed, 150 insertions(+), 94 deletions(-)
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/v2/FlinkFileGroupReaderBasedMergeHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/v2/FlinkFileGroupReaderBasedMergeHandle.java
index 6ac72a5d298..520b5a6de76 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/v2/FlinkFileGroupReaderBasedMergeHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/v2/FlinkFileGroupReaderBasedMergeHandle.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.read.HoodieFileGroupReader;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.internal.schema.InternalSchema;
@@ -86,8 +87,7 @@ public class FlinkFileGroupReaderBasedMergeHandle<T, I, K, O> extends BaseFileGr
.withLatestCommitTime(instantTime).withFileSlice(fileSlice).withDataSchema(writeSchemaWithMetaFields).withRequestedSchema(writeSchemaWithMetaFields)
.withInternalSchema(internalSchemaOption).withProps(props).withShouldUseRecordPosition(false).build()) {
// Reads the records from the file slice
- try (HoodieFileGroupReader.HoodieFileGroupReaderIterator<RowData> recordIterator =
- (HoodieFileGroupReader.HoodieFileGroupReaderIterator<RowData>) fileGroupReader.getClosableIterator()) {
+ try (ClosableIterator<RowData> recordIterator = (ClosableIterator<RowData>) fileGroupReader.getClosableIterator()) {
while (recordIterator.hasNext()) {
// Constructs Flink record for the Flink Parquet file writer
RowData row = recordIterator.next();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java
index 5446e6d6660..d604fe68b3f 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java
@@ -29,9 +29,9 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieSparkRecord;
import org.apache.hudi.common.table.read.HoodieFileGroupReader;
-import org.apache.hudi.common.table.read.HoodieFileGroupReader.HoodieFileGroupReaderIterator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.internal.schema.InternalSchema;
@@ -95,8 +95,7 @@ public class HoodieSparkFileGroupReaderBasedMergeHandle<T, I, K, O> extends Base
.withLatestCommitTime(instantTime).withFileSlice(fileSlice).withDataSchema(writeSchemaWithMetaFields).withRequestedSchema(writeSchemaWithMetaFields)
.withInternalSchema(internalSchemaOption).withProps(props).withShouldUseRecordPosition(usePosition).build()) {
// Reads the records from the file slice
- try (HoodieFileGroupReaderIterator<InternalRow> recordIterator
- = (HoodieFileGroupReaderIterator<InternalRow>) fileGroupReader.getClosableIterator()) {
+ try (ClosableIterator<InternalRow> recordIterator = (ClosableIterator<InternalRow>) fileGroupReader.getClosableIterator()) {
StructType sparkSchema = AvroConversionUtils.convertAvroSchemaToStructType(writeSchemaWithMetaFields);
while (recordIterator.hasNext()) {
// Constructs Spark record for the Spark Parquet file writer
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
index 8c26ddf559c..0084fb93b3c 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
@@ -92,15 +92,16 @@ public abstract class BaseSparkInternalRowReaderContext extends HoodieReaderCont
@Override
public HoodieRecord<InternalRow> constructHoodieRecord(BufferedRecord<InternalRow> bufferedRecord) {
+ HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(), partitionPath);
if (bufferedRecord.isDelete()) {
return new HoodieEmptyRecord<>(
- new HoodieKey(bufferedRecord.getRecordKey(), null),
+ hoodieKey,
HoodieRecord.HoodieRecordType.SPARK);
}
Schema schema = getSchemaFromBufferRecord(bufferedRecord);
InternalRow row = bufferedRecord.getRecord();
- return new HoodieSparkRecord(row, HoodieInternalRowUtils.getCachedSchema(schema));
+ return new HoodieSparkRecord(hoodieKey, row, HoodieInternalRowUtils.getCachedSchema(schema), false);
}
@Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
index 811a7020deb..ef100f1c718 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.OverwriteWithLatestMerger;
@@ -125,11 +126,12 @@ public class HoodieAvroReaderContext extends HoodieReaderContext<IndexedRecord>
if (bufferedRecord.isDelete()) {
return SpillableMapUtils.generateEmptyPayload(
bufferedRecord.getRecordKey(),
- null,
+ partitionPath,
bufferedRecord.getOrderingValue(),
payloadClass);
}
- return new HoodieAvroIndexedRecord(bufferedRecord.getRecord());
+ HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(), partitionPath);
+ return new HoodieAvroIndexedRecord(hoodieKey, bufferedRecord.getRecord());
}
@Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index c07b9b87e0c..638c58aa4d1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -73,6 +73,7 @@ public abstract class HoodieReaderContext<T> {
private Boolean hasBootstrapBaseFile = null;
private Boolean needsBootstrapMerge = null;
private Boolean shouldMergeUseRecordPosition = null;
+ protected String partitionPath;
// for encoding and decoding schemas to the spillable map
private final LocalAvroSchemaCache localAvroSchemaCache = LocalAvroSchemaCache.getInstance();
@@ -129,6 +130,10 @@ public abstract class HoodieReaderContext<T> {
this.hasLogFiles = hasLogFiles;
}
+ public void setPartitionPath(String partitionPath) {
+ this.partitionPath = partitionPath;
+ }
+
// Getter and Setter for hasBootstrapBaseFile
public boolean getHasBootstrapBaseFile() {
return hasBootstrapBaseFile;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
index d19ed4a56e3..0f89a6c426a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
@@ -35,7 +35,6 @@ import org.apache.hudi.common.serialization.DefaultSerializer;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.KeySpec;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
-import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
@@ -43,7 +42,6 @@ import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.InternalSchemaCache;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
-import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
@@ -80,8 +78,6 @@ public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordB
protected final HoodieReaderContext<T> readerContext;
protected final Schema readerSchema;
protected final Option<String> orderingFieldName;
- protected final Option<String> partitionNameOverrideOpt;
- protected final Option<String[]> partitionPathFieldOpt;
protected final RecordMergeMode recordMergeMode;
protected final Option<HoodieRecordMerger> recordMerger;
protected final Option<String> payloadClass;
@@ -101,14 +97,11 @@ public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordB
protected FileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
HoodieTableMetaClient hoodieTableMetaClient,
RecordMergeMode recordMergeMode,
- Option<String> partitionNameOverrideOpt,
- Option<String[]> partitionPathFieldOpt,
TypedProperties props,
- HoodieReadStats readStats) {
+ HoodieReadStats readStats,
+ Option<String> orderingFieldName) {
this.readerContext = readerContext;
this.readerSchema = AvroSchemaCache.intern(readerContext.getSchemaHandler().getRequiredSchema());
- this.partitionNameOverrideOpt = partitionNameOverrideOpt;
- this.partitionPathFieldOpt = partitionPathFieldOpt;
this.recordMergeMode = recordMergeMode;
this.recordMerger = readerContext.getRecordMerger();
if (recordMerger.isPresent() && recordMerger.get().getMergingStrategy().equals(PAYLOAD_BASED_MERGE_STRATEGY_UUID)) {
@@ -116,16 +109,7 @@ public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordB
} else {
this.payloadClass = Option.empty();
}
- this.orderingFieldName = recordMergeMode == RecordMergeMode.COMMIT_TIME_ORDERING
- ? Option.empty()
- : Option.ofNullable(ConfigUtils.getOrderingField(props))
- .or(() -> {
- String preCombineField = hoodieTableMetaClient.getTableConfig().getPreCombineField();
- if (StringUtils.isNullOrEmpty(preCombineField)) {
- return Option.empty();
- }
- return Option.of(preCombineField);
- });
+ this.orderingFieldName = orderingFieldName;
this.props = props;
this.internalSchema = readerContext.getSchemaHandler().getInternalSchema();
this.hoodieTableMetaClient = hoodieTableMetaClient;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index f466db8d7cf..ed9e2582716 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
@@ -35,9 +36,11 @@ import org.apache.hudi.common.table.PartitionPathParser;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordReader;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.CachingIterator;
import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.collection.EmptyIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.collection.Triple;
@@ -77,6 +80,7 @@ public final class HoodieFileGroupReader<T> implements Closeable {
private final List<HoodieLogFile> logFiles;
private final String partitionPath;
private final Option<String[]> partitionPathFields;
+ private final Option<String> orderingFieldName;
private final HoodieStorage storage;
private final TypedProperties props;
// Byte offset to start reading from the base file
@@ -143,6 +147,7 @@ public final class HoodieFileGroupReader<T> implements Closeable {
boolean isSkipMerge = ConfigUtils.getStringWithAltKeys(props, HoodieReaderConfig.MERGE_TYPE, true).equalsIgnoreCase(HoodieReaderConfig.REALTIME_SKIP_MERGE);
readerContext.setShouldMergeUseRecordPosition(shouldUseRecordPosition && !isSkipMerge);
readerContext.setHasLogFiles(!this.logFiles.isEmpty());
+ readerContext.setPartitionPath(partitionPath);
if (readerContext.getHasLogFiles() && start != 0) {
throw new IllegalArgumentException("Filegroup reader is doing log file merge but not reading from the start of the base file");
}
@@ -151,6 +156,16 @@ public final class HoodieFileGroupReader<T> implements Closeable {
? new PositionBasedSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, tableConfig, props)
: new FileGroupReaderSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, tableConfig, props));
this.outputConverter = readerContext.getSchemaHandler().getOutputConverter();
+ this.orderingFieldName = recordMergeMode == RecordMergeMode.COMMIT_TIME_ORDERING
+ ? Option.empty()
+ : Option.ofNullable(ConfigUtils.getOrderingField(props))
+ .or(() -> {
+ String preCombineField = hoodieTableMetaClient.getTableConfig().getPreCombineField();
+ if (StringUtils.isNullOrEmpty(preCombineField)) {
+ return Option.empty();
+ }
+ return Option.of(preCombineField);
+ });
this.readStats = new HoodieReadStats();
this.recordBuffer = getRecordBuffer(readerContext, hoodieTableMetaClient, recordMergeMode, props, hoodieBaseFileOption, this.logFiles.isEmpty(),
@@ -161,27 +176,26 @@ public final class HoodieFileGroupReader<T> implements Closeable {
/**
* Initialize correct record buffer
*/
- private static FileGroupRecordBuffer getRecordBuffer(HoodieReaderContext readerContext,
- HoodieTableMetaClient hoodieTableMetaClient,
- RecordMergeMode recordMergeMode,
- TypedProperties props,
- Option<HoodieBaseFile> baseFileOption,
- boolean hasNoLogFiles,
- boolean isSkipMerge,
- boolean shouldUseRecordPosition,
- HoodieReadStats readStats) {
+ private FileGroupRecordBuffer<T> getRecordBuffer(HoodieReaderContext<T> readerContext,
+ HoodieTableMetaClient hoodieTableMetaClient,
+ RecordMergeMode recordMergeMode,
+ TypedProperties props,
+ Option<HoodieBaseFile> baseFileOption,
+ boolean hasNoLogFiles,
+ boolean isSkipMerge,
+ boolean shouldUseRecordPosition,
+ HoodieReadStats readStats) {
if (hasNoLogFiles) {
return null;
} else if (isSkipMerge) {
return new UnmergedFileGroupRecordBuffer<>(
- readerContext, hoodieTableMetaClient, recordMergeMode, Option.empty(), Option.empty(), props, readStats);
+ readerContext, hoodieTableMetaClient, recordMergeMode, props, readStats);
} else if (shouldUseRecordPosition && baseFileOption.isPresent()) {
return new PositionBasedFileGroupRecordBuffer<>(
- readerContext, hoodieTableMetaClient, recordMergeMode, Option.empty(),
- Option.empty(), baseFileOption.get().getCommitTime(), props, readStats);
+ readerContext, hoodieTableMetaClient, recordMergeMode, baseFileOption.get().getCommitTime(), props, readStats, orderingFieldName);
} else {
return new KeyBasedFileGroupRecordBuffer<>(
- readerContext, hoodieTableMetaClient, recordMergeMode, Option.empty(), Option.empty(), props, readStats);
+ readerContext, hoodieTableMetaClient, recordMergeMode, props, readStats, orderingFieldName);
}
}
@@ -352,11 +366,21 @@ public final class HoodieFileGroupReader<T> implements Closeable {
}
}
- public HoodieFileGroupReaderIterator<T> getClosableIterator() throws IOException {
+ public ClosableIterator<T> getClosableIterator() throws IOException {
initRecordIterators();
return new HoodieFileGroupReaderIterator<>(this);
}
+ /**
+ * @return An iterator over the records that wraps the engine-specific record in a HoodieRecord.
+ */
+ public ClosableIterator<HoodieRecord<T>> getClosableHoodieRecordIterator() throws IOException {
+ return new CloseableMappingIterator<>(getClosableIterator(), nextRecord -> {
+ BufferedRecord<T> bufferedRecord = BufferedRecord.forRecordWithContext(nextRecord, readerContext.getSchemaHandler().getRequestedSchema(), readerContext, orderingFieldName, false);
+ return readerContext.constructHoodieRecord(bufferedRecord);
+ });
+ }
+
public static class HoodieFileGroupReaderIterator<T> implements ClosableIterator<T> {
private HoodieFileGroupReader<T> reader;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java
index 7a0e1aacd22..2ccc740fb77 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java
@@ -51,11 +51,10 @@ public class KeyBasedFileGroupRecordBuffer<T> extends FileGroupRecordBuffer<T> {
public KeyBasedFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
HoodieTableMetaClient hoodieTableMetaClient,
RecordMergeMode recordMergeMode,
- Option<String> partitionNameOverrideOpt,
- Option<String[]> partitionPathFieldOpt,
TypedProperties props,
- HoodieReadStats readStats) {
- super(readerContext, hoodieTableMetaClient, recordMergeMode, partitionNameOverrideOpt, partitionPathFieldOpt, props, readStats);
+ HoodieReadStats readStats,
+ Option<String> orderingFieldName) {
+ super(readerContext, hoodieTableMetaClient, recordMergeMode, props, readStats, orderingFieldName);
}
@Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
index 92bf9a98137..5df2ea91e07 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
@@ -68,12 +68,11 @@ public class PositionBasedFileGroupRecordBuffer<T> extends KeyBasedFileGroupReco
public PositionBasedFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
HoodieTableMetaClient hoodieTableMetaClient,
RecordMergeMode recordMergeMode,
- Option<String> partitionNameOverrideOpt,
- Option<String[]> partitionPathFieldOpt,
String baseFileInstantTime,
TypedProperties props,
- HoodieReadStats readStats) {
- super(readerContext, hoodieTableMetaClient, recordMergeMode, partitionNameOverrideOpt, partitionPathFieldOpt, props, readStats);
+ HoodieReadStats readStats,
+ Option<String> orderingFieldName) {
+ super(readerContext, hoodieTableMetaClient, recordMergeMode, props, readStats, orderingFieldName);
this.baseFileInstantTime = baseFileInstantTime;
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java
index ef686fb7bfc..c853eb088b7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java
@@ -49,11 +49,9 @@ public class UnmergedFileGroupRecordBuffer<T> extends FileGroupRecordBuffer<T> {
HoodieReaderContext<T> readerContext,
HoodieTableMetaClient hoodieTableMetaClient,
RecordMergeMode recordMergeMode,
- Option<String> partitionNameOverrideOpt,
- Option<String[]> partitionPathFieldOpt,
TypedProperties props,
HoodieReadStats readStats) {
- super(readerContext, hoodieTableMetaClient, recordMergeMode, partitionNameOverrideOpt, partitionPathFieldOpt, props, readStats);
+ super(readerContext, hoodieTableMetaClient, recordMergeMode, props, readStats, Option.empty());
this.currentInstantLogBlocks = new ArrayDeque<>();
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
index daac251d187..06552a9391c 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
@@ -84,8 +84,6 @@ class TestFileGroupRecordBuffer {
private final FileGroupReaderSchemaHandler schemaHandler = mock(FileGroupReaderSchemaHandler.class);
private HoodieTableMetaClient hoodieTableMetaClient = mock(HoodieTableMetaClient.class);
- private Option<String> partitionNameOverrideOpt = Option.empty();
- private Option<String[]> partitionPathFieldOpt = Option.empty();
private TypedProperties props = new TypedProperties();
private HoodieReadStats readStats = mock(HoodieReadStats.class);
@@ -290,10 +288,9 @@ class TestFileGroupRecordBuffer {
readerContext,
hoodieTableMetaClient,
RecordMergeMode.COMMIT_TIME_ORDERING,
- partitionNameOverrideOpt,
- partitionPathFieldOpt,
props,
- readStats);
+ readStats,
+ Option.empty());
when(readerContext.getValue(any(), any(), any())).thenReturn(null);
assertFalse(keyBasedBuffer.isCustomDeleteRecord(record));
@@ -303,10 +300,9 @@ class TestFileGroupRecordBuffer {
readerContext,
hoodieTableMetaClient,
RecordMergeMode.COMMIT_TIME_ORDERING,
- partitionNameOverrideOpt,
- partitionPathFieldOpt,
props,
- readStats);
+ readStats,
+ Option.empty());
when(readerContext.getValue(any(), any(), any())).thenReturn("i");
assertFalse(keyBasedBuffer.isCustomDeleteRecord(record));
when(readerContext.getValue(any(), any(), any())).thenReturn("d");
@@ -325,10 +321,9 @@ class TestFileGroupRecordBuffer {
readerContext,
hoodieTableMetaClient,
RecordMergeMode.COMMIT_TIME_ORDERING,
- partitionNameOverrideOpt,
- partitionPathFieldOpt,
props,
- readStats);
+ readStats,
+ Option.empty());
// CASE 1: With custom delete marker.
GenericRecord record = new GenericData.Record(schema);
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index 0890e574920..7c8d7527cb0 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -68,6 +68,7 @@ import java.io.BufferedOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
+import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -318,6 +319,13 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
// validate size is equivalent to ensure no duplicates are returned
assertEquals(expectedRecords.size(), actualRecordList.size());
assertEquals(new HashSet<>(expectedRecords), new HashSet<>(actualRecordList));
+ // validate records can be read from file group as HoodieRecords
+ actualRecordList = convertHoodieRecords(
+ readHoodieRecordsFromFileGroup(storageConf, tablePath, metaClient, fileSlices, avroSchema, recordMergeMode),
+ avroSchema, readerContext);
+ assertEquals(expectedRecords.size(), actualRecordList.size());
+ assertEquals(new HashSet<>(expectedRecords), new HashSet<>(actualRecordList));
+ // validate unmerged records
actualRecordList = convertEngineRecords(
readRecordsFromFileGroup(storageConf, tablePath, metaClient, fileSlices, avroSchema, recordMergeMode, true),
avroSchema, readerContext);
@@ -372,21 +380,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
boolean isSkipMerge) {
List<T> actualRecordList = new ArrayList<>();
- TypedProperties props = new TypedProperties();
- props.setProperty("hoodie.datasource.write.precombine.field", PRECOMBINE_FIELD_NAME);
- props.setProperty("hoodie.payload.ordering.field", PRECOMBINE_FIELD_NAME);
- props.setProperty(RECORD_MERGE_MODE.key(), recordMergeMode.name());
- if (recordMergeMode.equals(RecordMergeMode.CUSTOM)) {
- props.setProperty(RECORD_MERGE_STRATEGY_ID.key(), PAYLOAD_BASED_MERGE_STRATEGY_UUID);
- props.setProperty(PAYLOAD_CLASS_NAME.key(), getCustomPayload());
- }
- props.setProperty(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(), String.valueOf(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.defaultValue()));
- props.setProperty(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(), metaClient.getTempFolderPath());
- props.setProperty(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), ExternalSpillableMap.DiskMapType.ROCKS_DB.name());
- props.setProperty(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), "false");
- if (metaClient.getTableConfig().contains(PARTITION_FIELDS)) {
- props.setProperty(PARTITION_FIELDS.key(), metaClient.getTableConfig().getString(PARTITION_FIELDS));
- }
+ TypedProperties props = buildProperties(metaClient, recordMergeMode);
if (isSkipMerge) {
props.setProperty(HoodieReaderConfig.MERGE_TYPE.key(), HoodieReaderConfig.REALTIME_SKIP_MERGE);
}
@@ -431,6 +425,47 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
}
}
+ private List<HoodieRecord<T>> readHoodieRecordsFromFileGroup(StorageConfiguration<?> storageConf,
+ String tablePath,
+ HoodieTableMetaClient metaClient,
+ List<FileSlice> fileSlices,
+ Schema avroSchema,
+ RecordMergeMode recordMergeMode) {
+
+ List<HoodieRecord<T>> actualRecordList = new ArrayList<>();
+ TypedProperties props = buildProperties(metaClient, recordMergeMode);
+ fileSlices.forEach(fileSlice -> {
+ try (HoodieFileGroupReader<T> fileGroupReader = getHoodieFileGroupReader(storageConf, tablePath, metaClient, avroSchema, fileSlice, 0, props);
+ ClosableIterator<HoodieRecord<T>> iter = fileGroupReader.getClosableHoodieRecordIterator()) {
+ while (iter.hasNext()) {
+ actualRecordList.add(iter.next());
+ }
+ } catch (IOException ex) {
+ throw new UncheckedIOException(ex);
+ }
+ });
+ return actualRecordList;
+ }
+
+ private TypedProperties buildProperties(HoodieTableMetaClient metaClient, RecordMergeMode recordMergeMode) {
+ TypedProperties props = new TypedProperties();
+ props.setProperty("hoodie.datasource.write.precombine.field", PRECOMBINE_FIELD_NAME);
+ props.setProperty("hoodie.payload.ordering.field", PRECOMBINE_FIELD_NAME);
+ props.setProperty(RECORD_MERGE_MODE.key(), recordMergeMode.name());
+ if (recordMergeMode.equals(RecordMergeMode.CUSTOM)) {
+ props.setProperty(RECORD_MERGE_STRATEGY_ID.key(), PAYLOAD_BASED_MERGE_STRATEGY_UUID);
+ props.setProperty(PAYLOAD_CLASS_NAME.key(), getCustomPayload());
+ }
+ props.setProperty(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(), String.valueOf(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.defaultValue()));
+ props.setProperty(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(), metaClient.getTempFolderPath());
+ props.setProperty(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), ExternalSpillableMap.DiskMapType.ROCKS_DB.name());
+ props.setProperty(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), "false");
+ if (metaClient.getTableConfig().contains(PARTITION_FIELDS)) {
+ props.setProperty(PARTITION_FIELDS.key(), metaClient.getTableConfig().getString(PARTITION_FIELDS));
+ }
+ return props;
+ }
+
private boolean shouldValidatePartialRead(FileSlice fileSlice, Schema requestedSchema) {
if (fileSlice.getLogFiles().findAny().isPresent()) {
return true;
@@ -466,6 +501,24 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
.collect(Collectors.toList());
}
+ private List<HoodieTestDataGenerator.RecordIdentifier> convertHoodieRecords(List<HoodieRecord<T>> records, Schema schema, HoodieReaderContext<T> readerContext) {
+ return records.stream()
+ .map(record -> new HoodieTestDataGenerator.RecordIdentifier(
+ record.getRecordKey(),
+ removeHiveStylePartition(record.getPartitionPath()),
+ record.getOrderingValue(schema, new TypedProperties()).toString(),
+ readerContext.getValue(record.getData(), schema, RIDER_FIELD_NAME).toString()))
+ .collect(Collectors.toList());
+ }
+
+ private static String removeHiveStylePartition(String partitionPath) {
+ int indexOf = partitionPath.indexOf("=");
+ if (indexOf > 0) {
+ return partitionPath.substring(indexOf + 1);
+ }
+ return partitionPath;
+ }
+
private void extract(Path target) throws IOException {
try (ZipInputStream zip = new ZipInputStream(this.getClass().getClassLoader().getResourceAsStream("file-group-reader/bootstrap_data.zip"))) {
ZipEntry entry;
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
index 79077c888b7..808860478b3 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
@@ -146,7 +146,7 @@ public class FlinkRowDataReaderContext extends HoodieReaderContext<RowData> {
@Override
public HoodieRecord<RowData> constructHoodieRecord(BufferedRecord<RowData> bufferedRecord) {
- HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(), null);
+ HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(), partitionPath);
// delete record
if (bufferedRecord.isDelete()) {
return new HoodieEmptyRecord<>(hoodieKey, HoodieOperation.DELETE, bufferedRecord.getOrderingValue(), HoodieRecord.HoodieRecordType.FLINK);
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
index e5440c4e5c5..fa051360b3f 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
@@ -214,14 +214,15 @@ public class HiveHoodieReaderContext extends HoodieReaderContext<ArrayWritable>
@Override
public HoodieRecord<ArrayWritable> constructHoodieRecord(BufferedRecord<ArrayWritable> bufferedRecord) {
+ HoodieKey key = new HoodieKey(bufferedRecord.getRecordKey(), partitionPath);
if (bufferedRecord.isDelete()) {
return new HoodieEmptyRecord<>(
- new HoodieKey(bufferedRecord.getRecordKey(), null),
+ key,
HoodieRecord.HoodieRecordType.HIVE);
}
Schema schema = getSchemaFromBufferRecord(bufferedRecord);
ArrayWritable writable = bufferedRecord.getRecord();
- return new HoodieHiveRecord(new HoodieKey(bufferedRecord.getRecordKey(), null), writable, schema, objectInspectorCache);
+ return new HoodieHiveRecord(key, writable, schema, objectInspectorCache);
}
@Override
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index fc9d0bcb208..afab0f391bc 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -26,6 +26,7 @@ import org.apache.hudi.common.config.{HoodieMemoryConfig, TypedProperties}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.table.read.HoodieFileGroupReader
+import org.apache.hudi.common.util.collection.ClosableIterator
import org.apache.hudi.data.CloseableIteratorListener
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.io.IOUtils
@@ -250,7 +251,7 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tablePath: String,
props)
}
- private def appendPartitionAndProject(iter: HoodieFileGroupReader.HoodieFileGroupReaderIterator[InternalRow],
+ private def appendPartitionAndProject(iter: ClosableIterator[InternalRow],
inputSchema: StructType,
partitionSchema: StructType,
to: StructType,
@@ -273,14 +274,14 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tablePath: String,
}
}
- private def projectSchema(iter: HoodieFileGroupReader.HoodieFileGroupReaderIterator[InternalRow],
+ private def projectSchema(iter: ClosableIterator[InternalRow],
from: StructType,
to: StructType): Iterator[InternalRow] = {
val unsafeProjection = generateUnsafeProjection(from, to)
makeCloseableFileGroupMappingRecordIterator(iter, d => unsafeProjection(d))
}
- private def makeCloseableFileGroupMappingRecordIterator(closeableFileGroupRecordIterator: HoodieFileGroupReader.HoodieFileGroupReaderIterator[InternalRow],
+ private def makeCloseableFileGroupMappingRecordIterator(closeableFileGroupRecordIterator: ClosableIterator[InternalRow],
mappingFunction: Function[InternalRow, InternalRow]): Iterator[InternalRow] = {
CloseableIteratorListener.addListener(closeableFileGroupRecordIterator)
new Iterator[InternalRow] with Closeable {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
index e0a59b563c8..47e7f42d12d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
@@ -40,7 +40,6 @@ import org.apache.hudi.common.table.read.TestHoodieFileGroupReaderOnSpark;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -82,7 +81,7 @@ public class TestPositionBasedFileGroupRecordBuffer extends TestHoodieFileGroupR
writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet");
writeConfigs.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
writeConfigs.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partition_path");
- writeConfigs.put("hoodie.datasource.write.precombine.field",mergeMode.equals(RecordMergeMode.COMMIT_TIME_ORDERING) ? "" : "timestamp");
+ writeConfigs.put("hoodie.datasource.write.precombine.field", mergeMode.equals(RecordMergeMode.COMMIT_TIME_ORDERING) ? "" : "timestamp");
writeConfigs.put("hoodie.payload.ordering.field", "timestamp");
writeConfigs.put(HoodieTableConfig.HOODIE_TABLE_NAME_KEY, "hoodie_test");
writeConfigs.put("hoodie.insert.shuffle.parallelism", "4");
@@ -106,11 +105,8 @@ public class TestPositionBasedFileGroupRecordBuffer extends TestHoodieFileGroupR
metaClient = createMetaClient(getStorageConf(), getBasePath());
avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
- Option<String[]> partitionFields = metaClient.getTableConfig().getPartitionFields();
- Option<String> partitionNameOpt = StringUtils.isNullOrEmpty(partitionPaths[0])
- ? Option.empty() : Option.of(partitionPaths[0]);
- HoodieReaderContext ctx = getHoodieReaderContext(getBasePath(), avroSchema, getStorageConf(), metaClient);
+ HoodieReaderContext<InternalRow> ctx = getHoodieReaderContext(getBasePath(), avroSchema, getStorageConf(), metaClient);
ctx.setTablePath(getBasePath());
ctx.setLatestCommitTime(metaClient.createNewInstantTime());
ctx.setShouldMergeUseRecordPosition(true);
@@ -139,11 +135,10 @@ public class TestPositionBasedFileGroupRecordBuffer extends TestHoodieFileGroupR
ctx,
metaClient,
mergeMode,
- partitionNameOpt,
- partitionFields,
baseFileInstantTime,
props,
- readStats);
+ readStats,
+ Option.of("timestamp"));
}
public Map<HoodieLogBlock.HeaderMetadataType, String> getHeader(boolean shouldWriteRecordPositions,