This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new cce019ad0b9 [HUDI-6786] HoodieFileGroupReader integration (#9819)
cce019ad0b9 is described below
commit cce019ad0b92cc6a333839b4a50d123aca4c9c4a
Author: Lin Liu <[email protected]>
AuthorDate: Wed Oct 18 14:01:12 2023 -0700
[HUDI-6786] HoodieFileGroupReader integration (#9819)
This commit integrates HoodieFileGroupReader with the
NewHoodieParquetFileFormat.
---
.../SparkFileFormatInternalRowReaderContext.scala | 22 +-
.../hudi/common/config/HoodieReaderConfig.java | 9 +-
.../table/log/BaseHoodieLogRecordReader.java | 106 +-------
.../FullKeySpec.java} | 33 ++-
.../table/log/HoodieMergedLogRecordReader.java | 102 ++------
.../FileGroupReaderState.java => log/KeySpec.java} | 29 ++-
.../PrefixKeySpec.java} | 33 ++-
.../common/table/log/block/HoodieDataBlock.java | 4 +
.../read/HoodieBaseFileGroupRecordBuffer.java | 279 ++++++++++++++++++++
.../common/table/read/HoodieFileGroupReader.java | 126 +++++----
...rState.java => HoodieFileGroupReaderState.java} | 2 +-
.../table/read/HoodieFileGroupRecordBuffer.java | 119 +++++++++
.../read/HoodieKeyBasedFileGroupRecordBuffer.java | 159 ++++++++++++
.../HoodiePositionBasedFileGroupRecordBuffer.java | 191 ++++++++++++++
.../table/read/TestHoodieFileGroupReaderBase.java | 31 ++-
.../hudi/NewHoodieParquetFileFormatUtils.scala | 23 +-
...odieFileGroupReaderBasedParquetFileFormat.scala | 284 +++++++++++++++++++++
.../hudi/TestHoodieMergeHandleWithSparkMerger.java | 70 ++++-
.../read/TestHoodieFileGroupReaderOnSpark.scala | 35 ++-
19 files changed, 1333 insertions(+), 324 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index 5bf2c3a0fbc..25ea9d3332b 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -26,6 +26,7 @@ import org.apache.hudi.common.engine.HoodieReaderContext
import org.apache.hudi.common.util.collection.ClosableIterator
import org.apache.hudi.util.CloseableInternalRowIterator
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{HoodieInternalRowUtils, SparkSession}
@@ -36,13 +37,11 @@ import org.apache.spark.sql.{HoodieInternalRowUtils,
SparkSession}
*
* This uses Spark parquet reader to read parquet data files or parquet log
blocks.
*
- * @param sparkSession {@link SparkSession} instance.
- * @param parquetFileFormat {@link ParquetFileFormat} instance for parquet
file format in Spark.
- * @param hadoopConf Hadoop configuration.
+ * @param baseFileReader A reader that transforms a {@link PartitionedFile}
to an iterator of {@link InternalRow}
+ * @param partitionValues The values for a partition in which the file group
lives.
*/
-class SparkFileFormatInternalRowReaderContext(sparkSession: SparkSession,
- parquetFileFormat:
ParquetFileFormat,
- hadoopConf: Configuration)
extends BaseSparkInternalRowReaderContext {
+class SparkFileFormatInternalRowReaderContext(baseFileReader: PartitionedFile
=> Iterator[InternalRow],
+ partitionValues: InternalRow)
extends BaseSparkInternalRowReaderContext {
lazy val sparkAdapter = SparkAdapterSupport.sparkAdapter
override def getFileRecordIterator(filePath: Path,
@@ -51,14 +50,7 @@ class SparkFileFormatInternalRowReaderContext(sparkSession:
SparkSession,
dataSchema: Schema,
requiredSchema: Schema,
conf: Configuration):
ClosableIterator[InternalRow] = {
- val fileInfo =
sparkAdapter.getSparkPartitionedFileUtils.createPartitionedFile(
- InternalRow.empty, filePath, 0, length)
-
- val dataStructSchema = HoodieInternalRowUtils.getCachedSchema(dataSchema)
- val requiredStructSchema =
HoodieInternalRowUtils.getCachedSchema(requiredSchema)
- new
CloseableInternalRowIterator(parquetFileFormat.buildReaderWithPartitionValues(
- sparkSession, dataStructSchema, StructType(Seq.empty),
requiredStructSchema, Seq.empty,
- Map(), conf
- ).apply(fileInfo))
+ val fileInfo =
sparkAdapter.getSparkPartitionedFileUtils.createPartitionedFile(partitionValues,
filePath, start, length)
+ new CloseableInternalRowIterator(baseFileReader.apply(fileInfo))
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
index d25d1898ff2..1738f75e9ec 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
@@ -28,7 +28,7 @@ import javax.annotation.concurrent.Immutable;
@ConfigClassProperty(name = "Reader Configs",
groupName = ConfigGroups.Names.READER,
description = "Configurations that control file group reading.")
-public class HoodieReaderConfig {
+public class HoodieReaderConfig extends HoodieConfig {
public static final ConfigProperty<String> COMPACTION_LAZY_BLOCK_READ_ENABLE
= ConfigProperty
.key("hoodie.compaction.lazy.block.read")
.defaultValue("true")
@@ -51,4 +51,11 @@ public class HoodieReaderConfig {
.sinceVersion("0.13.0")
.withDocumentation("New optimized scan for log blocks that handles all
multi-writer use-cases while appending to log files. "
+ "It also differentiates original blocks written by ingestion
writers and compacted blocks written log compaction.");
+
+ public static final ConfigProperty<Boolean> FILE_GROUP_READER_ENABLED =
ConfigProperty
+ .key("hoodie.file.group.reader.enabled")
+ .defaultValue(false)
+ .markAdvanced()
+ .sinceVersion("1.0.0")
+ .withDocumentation("Use engine agnostic file group reader if enabled");
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
index 9885e19d05e..300b767ab70 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
@@ -21,7 +21,6 @@ package org.apache.hudi.common.table.log;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
-import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodiePayloadProps;
import org.apache.hudi.common.model.HoodieRecord;
@@ -32,6 +31,7 @@ import
org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.read.HoodieFileGroupRecordBuffer;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -138,6 +138,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
private final List<String> validBlockInstants = new ArrayList<>();
// Use scanV2 method.
private final boolean enableOptimizedLogBlocksScan;
+ protected HoodieFileGroupRecordBuffer<T> recordBuffer;
protected BaseHoodieLogRecordReader(HoodieReaderContext readerContext,
FileSystem fs, String basePath,
List<String> logFilePaths,
@@ -148,7 +149,8 @@ public abstract class BaseHoodieLogRecordReader<T> {
InternalSchema internalSchema,
Option<String> keyFieldOverride,
boolean enableOptimizedLogBlocksScan,
- HoodieRecordMerger recordMerger) {
+ HoodieRecordMerger recordMerger,
+ HoodieFileGroupRecordBuffer<T>
recordBuffer) {
this.readerContext = readerContext;
this.readerSchema = readerSchema;
this.latestInstantTime = latestInstantTime;
@@ -198,6 +200,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
this.partitionNameOverrideOpt = partitionNameOverride;
this.recordType = recordMerger.getRecordType();
+ this.recordBuffer = recordBuffer;
}
/**
@@ -270,6 +273,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
continue;
}
}
+
switch (logBlock.getBlockType()) {
case HFILE_DATA_BLOCK:
case AVRO_DATA_BLOCK:
@@ -731,47 +735,6 @@ public abstract class BaseHoodieLogRecordReader<T> {
.contentEquals(currentInstantLogBlocks.peek().getLogBlockHeader().get(INSTANT_TIME));
}
- /**
- * Iterate over the GenericRecord in the block, read the hoodie key and
partition path and call subclass processors to
- * handle it.
- */
- private void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec>
keySpecOpt) throws Exception {
- checkState(partitionNameOverrideOpt.isPresent() ||
partitionPathFieldOpt.isPresent(),
- "Either partition-name override or partition-path field had to be
present");
-
- Option<Pair<String, String>> recordKeyPartitionPathFieldPair =
populateMetaFields
- ? Option.empty()
- : Option.of(Pair.of(recordKeyField,
partitionPathFieldOpt.orElse(null)));
-
- Pair<ClosableIterator<T>, Schema> recordsIteratorSchemaPair =
- getRecordsIterator(dataBlock, keySpecOpt);
-
- try (ClosableIterator<T> recordIterator =
recordsIteratorSchemaPair.getLeft()) {
- while (recordIterator.hasNext()) {
- T nextRecord = recordIterator.next();
- processNextRecord(nextRecord,
- readerContext.generateMetadataForRecord(nextRecord, readerSchema));
- totalLogRecords.incrementAndGet();
- }
- }
- }
-
- /**
- * Process next record.
- *
- * @param record The next record in engine-specific representation.
- * @param metadata The metadata of the record.
- * @throws Exception
- */
- public abstract void processNextRecord(T record, Map<String, Object>
metadata) throws Exception;
-
- /**
- * Process next deleted record.
- *
- * @param deleteRecord Deleted record(hoodie key and ordering value)
- */
- protected abstract void processNextDeletedRecord(DeleteRecord deleteRecord);
-
/**
* Process the set of log blocks belonging to the last instant which is read
fully.
*/
@@ -785,10 +748,10 @@ public abstract class BaseHoodieLogRecordReader<T> {
case AVRO_DATA_BLOCK:
case HFILE_DATA_BLOCK:
case PARQUET_DATA_BLOCK:
- processDataBlock((HoodieDataBlock) lastBlock, keySpecOpt);
+ recordBuffer.processDataBlock((HoodieDataBlock) lastBlock,
keySpecOpt);
break;
case DELETE_BLOCK:
- Arrays.stream(((HoodieDeleteBlock)
lastBlock).getRecordsToDelete()).forEach(this::processNextDeletedRecord);
+ recordBuffer.processDeleteBlock((HoodieDeleteBlock) lastBlock);
break;
case CORRUPT_BLOCK:
LOG.warn("Found a corrupt block which was not rolled back");
@@ -850,59 +813,6 @@ public abstract class BaseHoodieLogRecordReader<T> {
return payloadProps;
}
- /**
- * Key specification with a list of column names.
- */
- protected interface KeySpec {
- List<String> getKeys();
-
- boolean isFullKey();
-
- static KeySpec fullKeySpec(List<String> keys) {
- return new FullKeySpec(keys);
- }
-
- static KeySpec prefixKeySpec(List<String> keyPrefixes) {
- return new PrefixKeySpec(keyPrefixes);
- }
- }
-
- private static class FullKeySpec implements KeySpec {
- private final List<String> keys;
-
- private FullKeySpec(List<String> keys) {
- this.keys = keys;
- }
-
- @Override
- public List<String> getKeys() {
- return keys;
- }
-
- @Override
- public boolean isFullKey() {
- return true;
- }
- }
-
- private static class PrefixKeySpec implements KeySpec {
- private final List<String> keysPrefixes;
-
- private PrefixKeySpec(List<String> keysPrefixes) {
- this.keysPrefixes = keysPrefixes;
- }
-
- @Override
- public List<String> getKeys() {
- return keysPrefixes;
- }
-
- @Override
- public boolean isFullKey() {
- return false;
- }
- }
-
public Deque<HoodieLogBlock> getCurrentInstantLogBlocks() {
return currentInstantLogBlocks;
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderState.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/FullKeySpec.java
similarity index 61%
copy from
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderState.java
copy to
hudi-common/src/main/java/org/apache/hudi/common/table/log/FullKeySpec.java
index 2caa146fc92..ede7918649b 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderState.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/FullKeySpec.java
@@ -17,20 +17,29 @@
* under the License.
*/
-package org.apache.hudi.common.table.read;
+package org.apache.hudi.common.table.log;
-import org.apache.hudi.common.config.TypedProperties;
-
-import org.apache.avro.Schema;
+import java.util.List;
/**
- * A class holding the state that is needed by {@code HoodieFileGroupReader},
- * e.g., schema, merging strategy, etc.
+ * This class specifies a set of record keys that is a full key.
+ * That is, the comparison between a record key and an element
+ * of the set is {@link String#equals}.
*/
-public class FileGroupReaderState {
- public String tablePath;
- public String latestCommitTime;
- public Schema baseFileAvroSchema;
- public Schema logRecordAvroSchema;
- public TypedProperties mergeProps = new TypedProperties();
+public class FullKeySpec implements KeySpec {
+ private final List<String> keys;
+
+ public FullKeySpec(List<String> keys) {
+ this.keys = keys;
+ }
+
+ @Override
+ public List<String> getKeys() {
+ return keys;
+ }
+
+ @Override
+ public boolean isFullKey() {
+ return true;
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
index aedb1a8e992..6f417be072b 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
@@ -20,16 +20,15 @@
package org.apache.hudi.common.table.log;
import org.apache.hudi.common.engine.HoodieReaderContext;
-import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
+import org.apache.hudi.common.table.read.HoodieFileGroupRecordBuffer;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.internal.schema.InternalSchema;
@@ -41,8 +40,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
-import java.io.IOException;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -63,8 +60,6 @@ public class HoodieMergedLogRecordReader<T> extends
BaseHoodieLogRecordReader<T>
private static final Logger LOG =
LoggerFactory.getLogger(HoodieMergedLogRecordReader.class);
// A timer for calculating elapsed time in millis
public final HoodieTimer timer = HoodieTimer.create();
- // Map of compacted/merged records with metadata
- private final Map<String, Pair<Option<T>, Map<String, Object>>> records;
// Set of already scanned prefixes allowing us to avoid scanning same
prefixes again
private final Set<String> scannedPrefixes;
// count of merged records in log
@@ -81,11 +76,14 @@ public class HoodieMergedLogRecordReader<T> extends
BaseHoodieLogRecordReader<T>
Option<String> partitionName,
InternalSchema internalSchema,
Option<String> keyFieldOverride,
- boolean enableOptimizedLogBlocksScan,
HoodieRecordMerger recordMerger) {
+ boolean enableOptimizedLogBlocksScan,
+ HoodieRecordMerger recordMerger,
+ HoodieFileGroupRecordBuffer<T>
recordBuffer) {
super(readerContext, fs, basePath, logFilePaths, readerSchema,
latestInstantTime, readBlocksLazily, reverseReader, bufferSize,
- instantRange, withOperationField, forceFullScan, partitionName,
internalSchema, keyFieldOverride, enableOptimizedLogBlocksScan, recordMerger);
- this.records = new HashMap<>();
+ instantRange, withOperationField, forceFullScan, partitionName,
internalSchema, keyFieldOverride, enableOptimizedLogBlocksScan,
+ recordMerger, recordBuffer);
this.scannedPrefixes = new HashSet<>();
+
if (forceFullScan) {
performScan();
}
@@ -122,7 +120,7 @@ public class HoodieMergedLogRecordReader<T> extends
BaseHoodieLogRecordReader<T>
}
List<String> missingKeys = keys.stream()
- .filter(key -> !records.containsKey(key))
+ .filter(key -> !recordBuffer.containsLogRecord(key))
.collect(Collectors.toList());
if (missingKeys.isEmpty()) {
@@ -174,19 +172,19 @@ public class HoodieMergedLogRecordReader<T> extends
BaseHoodieLogRecordReader<T>
scanInternal(Option.empty(), false);
this.totalTimeTakenToReadAndMergeBlocks = timer.endTimer();
- this.numMergedRecordsInLog = records.size();
+ this.numMergedRecordsInLog = recordBuffer.size();
LOG.info("Number of log files scanned => " + logFilePaths.size());
- LOG.info("Number of entries in Map => " + records.size());
+ LOG.info("Number of entries in Map => " + recordBuffer.size());
}
@Override
public Iterator<Pair<Option<T>, Map<String, Object>>> iterator() {
- return records.values().iterator();
+ return recordBuffer.getLogRecordIterator();
}
- public Map<String, Pair<Option<T>, Map<String, Object>>> getRecords() {
- return records;
+ public Map<Object, Pair<Option<T>, Map<String, Object>>> getRecords() {
+ return recordBuffer.getLogRecords();
}
public HoodieRecord.HoodieRecordType getRecordType() {
@@ -204,69 +202,13 @@ public class HoodieMergedLogRecordReader<T> extends
BaseHoodieLogRecordReader<T>
return new Builder();
}
- @Override
- public void processNextRecord(T record, Map<String, Object> metadata) throws
IOException {
- String key = (String)
metadata.get(HoodieReaderContext.INTERNAL_META_RECORD_KEY);
- Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair =
records.get(key);
-
- if (existingRecordMetadataPair != null) {
- // Merge and store the combined record
- // Note that the incoming `record` is from an older commit, so it should
be put as
- // the `older` in the merge API
- HoodieRecord<T> combinedRecord = (HoodieRecord<T>) recordMerger.merge(
- readerContext.constructHoodieRecord(Option.of(record), metadata,
readerSchema), readerSchema,
-
readerContext.constructHoodieRecord(existingRecordMetadataPair.getLeft(),
existingRecordMetadataPair.getRight(), readerSchema),
- readerSchema, this.getPayloadProps()).get().getLeft();
- // If pre-combine returns existing record, no need to update it
- if (combinedRecord.getData() !=
existingRecordMetadataPair.getLeft().get()) {
- records.put(key,
Pair.of(Option.ofNullable(readerContext.seal(combinedRecord.getData())),
metadata));
- }
- } else {
- // Put the record as is
- // NOTE: Record have to be cloned here to make sure if it holds
low-level engine-specific
- // payload pointing into a shared, mutable (underlying) buffer we
get a clean copy of
- // it since these records will be put into records(Map).
- records.put(key, Pair.of(Option.of(readerContext.seal(record)),
metadata));
- }
- }
-
- @Override
- protected void processNextDeletedRecord(DeleteRecord deleteRecord) {
- String key = deleteRecord.getRecordKey();
- Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair =
records.get(key);
- if (existingRecordMetadataPair != null) {
- // Merge and store the merged record. The ordering val is taken to
decide whether the same key record
- // should be deleted or be kept. The old record is kept only if the
DELETE record has smaller ordering val.
- // For same ordering values, uses the natural order(arrival time
semantics).
-
- Comparable existingOrderingVal = readerContext.getOrderingValue(
- existingRecordMetadataPair.getLeft(),
existingRecordMetadataPair.getRight(), readerSchema,
- this.hoodieTableMetaClient.getTableConfig().getProps());
- Comparable deleteOrderingVal = deleteRecord.getOrderingValue();
- // Checks the ordering value does not equal to 0
- // because we use 0 as the default value which means natural order
- boolean chooseExisting = !deleteOrderingVal.equals(0)
- && ReflectionUtils.isSameClass(existingOrderingVal,
deleteOrderingVal)
- && existingOrderingVal.compareTo(deleteOrderingVal) > 0;
- if (chooseExisting) {
- // The DELETE message is obsolete if the old message has greater
orderingVal.
- return;
- }
- }
- // Put the DELETE record
- records.put(key, Pair.of(Option.empty(),
- readerContext.generateMetadataForRecord(key,
deleteRecord.getPartitionPath(), deleteRecord.getOrderingValue())));
- }
-
public long getTotalTimeTakenToReadAndMergeBlocks() {
return totalTimeTakenToReadAndMergeBlocks;
}
@Override
public void close() {
- if (records != null) {
- records.clear();
- }
+ // No op.
}
/**
@@ -296,6 +238,8 @@ public class HoodieMergedLogRecordReader<T> extends
BaseHoodieLogRecordReader<T>
private boolean enableOptimizedLogBlocksScan = false;
private HoodieRecordMerger recordMerger =
HoodiePreCombineAvroRecordMerger.INSTANCE;
+ private HoodieFileGroupRecordBuffer<T> recordBuffer;
+
@Override
public Builder<T> withHoodieReaderContext(HoodieReaderContext<T>
readerContext) {
this.readerContext = readerContext;
@@ -397,18 +341,28 @@ public class HoodieMergedLogRecordReader<T> extends
BaseHoodieLogRecordReader<T>
return this;
}
+ public Builder<T> withRecordBuffer(HoodieFileGroupRecordBuffer<T>
recordBuffer) {
+ this.recordBuffer = recordBuffer;
+ return this;
+ }
+
@Override
public HoodieMergedLogRecordReader<T> build() {
if (this.partitionName == null &&
CollectionUtils.nonEmpty(this.logFilePaths)) {
this.partitionName = getRelativePartitionPath(new Path(basePath), new
Path(this.logFilePaths.get(0)).getParent());
}
ValidationUtils.checkArgument(recordMerger != null);
+ ValidationUtils.checkArgument(recordBuffer != null);
- return new HoodieMergedLogRecordReader<>(readerContext, fs, basePath,
logFilePaths, readerSchema,
+ return new HoodieMergedLogRecordReader<>(
+ readerContext, fs, basePath, logFilePaths, readerSchema,
latestInstantTime, readBlocksLazily, reverseReader,
bufferSize, instantRange,
withOperationField, forceFullScan,
- Option.ofNullable(partitionName), internalSchema,
Option.ofNullable(keyFieldOverride), enableOptimizedLogBlocksScan,
recordMerger);
+ Option.ofNullable(partitionName), internalSchema,
+ Option.ofNullable(keyFieldOverride),
+ enableOptimizedLogBlocksScan, recordMerger,
+ recordBuffer);
}
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderState.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/KeySpec.java
similarity index 60%
copy from
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderState.java
copy to hudi-common/src/main/java/org/apache/hudi/common/table/log/KeySpec.java
index 2caa146fc92..c7ecfd383ef 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderState.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/KeySpec.java
@@ -17,20 +17,25 @@
* under the License.
*/
-package org.apache.hudi.common.table.read;
+package org.apache.hudi.common.table.log;
-import org.apache.hudi.common.config.TypedProperties;
-
-import org.apache.avro.Schema;
+import java.util.List;
/**
- * A class holding the state that is needed by {@code HoodieFileGroupReader},
- * e.g., schema, merging strategy, etc.
+ * An interface to specify a set of keys that is used to filter records
+ * from log files. That is, only the records whose record key matches
+ * any of the specified keys can go to the downstream operations.
*/
-public class FileGroupReaderState {
- public String tablePath;
- public String latestCommitTime;
- public Schema baseFileAvroSchema;
- public Schema logRecordAvroSchema;
- public TypedProperties mergeProps = new TypedProperties();
+public interface KeySpec {
+ List<String> getKeys();
+
+ boolean isFullKey();
+
+ static KeySpec fullKeySpec(List<String> keys) {
+ return new FullKeySpec(keys);
+ }
+
+ static KeySpec prefixKeySpec(List<String> keyPrefixes) {
+ return new PrefixKeySpec(keyPrefixes);
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderState.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/PrefixKeySpec.java
similarity index 58%
copy from
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderState.java
copy to
hudi-common/src/main/java/org/apache/hudi/common/table/log/PrefixKeySpec.java
index 2caa146fc92..0ab8c4eadb2 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderState.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/PrefixKeySpec.java
@@ -17,20 +17,29 @@
* under the License.
*/
-package org.apache.hudi.common.table.read;
+package org.apache.hudi.common.table.log;
-import org.apache.hudi.common.config.TypedProperties;
-
-import org.apache.avro.Schema;
+import java.util.List;
/**
- * A class holding the state that is needed by {@code HoodieFileGroupReader},
- * e.g., schema, merging strategy, etc.
+ * This class specifies a set of record keys, each of which is a prefix.
+ * That is, the comparison between a record key and an element
+ * of the set is {@link String#startsWith(String)}.
*/
-public class FileGroupReaderState {
- public String tablePath;
- public String latestCommitTime;
- public Schema baseFileAvroSchema;
- public Schema logRecordAvroSchema;
- public TypedProperties mergeProps = new TypedProperties();
+public class PrefixKeySpec implements KeySpec {
+ private final List<String> keysPrefixes;
+
+ public PrefixKeySpec(List<String> keysPrefixes) {
+ this.keysPrefixes = keysPrefixes;
+ }
+
+ @Override
+ public List<String> getKeys() {
+ return keysPrefixes;
+ }
+
+ @Override
+ public boolean isFullKey() {
+ return false;
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
index 2b3dfaf6a61..a0f9016f409 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
@@ -141,6 +141,10 @@ public abstract class HoodieDataBlock extends
HoodieLogBlock {
return serializeRecords(records.get());
}
+ public String getKeyFieldName() {
+ return keyFieldName;
+ }
+
protected static Schema getWriterSchema(Map<HeaderMetadataType, String>
logBlockHeader) {
return new
Schema.Parser().parse(logBlockHeader.get(HeaderMetadataType.SCHEMA));
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
new file mode 100644
index 00000000000..a1685b82a93
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.KeySpec;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieCorruptedDataException;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import org.apache.avro.Schema;
+import org.roaringbitmap.longlong.Roaring64NavigableMap;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public abstract class HoodieBaseFileGroupRecordBuffer<T> implements
HoodieFileGroupRecordBuffer<T> {
+ protected final HoodieReaderContext<T> readerContext;
+ protected final Schema readerSchema;
+ protected final Schema baseFileSchema;
+ protected final Option<String> partitionNameOverrideOpt;
+ protected final Option<String[]> partitionPathFieldOpt;
+ protected final HoodieRecordMerger recordMerger;
+ protected final TypedProperties payloadProps;
+ protected final HoodieTableMetaClient hoodieTableMetaClient;
+ protected final Map<Object, Pair<Option<T>, Map<String, Object>>> records;
+ protected ClosableIterator<T> baseFileIterator;
+ protected Iterator<Pair<Option<T>, Map<String, Object>>> logRecordIterator;
+ protected T nextRecord;
+
+ public HoodieBaseFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
+ Schema readerSchema,
+ Schema baseFileSchema,
+ Option<String>
partitionNameOverrideOpt,
+ Option<String[]>
partitionPathFieldOpt,
+ HoodieRecordMerger recordMerger,
+ TypedProperties payloadProps,
+ HoodieTableMetaClient
hoodieTableMetaClient) {
+ this.readerContext = readerContext;
+ this.readerSchema = readerSchema;
+ this.baseFileSchema = baseFileSchema;
+ this.partitionNameOverrideOpt = partitionNameOverrideOpt;
+ this.partitionPathFieldOpt = partitionPathFieldOpt;
+ this.recordMerger = recordMerger;
+ this.payloadProps = payloadProps;
+ this.hoodieTableMetaClient = hoodieTableMetaClient;
+ this.records = new HashMap<>();
+ }
+
+ @Override
+ public void setBaseFileIteraotr(ClosableIterator<T> baseFileIterator) {
+ this.baseFileIterator = baseFileIterator;
+ }
+
+ @Override
+ public T next() {
+ return nextRecord;
+ }
+
+ @Override
+ public Map<Object, Pair<Option<T>, Map<String, Object>>> getLogRecords() {
+ return records;
+ }
+
+ @Override
+ public int size() {
+ return records.size();
+ }
+
+ @Override
+ public Iterator<Pair<Option<T>, Map<String, Object>>> getLogRecordIterator()
{
+ return records.values().iterator();
+ }
+
+ @Override
+ public void close() {
+ records.clear();
+ }
+
+  /**
+   * Merge two log data records if needed.
+   *
+   * @param record                     the incoming record from a log block
+   * @param metadata                   metadata of the incoming record
+   * @param existingRecordMetadataPair the record and metadata already buffered under the
+   *                                   same key, or {@code null} if none exists
+   * @return the merged record to buffer, or {@code Option.empty()} if the existing record wins
+   * @throws IOException upon merging error
+   */
+ protected Option<T> doProcessNextDataRecord(T record,
+ Map<String, Object> metadata,
+ Pair<Option<T>, Map<String,
Object>> existingRecordMetadataPair) throws IOException {
+ if (existingRecordMetadataPair != null) {
+ // Merge and store the combined record
+ // Note that the incoming `record` is from an older commit, so it should
be put as
+ // the `older` in the merge API
+ HoodieRecord<T> combinedRecord = (HoodieRecord<T>) recordMerger.merge(
+ readerContext.constructHoodieRecord(Option.of(record), metadata,
readerSchema),
+ readerSchema,
+ readerContext.constructHoodieRecord(
+ existingRecordMetadataPair.getLeft(),
existingRecordMetadataPair.getRight(), readerSchema),
+ readerSchema,
+ payloadProps).get().getLeft();
+ // If pre-combine returns existing record, no need to update it
+ if (combinedRecord.getData() !=
existingRecordMetadataPair.getLeft().get()) {
+ return Option.of(combinedRecord.getData());
+ }
+ return Option.empty();
+ } else {
+ // Put the record as is
+ // NOTE: Record have to be cloned here to make sure if it holds
low-level engine-specific
+ // payload pointing into a shared, mutable (underlying) buffer we
get a clean copy of
+ // it since these records will be put into records(Map).
+ return Option.of(record);
+ }
+ }
+
+  /**
+   * Merge a delete record with another record (data, or delete).
+   *
+   * @param deleteRecord               the incoming delete record
+   * @param existingRecordMetadataPair the record and metadata already buffered under the
+   *                                   same key, or {@code null} if none exists
+   * @return the delete record if the deletion should be applied, or {@code Option.empty()}
+   *         if the existing record has a greater ordering value and should be kept
+   */
+ protected Option<DeleteRecord> doProcessNextDeletedRecord(DeleteRecord
deleteRecord,
+ Pair<Option<T>,
Map<String, Object>> existingRecordMetadataPair) {
+ if (existingRecordMetadataPair != null) {
+ // Merge and store the merged record. The ordering val is taken to
decide whether the same key record
+ // should be deleted or be kept. The old record is kept only if the
DELETE record has smaller ordering val.
+ // For same ordering values, uses the natural order(arrival time
semantics).
+ Comparable existingOrderingVal = readerContext.getOrderingValue(
+ existingRecordMetadataPair.getLeft(),
existingRecordMetadataPair.getRight(), readerSchema,
+ this.hoodieTableMetaClient.getTableConfig().getProps());
+ Comparable deleteOrderingVal = deleteRecord.getOrderingValue();
+      // Check that the ordering value does not equal 0,
+      // because 0 is used as the default value, which means natural order
+      boolean chooseExisting = !deleteOrderingVal.equals(0)
+ && ReflectionUtils.isSameClass(existingOrderingVal,
deleteOrderingVal)
+ && existingOrderingVal.compareTo(deleteOrderingVal) > 0;
+ if (chooseExisting) {
+ // The DELETE message is obsolete if the old message has greater
orderingVal.
+ return Option.empty();
+ }
+ }
+ // Do delete.
+ return Option.of(deleteRecord);
+ }
+
+  /**
+   * Create a record iterator for a data block. The records are filtered by a key set
+   * specified by {@code keySpecOpt}.
+   *
+   * @param dataBlock  the log data block to read records from
+   * @param keySpecOpt an optional key specification used to filter the records
+   * @return a pair of the record iterator and the schema of the records
+   * @throws IOException upon reading error
+   */
+ protected Pair<ClosableIterator<T>, Schema> getRecordsIterator(
+ HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws
IOException {
+ ClosableIterator<T> blockRecordsIterator;
+ if (keySpecOpt.isPresent()) {
+ KeySpec keySpec = keySpecOpt.get();
+ blockRecordsIterator = dataBlock.getEngineRecordIterator(readerContext,
keySpec.getKeys(), keySpec.isFullKey());
+ } else {
+ blockRecordsIterator = dataBlock.getEngineRecordIterator(readerContext);
+ }
+ return Pair.of(blockRecordsIterator, dataBlock.getSchema());
+ }
+
+  /**
+   * Merge two records using the configured record merger.
+   *
+   * @param older        the older record, or empty if only the newer record exists
+   * @param olderInfoMap metadata of the older record
+   * @param newer        the newer record
+   * @param newerInfoMap metadata of the newer record
+   * @return the merged record, or {@code Option.empty()} if merging produces no record
+   * @throws IOException upon merging error
+   */
+ protected Option<T> merge(Option<T> older, Map<String, Object> olderInfoMap,
+ Option<T> newer, Map<String, Object> newerInfoMap)
throws IOException {
+ if (!older.isPresent()) {
+ return newer;
+ }
+
+ Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.merge(
+ readerContext.constructHoodieRecord(older, olderInfoMap,
baseFileSchema), baseFileSchema,
+ readerContext.constructHoodieRecord(newer, newerInfoMap,
readerSchema), readerSchema, payloadProps);
+ if (mergedRecord.isPresent()) {
+ return Option.ofNullable((T) mergedRecord.get().getLeft().getData());
+ }
+ return Option.empty();
+ }
+
+ /**
+ * Filter a record for downstream processing when:
+ * 1. A set of pre-specified keys exists.
+ * 2. The key of the record is not contained in the set.
+ */
+ protected boolean shouldSkip(T record, String keyFieldName, boolean
isFullKey, Set<String> keys) {
+ String recordKey = readerContext.getValue(record, readerSchema,
keyFieldName).toString();
+ // Can not extract the record key, throw.
+ if (recordKey == null || recordKey.isEmpty()) {
+ throw new HoodieKeyException("Can not extract the key for a record");
+ }
+
+ // No keys are specified. Cannot skip at all.
+ if (keys.isEmpty()) {
+ return false;
+ }
+
+    // When the record key matches one of the keys or key prefixes, the record cannot be skipped.
+ if ((isFullKey && keys.contains(recordKey))
+ || (!isFullKey && keys.stream().anyMatch(recordKey::startsWith))) {
+ return false;
+ }
+
+ // Otherwise, this record is not needed.
+ return true;
+ }
+
+  /**
+   * Extract the record positions from a log block header.
+   *
+   * @param logBlock the log block whose header carries the record positions
+   * @return the list of record positions
+   * @throws IOException upon reading error
+   */
+ protected static List<Long> extractRecordPositions(HoodieLogBlock logBlock)
throws IOException {
+ List<Long> blockPositions = new ArrayList<>();
+
+ Roaring64NavigableMap positions = logBlock.getRecordPositions();
+ if (positions == null || positions.isEmpty()) {
+ throw new HoodieValidationException("No record position info is found
when attempt to do position based merge.");
+ }
+
+ Iterator<Long> iterator = positions.iterator();
+ while (iterator.hasNext()) {
+ blockPositions.add(iterator.next());
+ }
+
+ if (blockPositions.isEmpty()) {
+ throw new HoodieCorruptedDataException("No positions are extracted.");
+ }
+
+ return blockPositions;
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index d72f3ccfcd6..296b8155e99 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -24,7 +24,6 @@ import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieTableQueryType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -33,7 +32,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.EmptyIterator;
-import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
@@ -42,11 +41,7 @@ import org.apache.hadoop.fs.Path;
import java.io.Closeable;
import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
import static
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGER_STRATEGY;
@@ -73,16 +68,12 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
private final long start;
// Length of bytes to read from the base file
private final long length;
- // Key to record and metadata mapping from log files
- private final Map<String, Pair<Option<T>, Map<String, Object>>>
logFileRecordMapping = new HashMap<>();
- private final FileGroupReaderState readerState = new FileGroupReaderState();
+ // Core structure to store and process records.
+ private final HoodieFileGroupRecordBuffer<T> recordBuffer;
+ private final HoodieFileGroupReaderState readerState = new
HoodieFileGroupReaderState();
private ClosableIterator<T> baseFileIterator;
- // This is only initialized and used after all records from the base file
are iterated
- private Iterator<Pair<Option<T>, Map<String, Object>>> logRecordIterator;
private HoodieRecordMerger recordMerger;
- T nextRecord;
-
public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
HoodieTableMetaClient metaClient,
String fileGroupId,
@@ -90,7 +81,8 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
HoodieTimeline timeline,
HoodieTableQueryType queryType,
Option<String> instantTime,
- Option<String> startInstantTime) {
+ Option<String> startInstantTime,
+ HoodieFileGroupRecordBuffer<T> recordBuffer) {
// This constructor is a placeholder now to allow automatically fetching
the correct list of
// base and log files for a file group.
// Derive base and log files and call the corresponding constructor.
@@ -102,6 +94,7 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
this.start = 0;
this.length = Long.MAX_VALUE;
this.baseFileIterator = new EmptyIterator<>();
+ this.recordBuffer = recordBuffer;
}
public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
@@ -113,7 +106,8 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
Schema avroSchema,
TypedProperties props,
long start,
- long length) {
+ long length,
+ HoodieFileGroupRecordBuffer<T> recordBuffer) {
this.readerContext = readerContext;
this.hadoopConf = hadoopConf;
this.baseFilePath = baseFilePath;
@@ -128,6 +122,7 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
this.readerState.baseFileAvroSchema = avroSchema;
this.readerState.logRecordAvroSchema = avroSchema;
this.readerState.mergeProps.putAll(props);
+ this.recordBuffer = recordBuffer;
}
/**
@@ -136,10 +131,10 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
public void initRecordIterators() {
this.baseFileIterator = baseFilePath.isPresent()
? readerContext.getFileRecordIterator(
- baseFilePath.get().getHadoopPath(), 0, baseFilePath.get().getFileLen(),
- readerState.baseFileAvroSchema, readerState.baseFileAvroSchema,
hadoopConf)
+ baseFilePath.get().getHadoopPath(), start, length,
readerState.baseFileAvroSchema, readerState.baseFileAvroSchema, hadoopConf)
: new EmptyIterator<>();
scanLogFiles();
+ recordBuffer.setBaseFileIteraotr(baseFileIterator);
}
/**
@@ -147,49 +142,22 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
* @throws IOException on reader error.
*/
public boolean hasNext() throws IOException {
- while (baseFileIterator.hasNext()) {
- T baseRecord = baseFileIterator.next();
- String recordKey = readerContext.getRecordKey(baseRecord,
readerState.baseFileAvroSchema);
- Pair<Option<T>, Map<String, Object>> logRecordInfo =
logFileRecordMapping.remove(recordKey);
- Option<T> resultRecord = logRecordInfo != null
- ? merge(Option.of(baseRecord), Collections.emptyMap(),
logRecordInfo.getLeft(), logRecordInfo.getRight())
- : merge(Option.empty(), Collections.emptyMap(),
Option.of(baseRecord), Collections.emptyMap());
- if (resultRecord.isPresent()) {
- nextRecord = readerContext.seal(resultRecord.get());
- return true;
- }
- }
-
- if (logRecordIterator == null) {
- logRecordIterator = logFileRecordMapping.values().iterator();
- }
-
- while (logRecordIterator.hasNext()) {
- Pair<Option<T>, Map<String, Object>> nextRecordInfo =
logRecordIterator.next();
- Option<T> resultRecord = merge(Option.empty(), Collections.emptyMap(),
- nextRecordInfo.getLeft(), nextRecordInfo.getRight());
- if (resultRecord.isPresent()) {
- nextRecord = readerContext.seal(resultRecord.get());
- return true;
- }
- }
-
- return false;
+ return recordBuffer.hasNext();
}
/**
* @return The next record after calling {@link #hasNext}.
*/
public T next() {
- T result = nextRecord;
- nextRecord = null;
- return result;
+ return recordBuffer.next();
}
private void scanLogFiles() {
if (logFilePathList.isPresent()) {
- FileSystem fs = readerContext.getFs(logFilePathList.get().get(0),
hadoopConf);
- HoodieMergedLogRecordReader<T> logRecordReader =
HoodieMergedLogRecordReader.newBuilder()
+ String path = baseFilePath.isPresent() ? baseFilePath.get().getPath() :
logFilePathList.get().get(0);
+ FileSystem fs = readerContext.getFs(path, hadoopConf);
+
+ HoodieMergedLogRecordReader logRecordReader =
HoodieMergedLogRecordReader.newBuilder()
.withHoodieReaderContext(readerContext)
.withFileSystem(fs)
.withBasePath(readerState.tablePath)
@@ -200,34 +168,56 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
.withReverseReader(false)
.withBufferSize(getIntWithAltKeys(props,
HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE))
.withPartition(getRelativePartitionPath(
- new Path(readerState.tablePath), new
Path(logFilePathList.get().get(0)).getParent()
- ))
+ new Path(readerState.tablePath), new
Path(logFilePathList.get().get(0)).getParent()))
.withRecordMerger(recordMerger)
+ .withRecordBuffer(recordBuffer)
.build();
- logFileRecordMapping.putAll(logRecordReader.getRecords());
logRecordReader.close();
}
}
- private Option<T> merge(Option<T> older, Map<String, Object> olderInfoMap,
- Option<T> newer, Map<String, Object> newerInfoMap)
throws IOException {
- if (!older.isPresent()) {
- return newer;
- }
-
- Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.merge(
- readerContext.constructHoodieRecord(older, olderInfoMap,
readerState.baseFileAvroSchema), readerState.baseFileAvroSchema,
- readerContext.constructHoodieRecord(newer, newerInfoMap,
readerState.logRecordAvroSchema), readerState.logRecordAvroSchema, props);
- if (mergedRecord.isPresent()) {
- return Option.ofNullable((T) mergedRecord.get().getLeft().getData());
- }
- return Option.empty();
- }
-
@Override
public void close() throws IOException {
if (baseFileIterator != null) {
baseFileIterator.close();
}
+ if (recordBuffer != null) {
+ recordBuffer.close();
+ }
+ }
+
+ public HoodieFileGroupReaderIterator<T> getClosableIterator() {
+ return new HoodieFileGroupReaderIterator<>(this);
+ }
+
+ public static class HoodieFileGroupReaderIterator<T> implements
ClosableIterator<T> {
+ private final HoodieFileGroupReader<T> reader;
+
+ public HoodieFileGroupReaderIterator(HoodieFileGroupReader<T> reader) {
+ this.reader = reader;
+ }
+
+ @Override
+ public boolean hasNext() {
+ try {
+ return reader.hasNext();
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to read record", e);
+ }
+ }
+
+ @Override
+ public T next() {
+ return reader.next();
+ }
+
+ @Override
+ public void close() {
+ try {
+ reader.close();
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to close the reader", e);
+ }
+ }
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderState.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderState.java
similarity index 96%
rename from
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderState.java
rename to
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderState.java
index 2caa146fc92..e50713bb40a 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderState.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderState.java
@@ -27,7 +27,7 @@ import org.apache.avro.Schema;
* A class holding the state that is needed by {@code HoodieFileGroupReader},
* e.g., schema, merging strategy, etc.
*/
-public class FileGroupReaderState {
+public class HoodieFileGroupReaderState {
public String tablePath;
public String latestCommitTime;
public Schema baseFileAvroSchema;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
new file mode 100644
index 00000000000..680bbf9d705
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.table.log.KeySpec;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+public interface HoodieFileGroupRecordBuffer<T> {
+ enum BufferType {
+ KEY_BASED,
+ POSITION_BASED
+ }
+
+  /**
+   * @return the type of the buffer, indicating the merging strategy in use.
+   */
+ BufferType getBufferType();
+
+ /**
+ * Process a log data block, and store the resulting records into the buffer.
+ *
+ * @param dataBlock
+ * @param keySpecOpt
+ * @throws IOException
+ */
+ void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt)
throws IOException;
+
+  /**
+   * Process the next record in a log data block.
+   *
+   * @param record   the record to process
+   * @param metadata metadata of the record
+   * @param index    the key (record key or position) under which the record is buffered
+   * @throws IOException upon processing error
+   */
+ void processNextDataRecord(T record, Map<String, Object> metadata, Object
index) throws IOException;
+
+ /**
+ * Process a log delete block, and store the resulting records into the
buffer.
+ *
+ * @param deleteBlock
+ * @throws IOException
+ */
+ void processDeleteBlock(HoodieDeleteBlock deleteBlock) throws IOException;
+
+  /**
+   * Process the next delete record.
+   *
+   * @param deleteRecord the delete record to process
+   * @param index        the key (record key or position) under which the record is buffered
+   */
+ void processNextDeletedRecord(DeleteRecord deleteRecord, Object index);
+
+ /**
+ * Check if a record exists in the buffered records.
+ */
+ boolean containsLogRecord(String recordKey);
+
+ /**
+ * @return the number of log records in the buffer.
+ */
+ int size();
+
+ /**
+ * @return An iterator on the log records.
+ */
+ Iterator<Pair<Option<T>, Map<String, Object>>> getLogRecordIterator();
+
+ /**
+ * @return The underlying data stored in the buffer.
+ */
+ Map<Object, Pair<Option<T>, Map<String, Object>>> getLogRecords();
+
+ /**
+   * Link the base file iterator for the subsequent merge.
+ *
+ * @param baseFileIterator
+ */
+ void setBaseFileIteraotr(ClosableIterator<T> baseFileIterator);
+
+ /**
+ * Check if next merged record exists.
+ *
+   * @return true if a next merged record exists, otherwise false.
+ */
+ boolean hasNext() throws IOException;
+
+  /**
+   * @return the next merged record.
+   */
+ T next();
+
+ void close();
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
new file mode 100644
index 00000000000..d055246cfa8
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.KeySpec;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
+/**
+ * A buffer that is used to store log records by {@link
org.apache.hudi.common.table.log.HoodieMergedLogRecordReader}
+ * by calling the {@link #processDataBlock} and {@link #processDeleteBlock}
methods into a record key based map.
+ * The records from the base file is accessed from an iterator object. These
records are merged when the
+ * {@link #hasNext} method is called.
+ */
+public class HoodieKeyBasedFileGroupRecordBuffer<T> extends
HoodieBaseFileGroupRecordBuffer<T> {
+ public HoodieKeyBasedFileGroupRecordBuffer(HoodieReaderContext<T>
readerContext,
+ Schema readerSchema,
+ Schema baseFileSchema,
+ Option<String>
partitionNameOverrideOpt,
+ Option<String[]>
partitionPathFieldOpt,
+ HoodieRecordMerger recordMerger,
+ TypedProperties payloadProps,
+ HoodieTableMetaClient
hoodieTableMetaClient) {
+ super(readerContext, readerSchema, baseFileSchema,
partitionNameOverrideOpt, partitionPathFieldOpt,
+ recordMerger, payloadProps, hoodieTableMetaClient);
+ }
+
+ @Override
+ public BufferType getBufferType() {
+ return BufferType.KEY_BASED;
+ }
+
+ @Override
+ public void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec>
keySpecOpt) throws IOException {
+ checkState(partitionNameOverrideOpt.isPresent() ||
partitionPathFieldOpt.isPresent(),
+ "Either partition-name override or partition-path field had to be
present");
+
+
+ Pair<ClosableIterator<T>, Schema> recordsIteratorSchemaPair =
+ getRecordsIterator(dataBlock, keySpecOpt);
+
+ try (ClosableIterator<T> recordIterator =
recordsIteratorSchemaPair.getLeft()) {
+ while (recordIterator.hasNext()) {
+ T nextRecord = recordIterator.next();
+ Map<String, Object> metadata =
readerContext.generateMetadataForRecord(nextRecord, readerSchema);
+ String recordKey = (String)
metadata.get(HoodieReaderContext.INTERNAL_META_RECORD_KEY);
+ processNextDataRecord(nextRecord, metadata, recordKey);
+ }
+ }
+ }
+
+ @Override
+ public void processNextDataRecord(T record, Map<String, Object> metadata,
Object recordKey) throws IOException {
+ Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair =
records.get(recordKey);
+ Option<T> mergedRecord = doProcessNextDataRecord(record, metadata,
existingRecordMetadataPair);
+ if (mergedRecord.isPresent()) {
+ records.put(recordKey,
Pair.of(Option.ofNullable(readerContext.seal(mergedRecord.get())), metadata));
+ }
+ }
+
+ @Override
+ public void processDeleteBlock(HoodieDeleteBlock deleteBlock) throws
IOException {
+ Iterator<DeleteRecord> it =
Arrays.stream(deleteBlock.getRecordsToDelete()).iterator();
+ while (it.hasNext()) {
+ DeleteRecord record = it.next();
+ String recordKey = record.getRecordKey();
+ processNextDeletedRecord(record, recordKey);
+ }
+ }
+
+ @Override
+ public void processNextDeletedRecord(DeleteRecord deleteRecord, Object
recordKey) {
+ Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair =
records.get(recordKey);
+ Option<DeleteRecord> recordOpt = doProcessNextDeletedRecord(deleteRecord,
existingRecordMetadataPair);
+ if (recordOpt.isPresent()) {
+ records.put(recordKey, Pair.of(Option.empty(),
readerContext.generateMetadataForRecord(
+ (String) recordKey, recordOpt.get().getPartitionPath(),
recordOpt.get().getOrderingValue())));
+ }
+ }
+
+ @Override
+ public boolean containsLogRecord(String recordKey) {
+ return records.containsKey(recordKey);
+ }
+
+ @Override
+ public boolean hasNext() throws IOException {
+ ValidationUtils.checkState(baseFileIterator != null, "Base file iterator
has not been set yet");
+
+ // Handle merging.
+ while (baseFileIterator.hasNext()) {
+ T baseRecord = baseFileIterator.next();
+
+ String recordKey = readerContext.getRecordKey(baseRecord,
baseFileSchema);
+ Pair<Option<T>, Map<String, Object>> logRecordInfo =
records.remove(recordKey);
+
+ Option<T> resultRecord = logRecordInfo != null
+ ? merge(Option.of(baseRecord), Collections.emptyMap(),
logRecordInfo.getLeft(), logRecordInfo.getRight())
+ : merge(Option.empty(), Collections.emptyMap(),
Option.of(baseRecord), Collections.emptyMap());
+ if (resultRecord.isPresent()) {
+ nextRecord = readerContext.seal(resultRecord.get());
+ return true;
+ }
+ }
+
+ // Handle records solely from log files.
+ if (logRecordIterator == null) {
+ logRecordIterator = records.values().iterator();
+ }
+
+ while (logRecordIterator.hasNext()) {
+ Pair<Option<T>, Map<String, Object>> nextRecordInfo =
logRecordIterator.next();
+ Option<T> resultRecord;
+ resultRecord = merge(Option.empty(), Collections.emptyMap(),
+ nextRecordInfo.getLeft(), nextRecordInfo.getRight());
+ if (resultRecord.isPresent()) {
+ nextRecord = readerContext.seal(resultRecord.get());
+ return true;
+ }
+ }
+ return false;
+ }
+}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
new file mode 100644
index 00000000000..7fc99523e19
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.log.KeySpec;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
+/**
+ * A buffer that is used to store log records by {@link
org.apache.hudi.common.table.log.HoodieMergedLogRecordReader}
+ * by calling the {@link #processDataBlock} and {@link #processDeleteBlock}
methods into record position based map.
+ * Here the position means that record position in the base file. The records
from the base file is accessed from an iterator object. These records are
merged when the
+ * {@link #hasNext} method is called.
+ */
+public class HoodiePositionBasedFileGroupRecordBuffer<T> extends
HoodieBaseFileGroupRecordBuffer<T> {
+ private long nextRecordPosition = 0L;
+
+ public HoodiePositionBasedFileGroupRecordBuffer(HoodieReaderContext<T>
readerContext,
+ Schema readerSchema,
+ Schema baseFileSchema,
+ Option<String>
partitionNameOverrideOpt,
+ Option<String[]>
partitionPathFieldOpt,
+ HoodieRecordMerger
recordMerger,
+ TypedProperties payloadProps,
+ HoodieTableMetaClient
hoodieTableMetaClient) {
+ super(readerContext, readerSchema, baseFileSchema,
partitionNameOverrideOpt, partitionPathFieldOpt,
+ recordMerger, payloadProps, hoodieTableMetaClient);
+ }
+
+ @Override
+ public BufferType getBufferType() {
+ return BufferType.POSITION_BASED;
+ }
+
+ @Override
+ public void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec>
keySpecOpt) throws IOException {
+ checkState(partitionNameOverrideOpt.isPresent() ||
partitionPathFieldOpt.isPresent(),
+ "Either partition-name override or partition-path field had to be
present");
+
+ // Prepare key filters.
+ Set<String> keys = new HashSet<>();
+ boolean isFullKey = true;
+ if (keySpecOpt.isPresent()) {
+ if (!keySpecOpt.get().getKeys().isEmpty()) {
+ keys = new HashSet<>(keySpecOpt.get().getKeys());
+ }
+ isFullKey = keySpecOpt.get().isFullKey();
+ }
+
+ // Extract positions from data block.
+ List<Long> recordPositions = extractRecordPositions(dataBlock);
+
+ // TODO: return an iterator that can generate sequence number with the
record.
+ // Then we can hide this logic into data block.
+ try (ClosableIterator<T> recordIterator =
dataBlock.getEngineRecordIterator(readerContext)) {
+ int recordIndex = 0;
+ while (recordIterator.hasNext()) {
+ T nextRecord = recordIterator.next();
+
+ // Skip a record if it is not contained in the specified keys.
+ if (shouldSkip(nextRecord, dataBlock.getKeyFieldName(), isFullKey,
keys)) {
+ recordIndex++;
+ continue;
+ }
+
+ long recordPosition = recordPositions.get(recordIndex++);
+ processNextDataRecord(
+ nextRecord,
+ readerContext.generateMetadataForRecord(nextRecord, readerSchema),
+ recordPosition
+ );
+ }
+ }
+ }
+
+ @Override
+ public void processNextDataRecord(T record, Map<String, Object> metadata,
Object recordPosition) throws IOException {
+ Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair =
records.get(recordPosition);
+ Option<T> mergedRecord = doProcessNextDataRecord(record, metadata,
existingRecordMetadataPair);
+ if (mergedRecord.isPresent()) {
+ records.put(recordPosition,
Pair.of(Option.ofNullable(readerContext.seal(mergedRecord.get())), metadata));
+ }
+ }
+
+ @Override
+ public void processDeleteBlock(HoodieDeleteBlock deleteBlock) throws
IOException {
+ List<Long> recordPositions = extractRecordPositions(deleteBlock);
+
+ int recordIndex = 0;
+ Iterator<DeleteRecord> it =
Arrays.stream(deleteBlock.getRecordsToDelete()).iterator();
+ while (it.hasNext()) {
+ DeleteRecord record = it.next();
+ long recordPosition = recordPositions.get(recordIndex++);
+ processNextDeletedRecord(record, recordPosition);
+ }
+ }
+
+ @Override
+ public void processNextDeletedRecord(DeleteRecord deleteRecord, Object
recordPosition) {
+ Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair =
records.get(recordPosition);
+ Option<DeleteRecord> recordOpt = doProcessNextDeletedRecord(deleteRecord,
existingRecordMetadataPair);
+ if (recordOpt.isPresent()) {
+ String recordKey = recordOpt.get().getRecordKey();
+ records.put(recordPosition, Pair.of(Option.empty(),
readerContext.generateMetadataForRecord(
+ recordKey, recordOpt.get().getPartitionPath(),
recordOpt.get().getOrderingValue())));
+ }
+ }
+
+ @Override
+ public boolean containsLogRecord(String recordKey) {
+ return records.values().stream()
+ .filter(r -> r.getLeft().isPresent())
+ .map(r -> readerContext.getRecordKey(r.getKey().get(),
readerSchema)).anyMatch(recordKey::equals);
+ }
+
+ @Override
+ public boolean hasNext() throws IOException {
+ ValidationUtils.checkState(baseFileIterator != null, "Base file iterator
has not been set yet");
+
+ // Handle merging.
+ while (baseFileIterator.hasNext()) {
+ T baseRecord = baseFileIterator.next();
+ Pair<Option<T>, Map<String, Object>> logRecordInfo =
records.remove(nextRecordPosition++);
+
+ Option<T> resultRecord = logRecordInfo != null
+ ? merge(Option.of(baseRecord), Collections.emptyMap(),
logRecordInfo.getLeft(), logRecordInfo.getRight())
+ : merge(Option.empty(), Collections.emptyMap(),
Option.of(baseRecord), Collections.emptyMap());
+ if (resultRecord.isPresent()) {
+ nextRecord = readerContext.seal(resultRecord.get());
+ return true;
+ }
+ }
+
+ // Handle records solely from log files.
+ if (logRecordIterator == null) {
+ logRecordIterator = records.values().iterator();
+ }
+
+ while (logRecordIterator.hasNext()) {
+ Pair<Option<T>, Map<String, Object>> nextRecordInfo =
logRecordIterator.next();
+ Option<T> resultRecord;
+ resultRecord = merge(Option.empty(), Collections.emptyMap(),
+ nextRecordInfo.getLeft(), nextRecordInfo.getRight());
+ if (resultRecord.isPresent()) {
+ nextRecord = readerContext.seal(resultRecord.get());
+ return true;
+ }
+ }
+
+ return false;
+ }
+}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index 4f7c4d9595f..f3e142b2f57 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -38,6 +38,7 @@ import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -47,10 +48,13 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
import static org.apache.hudi.common.model.WriteOperationType.INSERT;
import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
+import static
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGER_STRATEGY;
import static
org.apache.hudi.common.testutils.HoodieTestUtils.getLogFileListFromFileSlice;
import static
org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings;
+import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
@@ -64,7 +68,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
public abstract String getBasePath();
- public abstract HoodieReaderContext<T> getHoodieReaderContext();
+ public abstract HoodieReaderContext<T> getHoodieReaderContext(String[]
partitionPath);
public abstract void commitToTable(List<String> recordList, String operation,
Map<String, String> writeConfigs);
@@ -80,12 +84,14 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
writeConfigs.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
"_row_key");
writeConfigs.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(),
"partition_path");
writeConfigs.put("hoodie.datasource.write.precombine.field", "timestamp");
+ writeConfigs.put("hoodie.payload.ordering.field", "timestamp");
writeConfigs.put(HoodieTableConfig.HOODIE_TABLE_NAME_KEY, "hoodie_test");
writeConfigs.put("hoodie.insert.shuffle.parallelism", "4");
writeConfigs.put("hoodie.upsert.shuffle.parallelism", "4");
writeConfigs.put("hoodie.bulkinsert.shuffle.parallelism", "2");
writeConfigs.put("hoodie.delete.shuffle.parallelism", "1");
writeConfigs.put("hoodie.merge.small.file.group.candidates.limit", "0");
+ writeConfigs.put("hoodie.compact.inline", "false");
writeConfigs.put(HoodieMetadataConfig.ENABLE.key(), "false");
try (HoodieTestDataGenerator dataGen = new
HoodieTestDataGenerator(0xDEEF)) {
@@ -123,10 +129,22 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
List<T> actualRecordList = new ArrayList<>();
TypedProperties props = new TypedProperties();
props.setProperty("hoodie.datasource.write.precombine.field", "timestamp");
- props.setProperty(HoodieTableConfig.RECORD_MERGER_STRATEGY.key(),
- HoodieTableConfig.RECORD_MERGER_STRATEGY.defaultValue());
+ props.setProperty("hoodie.payload.ordering.field", "timestamp");
+ props.setProperty(RECORD_MERGER_STRATEGY.key(),
RECORD_MERGER_STRATEGY.defaultValue());
+ String filePath = fileSlice.getBaseFile().isPresent() ?
fileSlice.getBaseFile().get().getPath() : logFilePathList.get(0);
+ String[] partitionValues = {partitionPaths[0]};
+ HoodieFileGroupRecordBuffer<T> recordBuffer = new
HoodieKeyBasedFileGroupRecordBuffer<>(
+ getHoodieReaderContext(partitionValues),
+ avroSchema,
+ avroSchema,
+ Option.of(getRelativePartitionPath(new Path(basePath), new
Path(filePath).getParent())),
+ metaClient.getTableConfig().getPartitionFields(),
+ getHoodieReaderContext(partitionValues).getRecordMerger(
+ getStringWithAltKeys(props, RECORD_MERGER_STRATEGY,
RECORD_MERGER_STRATEGY.defaultValue())),
+ props,
+ metaClient);
HoodieFileGroupReader<T> fileGroupReader = new HoodieFileGroupReader<T>(
- getHoodieReaderContext(),
+ getHoodieReaderContext(partitionValues),
hadoopConf,
basePath,
metaClient.getActiveTimeline().lastInstant().get().getTimestamp(),
@@ -134,8 +152,9 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
logFilePathList.isEmpty() ? Option.empty() :
Option.of(logFilePathList),
avroSchema,
props,
- -1,
- -1);
+ 0,
+ fileSlice.getTotalFileSize(),
+ recordBuffer);
fileGroupReader.initRecordIterators();
while (fileGroupReader.hasNext()) {
actualRecordList.add(fileGroupReader.next());
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala
index 34214be1bd2..a76d4bfc77f 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.JobConf
import org.apache.hudi.HoodieBaseRelation._
import org.apache.hudi.HoodieConversionUtils.toScalaOption
-import org.apache.hudi.common.config.ConfigProperty
+import org.apache.hudi.common.config.{ConfigProperty, HoodieReaderConfig}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecord
import
org.apache.hudi.common.model.HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX
@@ -36,7 +36,7 @@ import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
import org.apache.spark.sql.catalyst.analysis.Resolver
-import
org.apache.spark.sql.execution.datasources.parquet.NewHoodieParquetFileFormat
+import
org.apache.spark.sql.execution.datasources.parquet.{HoodieFileGroupReaderBasedParquetFileFormat,
NewHoodieParquetFileFormat}
import org.apache.spark.sql.execution.datasources.{FileStatusCache,
HadoopFsRelation}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.sources.BaseRelation
@@ -150,6 +150,12 @@ class NewHoodieParquetFileFormatUtils(val sqlContext:
SQLContext,
sqlContext.getConf(config.key(),
defaultValueOption.getOrElse(config.defaultValue())))
}
+ private def checkIfAConfigurationEnabled(config:
ConfigProperty[java.lang.Boolean],
+ defaultValueOption: Option[String] =
Option.empty): Boolean = {
+ optParams.getOrElse(config.key(),
+ sqlContext.getConf(config.key(),
defaultValueOption.getOrElse(String.valueOf(config.defaultValue())))).toBoolean
+ }
+
protected val mergeType: String =
optParams.getOrElse(DataSourceReadOptions.REALTIME_MERGE.key,
DataSourceReadOptions.REALTIME_MERGE.defaultValue)
@@ -180,6 +186,9 @@ class NewHoodieParquetFileFormatUtils(val sqlContext:
SQLContext,
val recordMergerStrategy =
getConfigValue(HoodieWriteConfig.RECORD_MERGER_STRATEGY,
Option(metaClient.getTableConfig.getRecordMergerStrategy))
+ val fileGroupReaderEnabled =
checkIfAConfigurationEnabled(HoodieReaderConfig.FILE_GROUP_READER_ENABLED)
+ val shouldUseRecordPosition =
checkIfAConfigurationEnabled(HoodieWriteConfig.WRITE_RECORD_POSITIONS)
+
val tableState = // Subset of the state of table's configuration as of at
the time of the query
HoodieTableState(
tablePath = basePath.toString,
@@ -199,14 +208,18 @@ class NewHoodieParquetFileFormatUtils(val sqlContext:
SQLContext,
Seq.empty
}
fileIndex.shouldEmbedFileSlices = true
+ val fileGroupReaderBasedFileFormat = new
HoodieFileGroupReaderBasedParquetFileFormat(
+ tableState, HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt),
+ metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
isMOR, isBootstrap, shouldUseRecordPosition)
+ val newHoodieParquetFileFormat = new
NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
+ sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt)),
+ metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
isMOR, isBootstrap)
HadoopFsRelation(
location = fileIndex,
partitionSchema = fileIndex.partitionSchema,
dataSchema = fileIndex.dataSchema,
bucketSpec = None,
- fileFormat = new
NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
-
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema,
tableAvroSchema.toString, internalSchemaOpt)),
- metaClient.getTableConfig.getTableName, mergeType, mandatoryFields,
isMOR, isBootstrap),
+ fileFormat = if (fileGroupReaderEnabled) fileGroupReaderBasedFileFormat
else newHoodieParquetFileFormat,
optParams)(sparkSession)
}
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
new file mode 100644
index 00000000000..5bc4e4a80f9
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import kotlin.NotImplementedError
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.MergeOnReadSnapshotRelation.createPartitionedFile
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.engine.HoodieReaderContext
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, HoodieLogFile,
HoodieRecord, HoodieRecordMerger}
+import org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGER_STRATEGY
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.read.{HoodieFileGroupReader,
HoodieFileGroupRecordBuffer, HoodieKeyBasedFileGroupRecordBuffer,
HoodiePositionBasedFileGroupRecordBuffer}
+import org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys
+import org.apache.hudi.common.util.{Option => HOption}
+import org.apache.hudi.{HoodieBaseRelation, HoodieSparkUtils,
HoodieTableSchema, HoodieTableState, MergeOnReadSnapshotRelation,
PartitionFileSliceMapping, SparkAdapterSupport,
SparkFileFormatInternalRowReaderContext}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isMetaField
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.util.SerializableConfiguration
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.jdk.CollectionConverters.asScalaIteratorConverter
+
+/**
+ * This class utilizes {@link HoodieFileGroupReader} and its related classes
to support reading
+ * from Parquet formatted base files and their log files.
+ */
+class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
+ tableSchema:
HoodieTableSchema,
+ tableName: String,
+ mergeType: String,
+ mandatoryFields: Seq[String],
+ isMOR: Boolean,
+ isBootstrap: Boolean,
+ shouldUseRecordPosition:
Boolean
+ ) extends ParquetFileFormat with
SparkAdapterSupport {
+ var isProjected = false
+
+ /**
+ * Support batch needs to remain consistent, even if one side of a bootstrap
merge can support
+ * while the other side can't
+ */
+ private var supportBatchCalled = false
+ private var supportBatchResult = false
+
+ override def supportBatch(sparkSession: SparkSession, schema: StructType):
Boolean = {
+ if (!supportBatchCalled) {
+ supportBatchCalled = true
+ supportBatchResult = !isMOR && super.supportBatch(sparkSession, schema)
+ }
+ supportBatchResult
+ }
+
+ override def isSplitable(
+ sparkSession: SparkSession,
+ options: Map[String, String],
+ path: Path): Boolean = {
+ false
+ }
+
+ override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+ dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
+ filters: Seq[Filter],
+ options: Map[String, String],
+ hadoopConf: Configuration):
PartitionedFile => Iterator[InternalRow] = {
+ val requiredSchemaWithMandatory =
generateRequiredSchemaWithMandatory(requiredSchema, dataSchema)
+ val requiredSchemaSplits = requiredSchemaWithMandatory.fields.partition(f
=> HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(f.name))
+ val requiredMeta = StructType(requiredSchemaSplits._1)
+ val requiredWithoutMeta = StructType(requiredSchemaSplits._2)
+ val augmentedHadoopConf = FSUtils.buildInlineConf(hadoopConf)
+ val (baseFileReader, preMergeBaseFileReader, _, _) = buildFileReaders(
+ sparkSession, dataSchema, partitionSchema, requiredSchema, filters,
options, augmentedHadoopConf,
+ requiredSchemaWithMandatory, requiredWithoutMeta, requiredMeta)
+ val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new
SerializableConfiguration(augmentedHadoopConf))
+
+ (file: PartitionedFile) => {
+ file.partitionValues match {
+ case fileSliceMapping: PartitionFileSliceMapping =>
+ val filePath =
sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
+ if (FSUtils.isLogFile(filePath)) {
+ // TODO: Use FileGroupReader here: HUDI-6942.
+ throw new NotImplementedError("Not support reading with only log
files")
+ } else {
+ fileSliceMapping.getSlice(FSUtils.getFileId(filePath.getName))
match {
+ case Some(fileSlice) =>
+ val hoodieBaseFile = fileSlice.getBaseFile.get()
+ val bootstrapFileOpt = hoodieBaseFile.getBootstrapBaseFile
+ val partitionValues = fileSliceMapping.getInternalRow
+ val logFiles = getLogFilesFromSlice(fileSlice)
+ if (requiredSchemaWithMandatory.isEmpty) {
+ val baseFile = createPartitionedFile(partitionValues,
hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen)
+ // TODO: Use FileGroupReader here: HUDI-6942.
+ baseFileReader(baseFile)
+ } else if (bootstrapFileOpt.isPresent) {
+ // TODO: Use FileGroupReader here: HUDI-6942.
+ throw new NotImplementedError("Not support reading bootstrap
file")
+ } else {
+ if (logFiles.isEmpty) {
+ throw new IllegalStateException(
+ "should not be here since file slice should not have
been broadcasted "
+ + "since it has no log or data files")
+ }
+ buildFileGroupIterator(
+ preMergeBaseFileReader,
+ partitionValues,
+ hoodieBaseFile,
+ logFiles,
+ requiredSchemaWithMandatory,
+ broadcastedHadoopConf.value.value,
+ file.start,
+ file.length,
+ shouldUseRecordPosition
+ )
+ }
+ // TODO: Use FileGroupReader here: HUDI-6942.
+ case _ => baseFileReader(file)
+ }
+ }
+ // TODO: Use FileGroupReader here: HUDI-6942.
+ case _ => baseFileReader(file)
+ }
+ }
+ }
+
+ protected def buildFileGroupIterator(preMergeBaseFileReader: PartitionedFile
=> Iterator[InternalRow],
+ partitionValues: InternalRow,
+ baseFile: HoodieBaseFile,
+ logFiles: List[HoodieLogFile],
+ requiredSchemaWithMandatory: StructType,
+ hadoopConf: Configuration,
+ start: Long,
+ length: Long,
+ shouldUseRecordPosition: Boolean):
Iterator[InternalRow] = {
+ val readerContext: HoodieReaderContext[InternalRow] = new
SparkFileFormatInternalRowReaderContext(
+ preMergeBaseFileReader, partitionValues)
+ val metaClient: HoodieTableMetaClient = HoodieTableMetaClient
+ .builder().setConf(hadoopConf).setBasePath(tableState.tablePath).build
+ val avroSchema: Schema =
HoodieBaseRelation.convertToAvroSchema(requiredSchemaWithMandatory, tableName)
+ val filePath: String = if (baseFile != null) baseFile.getPath else
logFiles.head.toString
+ val partitionNameOpt: HOption[String] =
HOption.of(FSUtils.getRelativePartitionPath(
+ new Path(tableState.tablePath), new Path(filePath).getParent))
+ val partitionPathFieldOpt = metaClient.getTableConfig.getPartitionFields
+ val recordMerger: HoodieRecordMerger =
readerContext.getRecordMerger(getStringWithAltKeys(
+ new TypedProperties, RECORD_MERGER_STRATEGY,
RECORD_MERGER_STRATEGY.defaultValue))
+ val recordBuffer: HoodieFileGroupRecordBuffer[InternalRow] = if
(shouldUseRecordPosition) {
+ new HoodiePositionBasedFileGroupRecordBuffer[InternalRow](
+ readerContext, avroSchema, avroSchema, partitionNameOpt,
partitionPathFieldOpt,
+ recordMerger, new TypedProperties, metaClient)
+ } else {
+ new HoodieKeyBasedFileGroupRecordBuffer[InternalRow](
+ readerContext, avroSchema, avroSchema, partitionNameOpt,
partitionPathFieldOpt,
+ recordMerger, new TypedProperties, metaClient)
+ }
+ val reader = new HoodieFileGroupReader[InternalRow](
+ readerContext,
+ hadoopConf,
+ tableState.tablePath,
+ tableState.latestCommitTimestamp.get,
+ HOption.of(baseFile),
+ HOption.of(logFiles.map(f => f.getPath.toString).asJava),
+ HoodieBaseRelation.convertToAvroSchema(requiredSchemaWithMandatory,
tableName),
+ new TypedProperties(),
+ start,
+ length,
+ recordBuffer
+ )
+ reader.initRecordIterators()
+
reader.getClosableIterator.asInstanceOf[java.util.Iterator[InternalRow]].asScala
+ }
+
+ def generateRequiredSchemaWithMandatory(requiredSchema: StructType,
dataSchema: StructType): StructType = {
+ if (!isMOR ||
MergeOnReadSnapshotRelation.isProjectionCompatible(tableState)) {
+ val added: mutable.Buffer[StructField] = mutable.Buffer[StructField]()
+ for (field <- mandatoryFields) {
+ if (requiredSchema.getFieldIndex(field).isEmpty) {
+ val fieldToAdd =
dataSchema.fields(dataSchema.getFieldIndex(field).get)
+ added.append(fieldToAdd)
+ }
+ }
+ val addedFields = StructType(added.toArray)
+ StructType(requiredSchema.toArray ++ addedFields.fields)
+ } else {
+ dataSchema
+ }
+ }
+
+ protected def buildFileReaders(sparkSession: SparkSession, dataSchema:
StructType, partitionSchema: StructType,
+ requiredSchema: StructType, filters:
Seq[Filter], options: Map[String, String],
+ hadoopConf: Configuration,
requiredSchemaWithMandatory: StructType,
+ requiredWithoutMeta: StructType,
requiredMeta: StructType):
+ (PartitionedFile => Iterator[InternalRow],
+ PartitionedFile => Iterator[InternalRow],
+ PartitionedFile => Iterator[InternalRow],
+ PartitionedFile => Iterator[InternalRow]) = {
+
+ //file reader when you just read a hudi parquet file and don't do any
merging
+ val baseFileReader = super.buildReaderWithPartitionValues(sparkSession,
dataSchema, partitionSchema, requiredSchema,
+ filters, options, new Configuration(hadoopConf))
+
+ //file reader for reading a hudi base file that needs to be merged with
log files
+ val preMergeBaseFileReader = if (isMOR) {
+ // Add support for reading files using inline file system.
+ super.buildReaderWithPartitionValues(sparkSession, dataSchema,
partitionSchema,
+ requiredSchemaWithMandatory, Seq.empty, options, new
Configuration(hadoopConf))
+ } else {
+ _: PartitionedFile => Iterator.empty
+ }
+
+ //Rules for appending partitions and filtering in the bootstrap readers:
+ // 1. if it is mor, we don't want to filter data or append partitions
+ // 2. if we need to merge the bootstrap base and skeleton files then we
cannot filter
+ // 3. if we need to merge the bootstrap base and skeleton files then we
should never append partitions to the
+ // skeleton reader
+
+ val needMetaCols = requiredMeta.nonEmpty
+ val needDataCols = requiredWithoutMeta.nonEmpty
+
+ //file reader for bootstrap skeleton files
+ val skeletonReader = if (needMetaCols && isBootstrap) {
+ if (needDataCols || isMOR) {
+ // no filter and no append
+ super.buildReaderWithPartitionValues(sparkSession,
HoodieSparkUtils.getMetaSchema, StructType(Seq.empty),
+ requiredMeta, Seq.empty, options, new Configuration(hadoopConf))
+ } else {
+ // filter and append
+ super.buildReaderWithPartitionValues(sparkSession,
HoodieSparkUtils.getMetaSchema, partitionSchema,
+ requiredMeta, filters, options, new Configuration(hadoopConf))
+ }
+ } else {
+ _: PartitionedFile => Iterator.empty
+ }
+
+ //file reader for bootstrap base files
+ val bootstrapBaseReader = if (needDataCols && isBootstrap) {
+ val dataSchemaWithoutMeta = StructType(dataSchema.fields.filterNot(sf =>
isMetaField(sf.name)))
+ if (isMOR) {
+ // no filter and no append
+ super.buildReaderWithPartitionValues(sparkSession,
dataSchemaWithoutMeta, StructType(Seq.empty), requiredWithoutMeta,
+ Seq.empty, options, new Configuration(hadoopConf))
+ } else if (needMetaCols) {
+ // no filter but append
+ super.buildReaderWithPartitionValues(sparkSession,
dataSchemaWithoutMeta, partitionSchema, requiredWithoutMeta,
+ Seq.empty, options, new Configuration(hadoopConf))
+ } else {
+ // filter and append
+ super.buildReaderWithPartitionValues(sparkSession,
dataSchemaWithoutMeta, partitionSchema, requiredWithoutMeta,
+ filters, options, new Configuration(hadoopConf))
+ }
+ } else {
+ _: PartitionedFile => Iterator.empty
+ }
+
+ (baseFileReader, preMergeBaseFileReader, skeletonReader,
bootstrapBaseReader)
+ }
+
+ protected def getLogFilesFromSlice(fileSlice: FileSlice):
List[HoodieLogFile] = {
+
fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala.toList
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java
index 36f0f3ca351..57243e97125 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java
@@ -55,11 +55,18 @@ import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static
org.apache.hudi.common.config.HoodieReaderConfig.FILE_GROUP_READER_ENABLED;
+import static
org.apache.hudi.common.config.HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT;
+import static
org.apache.hudi.common.model.HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY;
+import static org.apache.hudi.config.HoodieWriteConfig.RECORD_MERGER_IMPLS;
+import static org.apache.hudi.config.HoodieWriteConfig.WRITE_RECORD_POSITIONS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -79,8 +86,8 @@ public class TestHoodieMergeHandleWithSparkMerger extends
SparkClientFunctionalT
HoodieTableConfig.BASE_FILE_FORMAT.key(),
HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());
properties.setProperty(
- "hoodie.payload.ordering.field",
- "_hoodie_record_key");
+ PAYLOAD_ORDERING_FIELD_PROP_KEY,
+
HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName());
metaClient = getHoodieMetaClient(hadoopConf(), basePath(),
HoodieTableType.MERGE_ON_READ, properties);
}
@@ -89,6 +96,7 @@ public class TestHoodieMergeHandleWithSparkMerger extends
SparkClientFunctionalT
HoodieWriteConfig writeConfig = buildDefaultWriteConfig(SCHEMA);
HoodieRecordMerger merger = writeConfig.getRecordMerger();
assertTrue(merger instanceof DefaultMerger);
+
assertTrue(writeConfig.getBooleanOrDefault(FILE_GROUP_READER_ENABLED.key(),
false));
insertAndUpdate(writeConfig, 114);
}
@@ -97,6 +105,7 @@ public class TestHoodieMergeHandleWithSparkMerger extends
SparkClientFunctionalT
HoodieWriteConfig writeConfig = buildNoFlushWriteConfig(SCHEMA);
HoodieRecordMerger merger = writeConfig.getRecordMerger();
assertTrue(merger instanceof NoFlushMerger);
+
assertTrue(writeConfig.getBooleanOrDefault(FILE_GROUP_READER_ENABLED.key(),
false));
insertAndUpdate(writeConfig, 64);
}
@@ -105,6 +114,7 @@ public class TestHoodieMergeHandleWithSparkMerger extends
SparkClientFunctionalT
HoodieWriteConfig writeConfig = buildCustomWriteConfig(SCHEMA);
HoodieRecordMerger merger = writeConfig.getRecordMerger();
assertTrue(merger instanceof CustomMerger);
+
assertTrue(writeConfig.getBooleanOrDefault(FILE_GROUP_READER_ENABLED.key(),
false));
insertAndUpdate(writeConfig, 95);
}
@@ -149,14 +159,20 @@ public class TestHoodieMergeHandleWithSparkMerger extends
SparkClientFunctionalT
public HoodieWriteConfig getWriteConfig(Schema avroSchema) {
Properties extraProperties = new Properties();
extraProperties.setProperty(
- "hoodie.datasource.write.record.merger.impls",
+ RECORD_MERGER_IMPLS.key(),
"org.apache.hudi.HoodieSparkRecordMerger");
extraProperties.setProperty(
- "hoodie.logfile.data.block.format",
+ LOGFILE_DATA_BLOCK_FORMAT.key(),
"parquet");
extraProperties.setProperty(
- "hoodie.payload.ordering.field",
- "_hoodie_record_key");
+ PAYLOAD_ORDERING_FIELD_PROP_KEY,
+
HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName());
+ extraProperties.setProperty(
+ FILE_GROUP_READER_ENABLED.key(),
+ "true");
+ extraProperties.setProperty(
+ WRITE_RECORD_POSITIONS.key(),
+ "true");
return getConfigBuilder(true)
.withPath(basePath())
@@ -210,11 +226,32 @@ public class TestHoodieMergeHandleWithSparkMerger extends
SparkClientFunctionalT
}
public void checkDataEquality(int numRecords) {
- List<Row> rows = spark()
+ Map<String, String> properties = new HashMap<>();
+ properties.put(
+ RECORD_MERGER_IMPLS.key(),
+ "org.apache.hudi.HoodieSparkRecordMerger");
+ properties.put(
+ LOGFILE_DATA_BLOCK_FORMAT.key(),
+ "parquet");
+ properties.put(
+ PAYLOAD_ORDERING_FIELD_PROP_KEY,
+
HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName());
+ properties.put(
+ FILE_GROUP_READER_ENABLED.key(),
+ "true");
+ properties.put(
+ DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT().key(),
+ "true");
+ properties.put(
+ WRITE_RECORD_POSITIONS.key(),
+ "true");
+ Dataset<Row> rows = spark()
.read()
+ .options(properties)
.format("org.apache.hudi")
- .load(basePath() + "/" + getPartitionPath()).collectAsList();
- assertEquals(numRecords, rows.size());
+ .load(basePath() + "/" + getPartitionPath());
+ List<Row> result = rows.collectAsList();
+ assertEquals(numRecords, result.size());
}
public void insertAndUpdate(HoodieWriteConfig writeConfig, int
expectedRecordNum) throws Exception {
@@ -276,6 +313,15 @@ public class TestHoodieMergeHandleWithSparkMerger extends
SparkClientFunctionalT
// Check data after
checkDataEquality(expectedRecordNum);
+
+ // (3) Write: append, generate the log file.
+ instantTime = "003";
+ writeClient.startCommitWithTime(instantTime);
+
+ List<HoodieRecord> records5 =
generateEmptyRecords(getKeys(records).subList(50, 59)); // 9 deletes only
+ assertEquals(9, records5.size());
+ updateRecordsInMORTable(reloadedMetaClient, records5, writeClient,
writeConfig, instantTime, false);
+ checkDataEquality(expectedRecordNum - 9);
}
}
@@ -335,11 +381,7 @@ public class TestHoodieMergeHandleWithSparkMerger extends
SparkClientFunctionalT
public static class CustomMerger extends HoodieSparkRecordMerger {
@Override
public boolean shouldFlush(HoodieRecord record, Schema schema,
TypedProperties props) throws IOException {
- if (((HoodieSparkRecord)record).getData().getString(0).equals("001")) {
- return false;
- } else {
- return true;
- }
+ return !((HoodieSparkRecord)
record).getData().getString(0).equals("001");
}
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
index 70c1bcb480e..50056954a12 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
@@ -22,11 +22,17 @@ package org.apache.hudi.common.table.read
import org.apache.avro.Schema
import org.apache.hadoop.conf.Configuration
import org.apache.hudi.common.engine.HoodieReaderContext
+import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
-import org.apache.hudi.{SparkAdapterSupport,
SparkFileFormatInternalRowReaderContext}
+import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
+import org.apache.hudi.{AvroConversionUtils,
SparkFileFormatInternalRowReaderContext}
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Dataset, HoodieInternalRowUtils,
HoodieUnsafeUtils, Row, SaveMode, SparkSession}
+import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.{HoodieSparkKryoRegistrar, SparkConf}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
@@ -55,22 +61,39 @@ class TestHoodieFileGroupReaderOnSpark extends
TestHoodieFileGroupReaderBase[Int
sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK")
sparkConf.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
sparkConf.set("spark.kryo.registrator",
"org.apache.spark.HoodieSparkKryoRegistrar")
+ sparkConf.set("spark.sql.parquet.enableVectorizedReader", "false")
HoodieSparkKryoRegistrar.register(sparkConf)
spark = SparkSession.builder.config(sparkConf).getOrCreate
}
override def getHadoopConf: Configuration = {
- new Configuration()
+ FSUtils.buildInlineConf(new Configuration)
}
override def getBasePath: String = {
tempDir.toAbsolutePath.toUri.toString
}
- override def getHoodieReaderContext: HoodieReaderContext[InternalRow] = {
- new SparkFileFormatInternalRowReaderContext(spark,
-
SparkAdapterSupport.sparkAdapter.createLegacyHoodieParquetFileFormat(false).get,
- getHadoopConf)
+ override def getHoodieReaderContext(partitionValues: Array[String]):
HoodieReaderContext[InternalRow] = {
+ val parquetFileFormat = new ParquetFileFormat
+ val metaClient =
HoodieTableMetaClient.builder.setConf(getHadoopConf).setBasePath(getBasePath).build
+ val avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema
+ val structTypeSchema =
AvroConversionUtils.convertAvroSchemaToStructType(avroSchema)
+ val partitionFields = metaClient.getTableConfig.getPartitionFields
+ val partitionSchema = new StructType(structTypeSchema.fields.filter(f =>
partitionFields.get().contains(f.name)))
+
+ val recordReaderIterator =
parquetFileFormat.buildReaderWithPartitionValues(
+ spark, structTypeSchema, partitionSchema, structTypeSchema, Seq.empty,
Map.empty, getHadoopConf)
+ val numPartitionFields = if (partitionFields.isPresent)
partitionFields.get().length else 0
+ assertEquals(numPartitionFields, partitionValues.length)
+
+ val partitionValuesEncoded = new Array[UTF8String](partitionValues.length)
+ for (i <- Range(0, numPartitionFields)) {
+ partitionValuesEncoded.update(i,
UTF8String.fromString(partitionValues.apply(i)))
+ }
+
+ val partitionValueRow = new
GenericInternalRow(partitionValuesEncoded.toArray[Any])
+ new SparkFileFormatInternalRowReaderContext(recordReaderIterator,
partitionValueRow)
}
override def commitToTable(recordList: util.List[String], operation: String,
options: util.Map[String, String]): Unit = {