This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b36ba43fdee [HUDI-9146][part2] Integrating FileGroup reader into Flink
merge reader (#13343)
b36ba43fdee is described below
commit b36ba43fdeec253e9a355b734709fc908ffb62c7
Author: Shuo Cheng <[email protected]>
AuthorDate: Fri May 23 22:26:18 2025 +0800
[HUDI-9146][part2] Integrating FileGroup reader into Flink merge reader
(#13343)
---
.../hudi/BaseSparkInternalRowReaderContext.java | 10 +
.../SparkFileFormatInternalRowReaderContext.scala | 2 +-
.../apache/hudi/avro/HoodieAvroReaderContext.java | 10 +
.../hudi/common/engine/HoodieReaderContext.java | 53 ++++
.../table/read/FileGroupReaderSchemaHandler.java | 41 ++-
.../common/table/read/FileGroupRecordBuffer.java | 34 ++-
.../common/table/read/HoodieFileGroupReader.java | 30 +-
.../table/read/KeyBasedFileGroupRecordBuffer.java | 7 +-
.../read/PositionBasedFileGroupRecordBuffer.java | 7 +-
.../table/read/UnmergedFileGroupRecordBuffer.java | 5 +-
...rIterator.java => CloseableFilterIterator.java} | 37 +--
.../common/util/collection/FilterIterator.java | 2 +-
.../table/read/TestFileGroupRecordBuffer.java | 10 +-
.../examples/quickstart/TestQuickstartData.java | 1 +
.../apache/hudi/sink/compact/CompactOperator.java | 2 +-
.../table/format/FlinkRowDataReaderContext.java | 72 ++++-
.../org/apache/hudi/table/format/FormatUtils.java | 2 +
.../table/format/HoodieRowDataParquetReader.java | 14 +-
.../apache/hudi/table/format/RecordIterators.java | 6 +-
.../hudi/table/format/cdc/CdcInputFormat.java | 2 +-
.../table/format/mor/MergeOnReadInputFormat.java | 328 ++-------------------
.../table/format/mor/MergeOnReadTableState.java | 27 --
.../apache/hudi/util/StringToRowDataConverter.java | 21 +-
.../table/TestHoodieFileGroupReaderOnFlink.java | 11 +-
.../apache/hudi/table/format/TestInputFormat.java | 25 ++
.../test/java/org/apache/hudi/utils/TestData.java | 1 +
.../hudi/utils/TestStringToRowDataConverter.java | 20 +-
.../hudi/hadoop/HiveHoodieReaderContext.java | 10 +
.../TestPositionBasedFileGroupRecordBuffer.java | 3 +-
29 files changed, 355 insertions(+), 438 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
index 0084fb93b3c..56ebbc89497 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
@@ -90,6 +90,11 @@ public abstract class BaseSparkInternalRowReaderContext
extends HoodieReaderCont
return getFieldValueFromInternalRow(row, schema, fieldName);
}
+ @Override
+ public String getMetaFieldValue(InternalRow record, int pos) {
+ return record.getString(pos);
+ }
+
@Override
public HoodieRecord<InternalRow>
constructHoodieRecord(BufferedRecord<InternalRow> bufferedRecord) {
HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(),
partitionPath);
@@ -162,4 +167,9 @@ public abstract class BaseSparkInternalRowReaderContext
extends HoodieReaderCont
}
return value;
}
+
+ @Override
+ public InternalRow getDeleteRow(InternalRow record, String recordKey) {
+ throw new UnsupportedOperationException("Not supported for " +
this.getClass().getSimpleName());
+ }
}
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index ec9efab469a..1a08c69cd4b 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -25,6 +25,7 @@ import org.apache.hudi.avro.AvroSchemaUtils.isNullable
import org.apache.hudi.common.engine.HoodieReaderContext
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.table.HoodieTableConfig
import
org.apache.hudi.common.table.read.PositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.collection.{CachingIterator,
ClosableIterator, Pair => HPair}
@@ -35,7 +36,6 @@ import org.apache.hudi.util.CloseableInternalRowIterator
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, IndexedRecord}
import org.apache.hadoop.conf.Configuration
-import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.spark.sql.HoodieInternalRowUtils
import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer}
import org.apache.spark.sql.catalyst.InternalRow
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
index ef100f1c718..c15d2b618cc 100644
---
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
+++
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
@@ -98,6 +98,11 @@ public class HoodieAvroReaderContext extends
HoodieReaderContext<IndexedRecord>
return (GenericRecord) record;
}
+ @Override
+ public IndexedRecord getDeleteRow(IndexedRecord record, String recordKey) {
+ throw new UnsupportedOperationException("Not supported for " +
this.getClass().getSimpleName());
+ }
+
@Override
public Option<HoodieRecordMerger> getRecordMerger(RecordMergeMode mergeMode,
String mergeStrategyId, String mergeImplClasses) {
switch (mergeMode) {
@@ -121,6 +126,11 @@ public class HoodieAvroReaderContext extends
HoodieReaderContext<IndexedRecord>
return getFieldValueFromIndexedRecord(record, schema, fieldName);
}
+ @Override
+ public String getMetaFieldValue(IndexedRecord record, int pos) {
+ return record.get(pos).toString();
+ }
+
@Override
public HoodieRecord<IndexedRecord>
constructHoodieRecord(BufferedRecord<IndexedRecord> bufferedRecord) {
if (bufferedRecord.isDelete()) {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index 638c58aa4d1..61f2deaaadf 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -23,11 +23,13 @@ import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.read.BufferedRecord;
import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
import org.apache.hudi.common.util.LocalAvroSchemaCache;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableFilterIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.storage.HoodieStorage;
@@ -46,6 +48,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
+import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
@@ -74,6 +77,7 @@ public abstract class HoodieReaderContext<T> {
private Boolean needsBootstrapMerge = null;
private Boolean shouldMergeUseRecordPosition = null;
protected String partitionPath;
+ protected Option<InstantRange> instantRangeOpt = Option.empty();
// for encoding and decoding schemas to the spillable map
private final LocalAvroSchemaCache localAvroSchemaCache =
LocalAvroSchemaCache.getInstance();
@@ -208,6 +212,23 @@ public abstract class HoodieReaderContext<T> {
public abstract T convertAvroRecord(IndexedRecord avroRecord);
public abstract GenericRecord convertToAvroRecord(T record, Schema schema);
+
+ /**
+ * There are two cases to handle:
+ * 1). Return the delete record if it's not null;
+ * 2). otherwise fills an empty row with record key fields and returns.
+ *
+ * <p>For case2, when `emitDelete` is true for FileGroup reader and payload
for DELETE record is empty,
+ * a record key row is emitted to downstream to delete data from storage by
record key with the best effort.
+ * Returns null if the primary key semantics been lost: the requested schema
does not include all the record key fields.
+ *
+ * @param record delete record
+ * @param recordKey record key
+ *
+ * @return Engine specific row which contains record key fields.
+ */
+ @Nullable
+ public abstract T getDeleteRow(T record, String recordKey);
/**
* @param mergeMode record merge mode
@@ -228,6 +249,16 @@ public abstract class HoodieReaderContext<T> {
*/
public abstract Object getValue(T record, Schema schema, String fieldName);
+ /**
+ * Get value of metadata field in a more efficient way than #getValue.
+ *
+ * @param record The record in engine-specific type.
+ * @param pos The position of the metadata field.
+ *
+ * @return The value for the target metadata field.
+ */
+ public abstract String getMetaFieldValue(T record, int pos);
+
/**
* Cast to Java boolean value.
* If the object is not compatible with boolean type, throws.
@@ -241,6 +272,28 @@ public abstract class HoodieReaderContext<T> {
}
}
+ /**
+ * Get the {@link InstantRange} filter.
+ */
+ public Option<InstantRange> getInstantRange() {
+ return instantRangeOpt;
+ }
+
+ /**
+ * Apply the {@link InstantRange} filter to the file record iterator.
+ *
+ * @param fileRecordIterator File record iterator.
+ *
+ * @return File record iterator filtered by {@link InstantRange}.
+ */
+ public ClosableIterator<T> applyInstantRangeFilter(ClosableIterator<T>
fileRecordIterator) {
+ InstantRange instantRange = getInstantRange().get();
+ final Schema.Field commitTimeField =
schemaHandler.getRequiredSchema().getField(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
+ final int commitTimePos = commitTimeField.pos();
+ Predicate<T> instantFilter = row ->
instantRange.isInRange(getMetaFieldValue(row, commitTimePos));
+ return new CloseableFilterIterator<>(fileRecordIterator, instantFilter);
+ }
+
/**
* Gets the record key in String.
*
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
index e39dbf3cd52..816ec21a9fb 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
@@ -90,6 +90,7 @@ public class FileGroupReaderSchemaHandler<T> {
private final Option<Pair<String, String>> customDeleteMarkerKeyValue;
private final boolean hasBuiltInDelete;
+ private final int hoodieOperationPos;
public FileGroupReaderSchemaHandler(HoodieReaderContext<T> readerContext,
Schema tableSchema,
@@ -109,6 +110,7 @@ public class FileGroupReaderSchemaHandler<T> {
this.customDeleteMarkerKeyValue = deleteConfigs.getLeft();
this.hasBuiltInDelete = deleteConfigs.getRight();
this.requiredSchema = AvroSchemaCache.intern(prepareRequiredSchema());
+ this.hoodieOperationPos =
Option.ofNullable(requiredSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD)).map(Schema.Field::pos).orElse(-1);
this.internalSchema = pruneInternalSchema(requiredSchema,
internalSchemaOpt);
this.internalSchemaOpt = getInternalSchemaOpt(internalSchemaOpt);
readerContext.setNeedsBootstrapMerge(this.needsBootstrapMerge);
@@ -150,6 +152,10 @@ public class FileGroupReaderSchemaHandler<T> {
return hasBuiltInDelete;
}
+ public int getHoodieOperationPos() {
+ return hoodieOperationPos;
+ }
+
private InternalSchema pruneInternalSchema(Schema requiredSchema,
Option<InternalSchema> internalSchemaOption) {
if (!internalSchemaOption.isPresent()) {
return InternalSchema.getEmptyInternalSchema();
@@ -172,8 +178,13 @@ public class FileGroupReaderSchemaHandler<T> {
@VisibleForTesting
Schema generateRequiredSchema() {
- //might need to change this if other queries than mor have mandatory fields
+ boolean hasInstantRange = readerContext.getInstantRange().isPresent();
if (!needsMORMerge) {
+ if (hasInstantRange && !findNestedField(requestedSchema,
HoodieRecord.COMMIT_TIME_METADATA_FIELD).isPresent()) {
+ List<Schema.Field> addedFields = new ArrayList<>();
+ addedFields.add(getField(tableSchema,
HoodieRecord.COMMIT_TIME_METADATA_FIELD));
+ return appendFieldsToSchemaDedupNested(requestedSchema, addedFields);
+ }
return requestedSchema;
}
@@ -186,14 +197,9 @@ public class FileGroupReaderSchemaHandler<T> {
List<Schema.Field> addedFields = new ArrayList<>();
for (String field : getMandatoryFieldsForMerging(
hoodieTableConfig, properties, tableSchema, recordMerger,
- hasBuiltInDelete, customDeleteMarkerKeyValue)) {
+ hasBuiltInDelete, customDeleteMarkerKeyValue, hasInstantRange)) {
if (!findNestedField(requestedSchema, field).isPresent()) {
- Option<Schema.Field> foundFieldOpt = findNestedField(tableSchema,
field);
- if (!foundFieldOpt.isPresent()) {
- throw new IllegalArgumentException("Field: " + field + " does not
exist in the table schema");
- }
- Schema.Field foundField = foundFieldOpt.get();
- addedFields.add(foundField);
+ addedFields.add(getField(tableSchema, field));
}
}
@@ -209,7 +215,8 @@ public class FileGroupReaderSchemaHandler<T> {
Schema tableSchema,
Option<HoodieRecordMerger> recordMerger,
boolean
hasBuiltInDelete,
- Option<Pair<String,
String>> customDeleteMarkerKeyAndValue) {
+ Option<Pair<String,
String>> customDeleteMarkerKeyAndValue,
+ boolean
hasInstantRange) {
Triple<RecordMergeMode, String, String> mergingConfigs =
HoodieTableConfig.inferCorrectMergingBehavior(
cfg.getRecordMergeMode(),
cfg.getPayloadClass(),
@@ -223,6 +230,11 @@ public class FileGroupReaderSchemaHandler<T> {
// Use Set to avoid duplicated fields.
Set<String> requiredFields = new HashSet<>();
+
+ if (hasInstantRange) {
+ requiredFields.add(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
+ }
+
// Add record key fields.
if (cfg.populateMetaFields()) {
requiredFields.add(HoodieRecord.RECORD_KEY_METADATA_FIELD);
@@ -291,6 +303,17 @@ public class FileGroupReaderSchemaHandler<T> {
return createNewSchemaFromFieldsWithReference(tableSchema, fields);
}
+ /**
+ * Get {@link Schema.Field} from {@link Schema} by field name.
+ */
+ private static Schema.Field getField(Schema schema, String fieldName) {
+ Option<Schema.Field> foundFieldOpt = findNestedField(schema, fieldName);
+ if (!foundFieldOpt.isPresent()) {
+ throw new IllegalArgumentException("Field: " + fieldName + " does not
exist in the table schema");
+ }
+ return foundFieldOpt.get();
+ }
+
/**
* Fetches the delete configs from the configs.
*
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
index 0f89a6c426a..020e19cf0fa 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
@@ -86,6 +86,7 @@ public abstract class FileGroupRecordBuffer<T> implements
HoodieFileGroupRecordB
protected final HoodieReadStats readStats;
protected final boolean shouldCheckCustomDeleteMarker;
protected final boolean shouldCheckBuiltInDeleteMarker;
+ protected final boolean emitDelete;
protected ClosableIterator<T> baseFileIterator;
protected Iterator<BufferedRecord<T>> logRecordIterator;
protected T nextRecord;
@@ -99,7 +100,8 @@ public abstract class FileGroupRecordBuffer<T> implements
HoodieFileGroupRecordB
RecordMergeMode recordMergeMode,
TypedProperties props,
HoodieReadStats readStats,
- Option<String> orderingFieldName) {
+ Option<String> orderingFieldName,
+ boolean emitDelete) {
this.readerContext = readerContext;
this.readerSchema =
AvroSchemaCache.intern(readerContext.getSchemaHandler().getRequiredSchema());
this.recordMergeMode = recordMergeMode;
@@ -120,6 +122,7 @@ public abstract class FileGroupRecordBuffer<T> implements
HoodieFileGroupRecordB
boolean isBitCaskDiskMapCompressionEnabled =
props.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue());
this.readStats = readStats;
+ this.emitDelete = emitDelete;
try {
// Store merged records for all versions for this log file, set the
in-memory footprint to maxInMemoryMapSize
this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes,
spillableMapBasePath, new DefaultSizeEstimator<>(),
@@ -163,6 +166,18 @@ public abstract class FileGroupRecordBuffer<T> implements
HoodieFileGroupRecordB
return columnValue != null && readerContext.castToBoolean(columnValue);
}
+ /**
+ * Returns whether the record is a DELETE marked by the '_hoodie_operation'
field.
+ */
+ protected final boolean isDeleteHoodieOperation(T record) {
+ int hoodieOperationPos =
readerContext.getSchemaHandler().getHoodieOperationPos();
+ if (hoodieOperationPos < 0) {
+ return false;
+ }
+ String hoodieOperation = readerContext.getMetaFieldValue(record,
hoodieOperationPos);
+ return hoodieOperation != null &&
HoodieOperation.isDeleteRecord(hoodieOperation);
+ }
+
@Override
public void setBaseFileIterator(ClosableIterator<T> baseFileIterator) {
this.baseFileIterator = baseFileIterator;
@@ -539,15 +554,20 @@ public abstract class FileGroupRecordBuffer<T> implements
HoodieFileGroupRecordB
protected boolean hasNextBaseRecord(T baseRecord, BufferedRecord<T>
logRecordInfo) throws IOException {
if (logRecordInfo != null) {
- BufferedRecord<T> bufferedRecord =
BufferedRecord.forRecordWithContext(baseRecord, readerSchema, readerContext,
orderingFieldName, false);
- Pair<Boolean, T> isDeleteAndRecord = merge(bufferedRecord,
logRecordInfo);
+ BufferedRecord<T> baseRecordInfo =
BufferedRecord.forRecordWithContext(baseRecord, readerSchema, readerContext,
orderingFieldName, false);
+ Pair<Boolean, T> isDeleteAndRecord = merge(baseRecordInfo,
logRecordInfo);
if (!isDeleteAndRecord.getLeft()) {
// Updates
nextRecord = readerContext.seal(isDeleteAndRecord.getRight());
readStats.incrementNumUpdates();
return true;
+ } else if (emitDelete) {
+ // emit Deletes
+ nextRecord = readerContext.getDeleteRow(isDeleteAndRecord.getRight(),
baseRecordInfo.getRecordKey());
+ readStats.incrementNumDeletes();
+ return nextRecord != null;
} else {
- // Deletes
+ // do not emit Deletes
readStats.incrementNumDeletes();
return false;
}
@@ -570,6 +590,12 @@ public abstract class FileGroupRecordBuffer<T> implements
HoodieFileGroupRecordB
nextRecord = nextRecordInfo.getRecord();
readStats.incrementNumInserts();
return true;
+ } else if (emitDelete) {
+ nextRecord = readerContext.getDeleteRow(nextRecordInfo.getRecord(),
nextRecordInfo.getRecordKey());
+ readStats.incrementNumDeletes();
+ if (nextRecord != null) {
+ return true;
+ }
} else {
readStats.incrementNumDeletes();
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index ed9e2582716..13dc39af3b1 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -112,13 +112,13 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
long start, long length, boolean shouldUseRecordPosition) {
this(readerContext, storage, tablePath, latestCommitTime, fileSlice,
dataSchema,
requestedSchema, internalSchemaOpt, hoodieTableMetaClient, props,
start, length,
- shouldUseRecordPosition, false);
+ shouldUseRecordPosition, false, false);
}
private HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
HoodieStorage storage, String tablePath,
String latestCommitTime, FileSlice fileSlice,
Schema dataSchema, Schema requestedSchema,
Option<InternalSchema> internalSchemaOpt,
HoodieTableMetaClient hoodieTableMetaClient, TypedProperties props,
- long start, long length, boolean
shouldUseRecordPosition, boolean allowInflightInstants) {
+ long start, long length, boolean
shouldUseRecordPosition, boolean allowInflightInstants, boolean emitDelete) {
this.readerContext = readerContext;
this.storage = storage;
this.hoodieBaseFileOption = fileSlice.getBaseFile();
@@ -169,7 +169,7 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
this.readStats = new HoodieReadStats();
this.recordBuffer = getRecordBuffer(readerContext, hoodieTableMetaClient,
recordMergeMode, props, hoodieBaseFileOption, this.logFiles.isEmpty(),
- isSkipMerge, shouldUseRecordPosition, readStats);
+ isSkipMerge, shouldUseRecordPosition, readStats, emitDelete);
this.allowInflightInstants = allowInflightInstants;
}
@@ -184,18 +184,19 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
boolean hasNoLogFiles,
boolean isSkipMerge,
boolean
shouldUseRecordPosition,
- HoodieReadStats readStats) {
+ HoodieReadStats readStats,
+ boolean emitDelete) {
if (hasNoLogFiles) {
return null;
} else if (isSkipMerge) {
return new UnmergedFileGroupRecordBuffer<>(
- readerContext, hoodieTableMetaClient, recordMergeMode, props,
readStats);
+ readerContext, hoodieTableMetaClient, recordMergeMode, props,
readStats, emitDelete);
} else if (shouldUseRecordPosition && baseFileOption.isPresent()) {
return new PositionBasedFileGroupRecordBuffer<>(
- readerContext, hoodieTableMetaClient, recordMergeMode,
baseFileOption.get().getCommitTime(), props, readStats, orderingFieldName);
+ readerContext, hoodieTableMetaClient, recordMergeMode,
baseFileOption.get().getCommitTime(), props, readStats, orderingFieldName,
emitDelete);
} else {
return new KeyBasedFileGroupRecordBuffer<>(
- readerContext, hoodieTableMetaClient, recordMergeMode, props,
readStats, orderingFieldName);
+ readerContext, hoodieTableMetaClient, recordMergeMode, props,
readStats, orderingFieldName, emitDelete);
}
}
@@ -224,17 +225,19 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
}
StoragePathInfo baseFileStoragePathInfo = baseFile.getPathInfo();
+ final ClosableIterator<T> recordIterator;
if (baseFileStoragePathInfo != null) {
- return readerContext.getFileRecordIterator(
+ recordIterator = readerContext.getFileRecordIterator(
baseFileStoragePathInfo, start, length,
readerContext.getSchemaHandler().getTableSchema(),
readerContext.getSchemaHandler().getRequiredSchema(), storage);
} else {
- return readerContext.getFileRecordIterator(
+ recordIterator = readerContext.getFileRecordIterator(
baseFile.getStoragePath(), start, length,
readerContext.getSchemaHandler().getTableSchema(),
readerContext.getSchemaHandler().getRequiredSchema(), storage);
}
+ return readerContext.getInstantRange().isPresent() ?
readerContext.applyInstantRangeFilter(recordIterator) : recordIterator;
}
private ClosableIterator<T> makeBootstrapBaseFileIterator(HoodieBaseFile
baseFile) throws IOException {
@@ -341,6 +344,7 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
.withLogFiles(logFiles)
.withReverseReader(false)
.withBufferSize(getIntWithAltKeys(props,
HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE))
+ .withInstantRange(readerContext.getInstantRange())
.withPartition(getRelativePartitionPath(
new StoragePath(path), logFiles.get(0).getPath().getParent()))
.withRecordBuffer(recordBuffer)
@@ -435,6 +439,7 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
private long length = Long.MAX_VALUE;
private boolean shouldUseRecordPosition = false;
private boolean allowInflightInstants = false;
+ private boolean emitDelete;
public Builder<T> withReaderContext(HoodieReaderContext<T> readerContext) {
this.readerContext = readerContext;
@@ -497,6 +502,11 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
return this;
}
+ public Builder<T> withEmitDelete(boolean emitDelete) {
+ this.emitDelete = emitDelete;
+ return this;
+ }
+
public HoodieFileGroupReader<T> build() {
ValidationUtils.checkArgument(readerContext != null, "Reader context is
required");
ValidationUtils.checkArgument(hoodieTableMetaClient != null, "Hoodie
table meta client is required");
@@ -515,7 +525,7 @@ public final class HoodieFileGroupReader<T> implements
Closeable {
return new HoodieFileGroupReader<>(
readerContext, storage, tablePath, latestCommitTime, fileSlice,
dataSchema, requestedSchema, internalSchemaOpt,
hoodieTableMetaClient,
- props, start, length, shouldUseRecordPosition,
allowInflightInstants);
+ props, start, length, shouldUseRecordPosition,
allowInflightInstants, emitDelete);
}
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java
index 2ccc740fb77..5bbce2ad256 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java
@@ -53,8 +53,9 @@ public class KeyBasedFileGroupRecordBuffer<T> extends
FileGroupRecordBuffer<T> {
RecordMergeMode recordMergeMode,
TypedProperties props,
HoodieReadStats readStats,
- Option<String> orderingFieldName) {
- super(readerContext, hoodieTableMetaClient, recordMergeMode, props,
readStats, orderingFieldName);
+ Option<String> orderingFieldName,
+ boolean emitDelete) {
+ super(readerContext, hoodieTableMetaClient, recordMergeMode, props,
readStats, orderingFieldName, emitDelete);
}
@Override
@@ -77,7 +78,7 @@ public class KeyBasedFileGroupRecordBuffer<T> extends
FileGroupRecordBuffer<T> {
try (ClosableIterator<T> recordIterator =
recordsIteratorSchemaPair.getLeft()) {
while (recordIterator.hasNext()) {
T nextRecord = recordIterator.next();
- boolean isDelete = isBuiltInDeleteRecord(nextRecord) ||
isCustomDeleteRecord(nextRecord);
+ boolean isDelete = isBuiltInDeleteRecord(nextRecord) ||
isCustomDeleteRecord(nextRecord) || isDeleteHoodieOperation(nextRecord);
BufferedRecord<T> bufferedRecord =
BufferedRecord.forRecordWithContext(nextRecord, schema, readerContext,
orderingFieldName, isDelete);
processNextDataRecord(bufferedRecord, bufferedRecord.getRecordKey());
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
index 5df2ea91e07..152b7107b9b 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
@@ -71,8 +71,9 @@ public class PositionBasedFileGroupRecordBuffer<T> extends
KeyBasedFileGroupReco
String baseFileInstantTime,
TypedProperties props,
HoodieReadStats readStats,
- Option<String> orderingFieldName) {
- super(readerContext, hoodieTableMetaClient, recordMergeMode, props,
readStats, orderingFieldName);
+ Option<String> orderingFieldName,
+ boolean emitDelete) {
+ super(readerContext, hoodieTableMetaClient, recordMergeMode, props,
readStats, orderingFieldName, emitDelete);
this.baseFileInstantTime = baseFileInstantTime;
}
@@ -130,7 +131,7 @@ public class PositionBasedFileGroupRecordBuffer<T> extends
KeyBasedFileGroupReco
long recordPosition = recordPositions.get(recordIndex++);
T evolvedNextRecord =
schemaTransformerWithEvolvedSchema.getLeft().apply(nextRecord);
- boolean isDelete = isBuiltInDeleteRecord(evolvedNextRecord) ||
isCustomDeleteRecord(evolvedNextRecord);
+ boolean isDelete = isBuiltInDeleteRecord(evolvedNextRecord) ||
isCustomDeleteRecord(evolvedNextRecord) ||
isDeleteHoodieOperation(evolvedNextRecord);
BufferedRecord<T> bufferedRecord =
BufferedRecord.forRecordWithContext(evolvedNextRecord, schema, readerContext,
orderingFieldName, isDelete);
processNextDataRecord(bufferedRecord, recordPosition);
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java
index c853eb088b7..a9e1f66ef19 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java
@@ -50,8 +50,9 @@ public class UnmergedFileGroupRecordBuffer<T> extends
FileGroupRecordBuffer<T> {
HoodieTableMetaClient hoodieTableMetaClient,
RecordMergeMode recordMergeMode,
TypedProperties props,
- HoodieReadStats readStats) {
- super(readerContext, hoodieTableMetaClient, recordMergeMode, props,
readStats, Option.empty());
+ HoodieReadStats readStats,
+ boolean emitDelete) {
+ super(readerContext, hoodieTableMetaClient, recordMergeMode, props,
readStats, Option.empty(), emitDelete);
this.currentInstantLogBlocks = new ArrayDeque<>();
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/FilterIterator.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/CloseableFilterIterator.java
similarity index 52%
copy from
hudi-common/src/main/java/org/apache/hudi/common/util/collection/FilterIterator.java
copy to
hudi-common/src/main/java/org/apache/hudi/common/util/collection/CloseableFilterIterator.java
index 4ebc7c10129..c706d847821 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/FilterIterator.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/CloseableFilterIterator.java
@@ -18,45 +18,20 @@
package org.apache.hudi.common.util.collection;
-import org.apache.hudi.common.util.ValidationUtils;
-
import java.util.Iterator;
import java.util.function.Predicate;
/**
- * An iterator that filters elements from a source iterator based on a
predicate.
- * @param <R> Type of elements in the iterator
+ * A {@link FilterIterator} that must be closed after iteration (to clean up
resources)
*/
-public class FilterIterator<R> implements Iterator<R> {
-
- private final Iterator<R> source;
-
- private final Predicate<R> filter;
-
- private R current;
+public class CloseableFilterIterator<R> extends FilterIterator<R> implements
ClosableIterator<R> {
- public FilterIterator(Iterator<R> source, Predicate<R> filter) {
- this.source = source;
- this.filter = filter;
- }
-
- @Override
- public boolean hasNext() {
- while (current == null && source.hasNext()) {
- R next = source.next();
- if (filter.test(next)) {
- current = next;
- break;
- }
- }
- return current != null;
+ public CloseableFilterIterator(Iterator<R> source, Predicate<R> filter) {
+ super(source, filter);
}
@Override
- public R next() {
- ValidationUtils.checkArgument(hasNext(), "No more elements to iterate");
- R next = current;
- current = null;
- return next;
+ public void close() {
+ ((ClosableIterator<R>) source).close();
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/FilterIterator.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/FilterIterator.java
index 4ebc7c10129..37b5565ce7e 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/FilterIterator.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/FilterIterator.java
@@ -29,7 +29,7 @@ import java.util.function.Predicate;
*/
public class FilterIterator<R> implements Iterator<R> {
- private final Iterator<R> source;
+ protected final Iterator<R> source;
private final Predicate<R> filter;
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
index 06552a9391c..71f0b359999 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
@@ -153,6 +153,7 @@ class TestFileGroupRecordBuffer {
HoodieTableVersion tableVersion,
String mergeStrategyId) {
HoodieReaderContext readerContext = mock(HoodieReaderContext.class);
+ when(readerContext.getInstantRange()).thenReturn(Option.empty());
when(readerContext.getHasBootstrapBaseFile()).thenReturn(false);
when(readerContext.getHasLogFiles()).thenReturn(true);
HoodieRecordMerger recordMerger = mock(HoodieRecordMerger.class);
@@ -290,7 +291,8 @@ class TestFileGroupRecordBuffer {
RecordMergeMode.COMMIT_TIME_ORDERING,
props,
readStats,
- Option.empty());
+ Option.empty(),
+ false);
when(readerContext.getValue(any(), any(), any())).thenReturn(null);
assertFalse(keyBasedBuffer.isCustomDeleteRecord(record));
@@ -302,7 +304,8 @@ class TestFileGroupRecordBuffer {
RecordMergeMode.COMMIT_TIME_ORDERING,
props,
readStats,
- Option.empty());
+ Option.empty(),
+ false);
when(readerContext.getValue(any(), any(), any())).thenReturn("i");
assertFalse(keyBasedBuffer.isCustomDeleteRecord(record));
when(readerContext.getValue(any(), any(), any())).thenReturn("d");
@@ -323,7 +326,8 @@ class TestFileGroupRecordBuffer {
RecordMergeMode.COMMIT_TIME_ORDERING,
props,
readStats,
- Option.empty());
+ Option.empty(),
+ false);
// CASE 1: With custom delete marker.
GenericRecord record = new GenericData.Record(schema);
diff --git
a/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java
b/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java
index 5e7613f225a..565543eb833 100644
---
a/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java
+++
b/hudi-examples/hudi-examples-flink/src/test/java/org/apache/hudi/examples/quickstart/TestQuickstartData.java
@@ -342,6 +342,7 @@ public class TestQuickstartData {
/**
* Returns the scanner to read avro log files.
*/
+ @Deprecated
private static HoodieMergedLogRecordScanner getScanner(
HoodieStorage storage,
String basePath,
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
index 690ef55e8b8..7159f9a1f32 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactOperator.java
@@ -185,7 +185,7 @@ public class CompactOperator extends
TableStreamOperator<CompactionCommitEvent>
Supplier<InternalSchemaManager> internalSchemaManager = () ->
InternalSchemaManager.get(conf, metaClient);
// initialize storage conf lazily.
StorageConfiguration<?> readerConf =
writeClient.getEngineContext().getStorageConf();
- return Option.of(new FlinkRowDataReaderContext(readerConf,
internalSchemaManager, Collections.emptyList(), metaClient.getTableConfig()));
+ return Option.of(new FlinkRowDataReaderContext(readerConf,
internalSchemaManager, Collections.emptyList(), metaClient.getTableConfig(),
Option.empty()));
} else {
// always using avro record merger for legacy compaction since log
scanner do not support rowdata reading yet.
writeClient.getConfig().setRecordMergerClass(HoodieAvroRecordMerger.class.getName());
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
index 808860478b3..cfea2d2c64a 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
@@ -22,7 +22,6 @@ import org.apache.hudi.client.model.BootstrapRowData;
import org.apache.hudi.client.model.CommitTimeFlinkRecordMerger;
import org.apache.hudi.client.model.EventTimeFlinkRecordMerger;
import org.apache.hudi.client.model.HoodieFlinkRecord;
-import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieReaderContext;
@@ -34,7 +33,9 @@ import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.read.BufferedRecord;
+import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
@@ -43,7 +44,8 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.io.storage.HoodieIOFactory;
-import org.apache.hudi.source.ExpressionPredicates.Predicate;
+import org.apache.hudi.keygen.KeyGenUtils;
+import org.apache.hudi.source.ExpressionPredicates;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
@@ -52,6 +54,7 @@ import org.apache.hudi.util.RowDataAvroQueryContexts;
import org.apache.hudi.util.RowDataUtils;
import org.apache.hudi.util.RowProjection;
import org.apache.hudi.util.SchemaEvolvingRowDataProjection;
+import org.apache.hudi.util.StringToRowDataConverter;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
@@ -65,6 +68,7 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
@@ -79,21 +83,28 @@ import static
org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
* log files with Flink parquet reader.
*/
public class FlinkRowDataReaderContext extends HoodieReaderContext<RowData> {
- private final List<Predicate> predicates;
+ private final List<ExpressionPredicates.Predicate> predicates;
private final Supplier<InternalSchemaManager> internalSchemaManager;
private final boolean utcTimezone;
- private final HoodieConfig hoodieConfig;
+ private final HoodieTableConfig tableConfig;
+ // The converter is used to create a RowData containing primary key fields only
+ // for DELETE cases; it will not be initialized if primary key semantics are lost.
+ // E.g., if the pk fields are [a, b] but the user only selects a, then the pk
+ // semantics are lost.
+ private StringToRowDataConverter recordKeyRowConverter;
public FlinkRowDataReaderContext(
StorageConfiguration<?> storageConfiguration,
Supplier<InternalSchemaManager> internalSchemaManager,
- List<Predicate> predicates,
- HoodieTableConfig tableConfig) {
+ List<ExpressionPredicates.Predicate> predicates,
+ HoodieTableConfig tableConfig,
+ Option<InstantRange> instantRangeOpt) {
super(storageConfiguration, tableConfig);
- this.hoodieConfig = tableConfig;
+ this.tableConfig = tableConfig;
this.internalSchemaManager = internalSchemaManager;
this.predicates = predicates;
this.utcTimezone =
getStorageConfiguration().getBoolean(FlinkOptions.READ_UTC_TIMEZONE.key(),
FlinkOptions.READ_UTC_TIMEZONE.defaultValue());
+ this.instantRangeOpt = instantRangeOpt;
}
@Override
@@ -111,9 +122,32 @@ public class FlinkRowDataReaderContext extends
HoodieReaderContext<RowData> {
HoodieRowDataParquetReader rowDataParquetReader =
(HoodieRowDataParquetReader) HoodieIOFactory.getIOFactory(storage)
.getReaderFactory(HoodieRecord.HoodieRecordType.FLINK)
- .getFileReader(hoodieConfig, filePath, HoodieFileFormat.PARQUET,
Option.empty());
+ .getFileReader(tableConfig, filePath, HoodieFileFormat.PARQUET,
Option.empty());
DataType rowType =
RowDataAvroQueryContexts.fromAvroSchema(dataSchema).getRowType();
- return rowDataParquetReader.getRowDataIterator(schemaManager, rowType,
requiredSchema);
+ return rowDataParquetReader.getRowDataIterator(schemaManager, rowType,
requiredSchema, predicates);
+ }
+
+ @Override
+ public void setSchemaHandler(FileGroupReaderSchemaHandler<RowData>
schemaHandler) {
+ super.setSchemaHandler(schemaHandler);
+
+ Option<String[]> recordKeysOpt = tableConfig.getRecordKeyFields();
+ if (recordKeysOpt.isEmpty()) {
+ return;
+ }
+ // primary key semantics are lost if not all primary key fields are included in the requested schema.
+ boolean pkSemanticLost = Arrays.stream(recordKeysOpt.get()).anyMatch(k ->
schemaHandler.getRequestedSchema().getField(k) == null);
+ if (pkSemanticLost) {
+ return;
+ }
+ // get primary key field position in required schema.
+ Schema requiredSchema = schemaHandler.getRequiredSchema();
+ int[] pkFieldsPos = Arrays.stream(recordKeysOpt.get())
+ .map(k ->
Option.ofNullable(requiredSchema.getField(k)).map(Schema.Field::pos).orElse(-1))
+ .mapToInt(Integer::intValue)
+ .toArray();
+ recordKeyRowConverter = new StringToRowDataConverter(
+ pkFieldsPos, (RowType)
RowDataAvroQueryContexts.fromAvroSchema(requiredSchema).getRowType().getLogicalType());
}
@Override
@@ -144,6 +178,11 @@ public class FlinkRowDataReaderContext extends
HoodieReaderContext<RowData> {
}
}
+ @Override
+ public String getMetaFieldValue(RowData record, int pos) {
+ return record.getString(pos).toString();
+ }
+
@Override
public HoodieRecord<RowData> constructHoodieRecord(BufferedRecord<RowData>
bufferedRecord) {
HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(),
partitionPath);
@@ -256,6 +295,21 @@ public class FlinkRowDataReaderContext extends
HoodieReaderContext<RowData> {
return (GenericRecord)
RowDataAvroQueryContexts.fromAvroSchema(schema).getRowDataToAvroConverter().convert(schema,
record);
}
+ @Override
+ public RowData getDeleteRow(RowData record, String recordKey) {
+ if (record != null) {
+ return record;
+ }
+ // no need to emit a record key row if primary key semantics are lost
+ if (recordKeyRowConverter == null) {
+ return null;
+ }
+ final String[] pkVals = KeyGenUtils.extractRecordKeys(recordKey);
+ RowData recordKeyRow = recordKeyRowConverter.convert(pkVals);
+ recordKeyRow.setRowKind(RowKind.DELETE);
+ return recordKeyRow;
+ }
+
@Override
public RowData convertAvroRecord(IndexedRecord avroRecord) {
Schema recordSchema = avroRecord.getSchema();
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index 811f68d61ed..b4e5d14a9e0 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -135,6 +135,7 @@ public class FormatUtils {
}
}
+ @Deprecated
public static HoodieMergedLogRecordScanner logScanner(
MergeOnReadInputSplit split,
Schema logSchema,
@@ -163,6 +164,7 @@ public class FormatUtils {
.build();
}
+ @Deprecated
public static HoodieMergedLogRecordScanner logScanner(
List<String> logPaths,
Schema logSchema,
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java
index 2902aea8089..e68ebcc442b 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.io.storage.row.parquet.ParquetSchemaConverter;
+import org.apache.hudi.source.ExpressionPredicates.Predicate;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.util.AvroSchemaConverter;
@@ -43,6 +44,7 @@ import org.apache.parquet.schema.MessageType;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
@@ -81,7 +83,7 @@ public class HoodieRowDataParquetReader implements
HoodieFileReader<RowData> {
@Override
public ClosableIterator<HoodieRecord<RowData>> getRecordIterator(Schema
readerSchema, Schema requestedSchema) throws IOException {
- ClosableIterator<RowData> rowDataItr =
getRowDataIterator(InternalSchemaManager.DISABLED, getRowType(),
requestedSchema);
+ ClosableIterator<RowData> rowDataItr =
getRowDataIterator(InternalSchemaManager.DISABLED, getRowType(),
requestedSchema, Collections.emptyList());
readerIterators.add(rowDataItr);
return new CloseableMappingIterator<>(rowDataItr, HoodieFlinkRecord::new);
}
@@ -89,12 +91,16 @@ public class HoodieRowDataParquetReader implements
HoodieFileReader<RowData> {
@Override
public ClosableIterator<String> getRecordKeyIterator() throws IOException {
Schema schema = HoodieAvroUtils.getRecordKeySchema();
- ClosableIterator<RowData> rowDataItr =
getRowDataIterator(InternalSchemaManager.DISABLED, getRowType(), schema);
+ ClosableIterator<RowData> rowDataItr =
getRowDataIterator(InternalSchemaManager.DISABLED, getRowType(), schema,
Collections.emptyList());
return new CloseableMappingIterator<>(rowDataItr, rowData ->
Objects.toString(rowData.getString(0)));
}
- public ClosableIterator<RowData> getRowDataIterator(InternalSchemaManager
internalSchemaManager, DataType dataType, Schema requestedSchema) throws
IOException {
- return RecordIterators.getParquetRecordIterator(storage.getConf(),
internalSchemaManager, dataType, requestedSchema, path);
+ public ClosableIterator<RowData> getRowDataIterator(
+ InternalSchemaManager internalSchemaManager,
+ DataType dataType,
+ Schema requestedSchema,
+ List<Predicate> predicates) throws IOException {
+ return RecordIterators.getParquetRecordIterator(storage.getConf(),
internalSchemaManager, dataType, requestedSchema, path, predicates);
}
@Override
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
index 8fd0f83197b..120e50e5388 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
@@ -43,7 +43,6 @@ import org.apache.parquet.hadoop.util.ConfigurationUtil;
import org.apache.parquet.hadoop.util.SerializationUtil;
import java.io.IOException;
-import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -64,7 +63,8 @@ public abstract class RecordIterators {
InternalSchemaManager internalSchemaManager,
DataType dataType,
Schema requestedSchema,
- StoragePath path) throws IOException {
+ StoragePath path,
+ List<Predicate> predicates) throws IOException {
List<String> fieldNames = ((RowType)
dataType.getLogicalType()).getFieldNames();
List<DataType> fieldTypes = dataType.getChildren();
int[] selectedFields =
requestedSchema.getFields().stream().map(Schema.Field::name)
@@ -87,7 +87,7 @@ public abstract class RecordIterators {
new org.apache.flink.core.fs.Path(path.toUri()),
0L,
Long.MAX_VALUE,
- Collections.emptyList());
+ predicates);
}
public static ClosableIterator<RowData> getParquetRecordIterator(
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
index e8b766b4c46..84c905364ca 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
@@ -135,7 +135,7 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
return getBaseFileIteratorWithMetadata(split.getBasePath().get());
} else if (!split.getBasePath().isPresent()) {
// log files only
- return new LogFileOnlyIterator(getFullLogFileIterator(split));
+ return getFullLogFileIterator(split);
} else {
Schema tableSchema = new
Schema.Parser().parse(this.tableState.getAvroSchema());
return new MergeIterator(
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index dcd7a5f914e..ae35cee9616 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -34,6 +34,7 @@ import
org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.read.HoodieFileGroupReader;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -42,10 +43,8 @@ import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.internal.schema.InternalSchema;
-import org.apache.hudi.keygen.KeyGenUtils;
import org.apache.hudi.source.ExpressionPredicates.Predicate;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.table.format.FlinkRowDataReaderContext;
@@ -58,11 +57,9 @@ import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.RowDataProjection;
import org.apache.hudi.util.RowDataToAvroConverters;
import org.apache.hudi.util.StreamerUtil;
-import org.apache.hudi.util.StringToRowDataConverter;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.generic.IndexedRecord;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
@@ -70,15 +67,12 @@ import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import java.io.IOException;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
@@ -91,7 +85,6 @@ import java.util.stream.IntStream;
import static
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS;
import static
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS;
-import static org.apache.hudi.table.format.FormatUtils.buildAvroRecordBySchema;
/**
* The base InputFormat class to read from Hoodie data + log files.
@@ -208,51 +201,24 @@ public class MergeOnReadInputFormat
}
protected ClosableIterator<RowData> initIterator(MergeOnReadInputSplit
split) throws IOException {
- if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size()
> 0)) {
- if (split.getInstantRange().isPresent()) {
- // base file only with commit time filtering
- return new BaseFileOnlyFilteringIterator(
- split.getInstantRange().get(),
- this.tableState.getRequiredRowType(),
- this.requiredPos,
- getBaseFileIterator(split.getBasePath().get(),
getRequiredPosWithCommitTime(this.requiredPos)));
- } else {
- // base file only
- return getBaseFileIterator(split.getBasePath().get());
- }
- } else if (!split.getBasePath().isPresent()) {
- // log files only
+ String mergeType = split.getMergeType();
+ if (!split.getBasePath().isPresent()) {
if (OptionsResolver.emitDeletes(conf)) {
- return new LogFileOnlyIterator(getUnMergedLogFileIterator(split));
+ mergeType = FlinkOptions.REALTIME_SKIP_MERGE;
} else {
- return new LogFileOnlyIterator(getLogFileIterator(split));
+ // always merge records in log files if there is no base file (aligned
with legacy behaviour)
+ mergeType = FlinkOptions.REALTIME_PAYLOAD_COMBINE;
}
- } else if (split.getMergeType().equals(FlinkOptions.REALTIME_SKIP_MERGE)) {
- return new SkipMergeIterator(
- getBaseFileIterator(split.getBasePath().get()),
- getLogFileIterator(split));
- } else if
(split.getMergeType().equals(FlinkOptions.REALTIME_PAYLOAD_COMBINE)) {
- return new MergeIterator(
- conf,
- hadoopConf,
- split,
- this.tableState.getRowType(),
- this.tableState.getRequiredRowType(),
- new Schema.Parser().parse(this.tableState.getAvroSchema()),
- new Schema.Parser().parse(this.tableState.getRequiredAvroSchema()),
- internalSchemaManager.getQuerySchema(),
- this.requiredPos,
- this.emitDelete,
- this.tableState.getOperationPos(),
- getBaseFileIteratorWithMetadata(split.getBasePath().get()));
- } else {
- throw new HoodieException("Unable to select an Iterator to read the
Hoodie MOR File Split for "
- + "file path: " + split.getBasePath()
- + "log paths: " + split.getLogPaths()
- + "hoodie table path: " + split.getTablePath()
- + "flink partition Index: " + split.getSplitNumber()
- + "merge type: " + split.getMergeType());
}
+ ValidationUtils.checkArgument(
+ mergeType.equals(FlinkOptions.REALTIME_SKIP_MERGE) ||
mergeType.equals(FlinkOptions.REALTIME_PAYLOAD_COMBINE),
+ "Unable to select an Iterator to read the Hoodie MOR File Split for "
+ + "file path: " + split.getBasePath()
+ + "log paths: " + split.getLogPaths()
+ + "hoodie table path: " + split.getTablePath()
+ + "flink partition Index: " + split.getSplitNumber()
+ + "merge type: " + split.getMergeType());
+ return getSplitIterator(split, mergeType);
}
@Override
@@ -363,93 +329,15 @@ public class MergeOnReadInputFormat
predicates);
}
- private ClosableIterator<RowData> getLogFileIterator(MergeOnReadInputSplit
split) {
- final Schema tableSchema = new
Schema.Parser().parse(tableState.getAvroSchema());
- final Schema requiredSchema = new
Schema.Parser().parse(tableState.getRequiredAvroSchema());
- final GenericRecordBuilder recordBuilder = new
GenericRecordBuilder(requiredSchema);
- final AvroToRowDataConverters.AvroToRowDataConverter
avroToRowDataConverter =
-
AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType(),
conf.getBoolean(FlinkOptions.READ_UTC_TIMEZONE));
- final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split,
tableSchema, internalSchemaManager.getQuerySchema(), conf, hadoopConf);
- final Iterator<String> logRecordsKeyIterator =
scanner.getRecords().keySet().iterator();
- final int[] pkOffset = tableState.getPkOffsetsInRequired();
- // flag saying whether the pk semantics has been dropped by user specified
- // projections. For e.g, if the pk fields are [a, b] but user only select
a,
- // then the pk semantics is lost.
- final boolean pkSemanticLost = Arrays.stream(pkOffset).anyMatch(offset ->
offset == -1);
- final LogicalType[] pkTypes = pkSemanticLost ? null :
tableState.getPkTypes(pkOffset);
- final StringToRowDataConverter converter = pkSemanticLost ? null : new
StringToRowDataConverter(pkTypes);
-
- return new ClosableIterator<RowData>() {
- private RowData currentRecord;
-
- @Override
- public boolean hasNext() {
- while (logRecordsKeyIterator.hasNext()) {
- String curAvroKey = logRecordsKeyIterator.next();
- Option<IndexedRecord> curAvroRecord = null;
- final HoodieAvroRecord<?> hoodieRecord = (HoodieAvroRecord)
scanner.getRecords().get(curAvroKey);
- try {
- curAvroRecord = hoodieRecord.getData().getInsertValue(tableSchema);
- } catch (IOException e) {
- throw new HoodieException("Get avro insert value error for key: "
+ curAvroKey, e);
- }
- if (!curAvroRecord.isPresent()) {
- // delete record found
- if (emitDelete && !pkSemanticLost) {
- GenericRowData delete = new
GenericRowData(tableState.getRequiredRowType().getFieldCount());
-
- final String recordKey = hoodieRecord.getRecordKey();
- final String[] pkFields =
KeyGenUtils.extractRecordKeys(recordKey);
- final Object[] converted = converter.convert(pkFields);
- for (int i = 0; i < pkOffset.length; i++) {
- delete.setField(pkOffset[i], converted[i]);
- }
- delete.setRowKind(RowKind.DELETE);
-
- this.currentRecord = delete;
- return true;
- }
- // skipping if the condition is unsatisfied
- // continue;
- } else {
- final IndexedRecord avroRecord = curAvroRecord.get();
- final RowKind rowKind = FormatUtils.getRowKindSafely(avroRecord,
tableState.getOperationPos());
- if (rowKind == RowKind.DELETE && !emitDelete) {
- // skip the delete record
- continue;
- }
- GenericRecord requiredAvroRecord = buildAvroRecordBySchema(
- avroRecord,
- requiredSchema,
- requiredPos,
- recordBuilder);
- currentRecord = (RowData)
avroToRowDataConverter.convert(requiredAvroRecord);
- currentRecord.setRowKind(rowKind);
- return true;
- }
- }
- return false;
- }
-
- @Override
- public RowData next() {
- return currentRecord;
- }
-
- @Override
- public void close() {
- scanner.close();
- }
- };
- }
-
/**
* Get record iterator using {@link HoodieFileGroupReader}.
*
* @param split input split
- * @return {@link RowData} iterator.
+ * @param mergeType merge type for FileGroup reader
+ *
+ * @return {@link RowData} iterator for the given split.
*/
- private ClosableIterator<RowData>
getUnMergedLogFileIterator(MergeOnReadInputSplit split) {
+ private ClosableIterator<RowData> getSplitIterator(MergeOnReadInputSplit
split, String mergeType) throws IOException {
final Schema tableSchema = new
Schema.Parser().parse(tableState.getAvroSchema());
final Schema requiredSchema = new
Schema.Parser().parse(tableState.getRequiredAvroSchema());
@@ -466,11 +354,12 @@ public class MergeOnReadInputFormat
HadoopFSUtils.getStorageConf(hadoopConf),
() -> internalSchemaManager,
predicates,
- metaClient.getTableConfig());
+ metaClient.getTableConfig(),
+ split.getInstantRange());
TypedProperties typedProps =
FlinkClientUtil.getMergedTableAndWriteProps(metaClient.getTableConfig(),
writeConfig);
- typedProps.put(HoodieReaderConfig.MERGE_TYPE.key(),
HoodieReaderConfig.REALTIME_SKIP_MERGE);
+ typedProps.put(HoodieReaderConfig.MERGE_TYPE.key(), mergeType);
- try (HoodieFileGroupReader<RowData> fileGroupReader =
HoodieFileGroupReader.<RowData>newBuilder()
+ HoodieFileGroupReader<RowData> fileGroupReader =
HoodieFileGroupReader.<RowData>newBuilder()
.withReaderContext(readerContext)
.withHoodieTableMetaClient(metaClient)
.withLatestCommitTime(split.getLatestCommit())
@@ -480,11 +369,9 @@ public class MergeOnReadInputFormat
.withInternalSchema(Option.ofNullable(internalSchemaManager.getQuerySchema()))
.withProps(typedProps)
.withShouldUseRecordPosition(false)
- .build()) {
- return fileGroupReader.getClosableIterator();
- } catch (IOException e) {
- throw new HoodieUpsertException("Failed to compact file slice: " +
fileSlice, e);
- }
+ .withEmitDelete(emitDelete)
+ .build();
+ return fileGroupReader.getClosableIterator();
}
protected static Option<IndexedRecord> getInsertVal(HoodieAvroRecord<?>
hoodieRecord, Schema tableSchema) {
@@ -551,130 +438,6 @@ public class MergeOnReadInputFormat
// -------------------------------------------------------------------------
// Inner Class
// -------------------------------------------------------------------------
- /**
- * Base record iterator with instant time filtering.
- */
- static class BaseFileOnlyFilteringIterator implements
ClosableIterator<RowData> {
- // base file record iterator
- private final ClosableIterator<RowData> nested;
- private final InstantRange instantRange;
- private final RowDataProjection projection;
-
- private RowData currentRecord;
-
- private int commitTimePos;
-
- BaseFileOnlyFilteringIterator(
- InstantRange instantRange,
- RowType requiredRowType,
- int[] requiredPos,
- ClosableIterator<RowData> nested) {
- this.nested = nested;
- this.instantRange = instantRange;
- this.commitTimePos = getCommitTimePos(requiredPos);
- int[] positions;
- if (commitTimePos < 0) {
- commitTimePos = 0;
- positions = IntStream.range(1, 1 + requiredPos.length).toArray();
- } else {
- positions = IntStream.range(0, requiredPos.length).toArray();
- }
- this.projection = RowDataProjection.instance(requiredRowType, positions);
- }
-
- @Override
- public boolean hasNext() {
- while (this.nested.hasNext()) {
- currentRecord = this.nested.next();
- boolean isInRange =
instantRange.isInRange(currentRecord.getString(commitTimePos).toString());
- if (isInRange) {
- return true;
- }
- }
- return false;
- }
-
- @Override
- public RowData next() {
- // can promote: no need to project with null instant range
- return projection.project(currentRecord);
- }
-
- @Override
- public void close() {
- if (this.nested != null) {
- this.nested.close();
- }
- }
- }
-
- protected static class LogFileOnlyIterator implements
ClosableIterator<RowData> {
- // iterator for log files
- private final ClosableIterator<RowData> iterator;
-
- public LogFileOnlyIterator(ClosableIterator<RowData> iterator) {
- this.iterator = iterator;
- }
-
- @Override
- public boolean hasNext() {
- return this.iterator.hasNext();
- }
-
- @Override
- public RowData next() {
- return this.iterator.next();
- }
-
- @Override
- public void close() {
- if (this.iterator != null) {
- this.iterator.close();
- }
- }
- }
-
- static class SkipMergeIterator implements ClosableIterator<RowData> {
- // base file record iterator
- private final ClosableIterator<RowData> nested;
- // iterator for log files
- private final ClosableIterator<RowData> iterator;
-
- private RowData currentRecord;
-
- SkipMergeIterator(ClosableIterator<RowData> nested,
ClosableIterator<RowData> iterator) {
- this.nested = nested;
- this.iterator = iterator;
- }
-
- @Override
- public boolean hasNext() {
- if (this.nested.hasNext()) {
- currentRecord = this.nested.next();
- return true;
- }
- if (this.iterator.hasNext()) {
- currentRecord = this.iterator.next();
- return true;
- }
- return false;
- }
-
- @Override
- public RowData next() {
- return currentRecord;
- }
-
- @Override
- public void close() {
- if (this.nested != null) {
- this.nested.close();
- }
- if (this.iterator != null) {
- this.iterator.close();
- }
- }
- }
protected static class MergeIterator implements ClosableIterator<RowData> {
// base file record iterator
@@ -703,26 +466,6 @@ public class MergeOnReadInputFormat
private RowData currentRecord;
- public MergeIterator(
- Configuration flinkConf,
- org.apache.hadoop.conf.Configuration hadoopConf,
- MergeOnReadInputSplit split,
- RowType tableRowType,
- RowType requiredRowType,
- Schema tableSchema,
- Schema requiredSchema,
- InternalSchema querySchema,
- int[] requiredPos,
- boolean emitDelete,
- int operationPos,
- ClosableIterator<RowData> nested) { // the iterator should be with
full schema
- this(flinkConf, hadoopConf, split, tableRowType, requiredRowType,
tableSchema,
- querySchema,
- Option.of(RowDataProjection.instance(requiredRowType, requiredPos)),
- Option.of(record -> buildAvroRecordBySchema(record, requiredSchema,
requiredPos, new GenericRecordBuilder(requiredSchema))),
- emitDelete, operationPos, nested);
- }
-
public MergeIterator(
Configuration flinkConf,
org.apache.hadoop.conf.Configuration hadoopConf,
@@ -905,25 +648,6 @@ public class MergeOnReadInputFormat
// Utilities
// -------------------------------------------------------------------------
- private static int[] getRequiredPosWithCommitTime(int[] requiredPos) {
- if (getCommitTimePos(requiredPos) >= 0) {
- return requiredPos;
- }
- int[] requiredPos2 = new int[requiredPos.length + 1];
- requiredPos2[0] = HOODIE_COMMIT_TIME_COL_POS;
- System.arraycopy(requiredPos, 0, requiredPos2, 1, requiredPos.length);
- return requiredPos2;
- }
-
- private static int getCommitTimePos(int[] requiredPos) {
- for (int i = 0; i < requiredPos.length; i++) {
- if (requiredPos[i] == HOODIE_COMMIT_TIME_COL_POS) {
- return i;
- }
- }
- return -1;
- }
-
@VisibleForTesting
public void isEmitDelete(boolean emitDelete) {
this.emitDelete = emitDelete;
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
index 36dfecbb79a..7f55b38b612 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
@@ -20,11 +20,9 @@ package org.apache.hudi.table.format.mor;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import java.io.Serializable;
-import java.util.Arrays;
import java.util.List;
/**
@@ -89,29 +87,4 @@ public class MergeOnReadTableState implements Serializable {
.mapToInt(i -> i)
.toArray();
}
-
- /**
- * Get the primary key positions in required row type.
- */
- public int[] getPkOffsetsInRequired() {
- final List<String> fieldNames = requiredRowType.getFieldNames();
- return Arrays.stream(pkFields)
- .map(fieldNames::indexOf)
- .mapToInt(i -> i)
- .toArray();
- }
-
- /**
- * Returns the primary key fields logical type with given offsets.
- *
- * @param pkOffsets the pk offsets in required row type
- * @return pk field logical types
- * @see #getPkOffsetsInRequired()
- */
- public LogicalType[] getPkTypes(int[] pkOffsets) {
- final LogicalType[] requiredTypes = requiredRowType.getFields().stream()
- .map(RowType.RowField::getType).toArray(LogicalType[]::new);
- return Arrays.stream(pkOffsets).mapToObj(offset -> requiredTypes[offset])
- .toArray(LogicalType[]::new);
- }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StringToRowDataConverter.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StringToRowDataConverter.java
index 6c4aae3cd13..b9f1baf6482 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StringToRowDataConverter.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StringToRowDataConverter.java
@@ -22,10 +22,13 @@ import org.apache.hudi.common.util.ValidationUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import java.math.BigDecimal;
@@ -44,22 +47,26 @@ import static
org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
@Internal
public class StringToRowDataConverter {
private final Converter[] converters;
+ private final int[] fieldsPos;
+ private final int rowArity;
- public StringToRowDataConverter(LogicalType[] fieldTypes) {
- this.converters = Arrays.stream(fieldTypes)
- .map(StringToRowDataConverter::getConverter)
+ public StringToRowDataConverter(int[] fieldsPos, RowType rowType) {
+ this.fieldsPos = fieldsPos;
+ this.rowArity = rowType.getFieldCount();
+ this.converters = Arrays.stream(fieldsPos)
+ .mapToObj(f -> getConverter(rowType.getTypeAt(f)))
.toArray(Converter[]::new);
}
- public Object[] convert(String[] fields) {
+ public RowData convert(String[] fields) {
ValidationUtils.checkArgument(converters.length == fields.length,
"Field types and values should equal with number");
- Object[] converted = new Object[fields.length];
+ GenericRowData rowData = new GenericRowData(rowArity);
for (int i = 0; i < fields.length; i++) {
- converted[i] = converters[i].convert(fields[i]);
+ rowData.setField(fieldsPos[i], converters[i].convert(fields[i]));
}
- return converted;
+ return rowData;
}
private interface Converter {
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
index 0d3fde90155..e915581b1a4 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
@@ -110,7 +110,8 @@ public class TestHoodieFileGroupReaderOnFlink extends
TestHoodieFileGroupReaderB
storageConf,
() -> InternalSchemaManager.DISABLED,
Collections.emptyList(),
- metaClient.getTableConfig());
+ metaClient.getTableConfig(),
+ Option.empty());
}
@Override
@@ -164,7 +165,7 @@ public class TestHoodieFileGroupReaderOnFlink extends
TestHoodieFileGroupReaderB
HoodieTableConfig tableConfig = Mockito.mock(HoodieTableConfig.class);
when(tableConfig.populateMetaFields()).thenReturn(true);
FlinkRowDataReaderContext readerContext =
- new FlinkRowDataReaderContext(getStorageConf(), () ->
InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig);
+ new FlinkRowDataReaderContext(getStorageConf(), () ->
InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig,
Option.empty());
Schema schema = SchemaBuilder.builder()
.record("test")
.fields()
@@ -182,7 +183,7 @@ public class TestHoodieFileGroupReaderOnFlink extends
TestHoodieFileGroupReaderB
HoodieTableConfig tableConfig = Mockito.mock(HoodieTableConfig.class);
when(tableConfig.populateMetaFields()).thenReturn(true);
FlinkRowDataReaderContext readerContext =
- new FlinkRowDataReaderContext(getStorageConf(), () ->
InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig);
+ new FlinkRowDataReaderContext(getStorageConf(), () ->
InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig,
Option.empty());
Schema schema = SchemaBuilder.builder()
.record("test")
.fields()
@@ -200,7 +201,7 @@ public class TestHoodieFileGroupReaderOnFlink extends
TestHoodieFileGroupReaderB
when(tableConfig.populateMetaFields()).thenReturn(false);
when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]
{"field1"}));
FlinkRowDataReaderContext readerContext =
- new FlinkRowDataReaderContext(getStorageConf(), () ->
InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig);
+ new FlinkRowDataReaderContext(getStorageConf(), () ->
InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig,
Option.empty());
Schema schema = SchemaBuilder.builder()
.record("test")
.fields()
@@ -218,7 +219,7 @@ public class TestHoodieFileGroupReaderOnFlink extends
TestHoodieFileGroupReaderB
when(tableConfig.populateMetaFields()).thenReturn(false);
when(tableConfig.getRecordKeyFields()).thenReturn(Option.of(new String[]
{"field1", "field2"}));
FlinkRowDataReaderContext readerContext =
- new FlinkRowDataReaderContext(getStorageConf(), () ->
InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig);
+ new FlinkRowDataReaderContext(getStorageConf(), () ->
InternalSchemaManager.DISABLED, Collections.emptyList(), tableConfig,
Option.empty());
Schema schema = SchemaBuilder.builder()
.record("test")
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index bbf7e0415bf..e5a497ee8ab 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -304,6 +304,31 @@ public class TestInputFormat {
assertThat(actual2, is(expected));
}
+ @Test
+ void testReadWithDeletes() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.RECORD_KEY_FIELD.key(), "uuid");
+ beforeEach(HoodieTableType.MERGE_ON_READ, options);
+
+ // write another commit to read again
+ TestData.writeData(TestData.DATA_SET_UPDATE_DELETE, conf);
+
+ InputFormat<RowData, ?> inputFormat = this.tableSource.getInputFormat();
+ assertThat(inputFormat, instanceOf(MergeOnReadInputFormat.class));
+ ((MergeOnReadInputFormat) inputFormat).isEmitDelete(true);
+
+ List<RowData> result = readData(inputFormat);
+
+ final String actual = TestData.rowDataToString(result);
+ final String expected = "["
+ + "+I[id1, Danny, 24, 1970-01-01T00:00:00.001, par1], "
+ + "+I[id2, Stephen, 34, 1970-01-01T00:00:00.002, par1], "
+ + "-D[id3, null, null, null, null], "
+ + "-D[id5, null, null, null, null], "
+ + "-D[id9, null, null, null, null]]";
+ assertThat(actual, is(expected));
+ }
+
@Test
void testReadWithDeletesMOR() throws Exception {
Map<String, String> options = new HashMap<>();
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 1c671e371d9..ad8d2d4080c 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -982,6 +982,7 @@ public class TestData {
/**
* Returns the scanner to read avro log files.
*/
+ @Deprecated
private static HoodieMergedLogRecordScanner getScanner(
HoodieStorage storage,
String basePath,
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStringToRowDataConverter.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStringToRowDataConverter.java
index 8f7ecad1384..b5f140b72e2 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStringToRowDataConverter.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStringToRowDataConverter.java
@@ -27,6 +27,7 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
@@ -41,7 +42,7 @@ import java.time.temporal.ChronoField;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
/**
* Test cases for {@link StringToRowDataConverter}.
@@ -59,8 +60,9 @@ public class TestStringToRowDataConverter {
DataTypes.TIMESTAMP(6).getLogicalType(),
DataTypes.DECIMAL(7, 2).getLogicalType()
};
- StringToRowDataConverter converter = new
StringToRowDataConverter(fieldTypes);
- Object[] converted = converter.convert(fields);
+ RowType rowType = RowType.of(fieldTypes);
+ StringToRowDataConverter converter = new StringToRowDataConverter(new
int[]{0, 1, 2, 3, 4, 5, 6}, rowType);
+ RowData actual = converter.convert(fields);
Object[] expected = new Object[] {
1.1f, 3.4D, (int) LocalDate.parse("2021-03-30").toEpochDay(),
LocalTime.parse("15:44:29").get(ChronoField.MILLI_OF_DAY),
@@ -68,7 +70,8 @@ public class TestStringToRowDataConverter {
TimestampData.fromInstant(Instant.parse("2021-03-30T15:44:29.666111Z")),
DecimalData.fromBigDecimal(new BigDecimal("12345.67"), 7, 2)
};
- assertArrayEquals(expected, converted);
+ GenericRowData expectedRow = GenericRowData.of(expected);
+ assertEquals(expectedRow, actual);
}
@Test
@@ -97,15 +100,10 @@ public class TestStringToRowDataConverter {
GenericRecord avroRecord =
(GenericRecord)
converter.convert(AvroSchemaConverter.convertToSchema(rowType), rowData);
StringToRowDataConverter stringToRowDataConverter =
- new StringToRowDataConverter(rowType.getChildren().toArray(new
LogicalType[0]));
+ new StringToRowDataConverter(new int[]{0, 1, 2, 3, 4, 5, 6}, rowType);
final String recordKey = KeyGenUtils.getRecordKey(avroRecord,
rowType.getFieldNames(), false);
final String[] recordKeys = KeyGenUtils.extractRecordKeys(recordKey);
- Object[] convertedKeys = stringToRowDataConverter.convert(recordKeys);
-
- GenericRowData converted = new GenericRowData(7);
- for (int i = 0; i < 7; i++) {
- converted.setField(i, convertedKeys[i]);
- }
+ RowData converted = stringToRowDataConverter.convert(recordKeys);
assertThat(converted, is(rowData));
}
}
diff --git
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
index fa051360b3f..8941d6bed34 100644
---
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
+++
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
@@ -177,6 +177,11 @@ public class HiveHoodieReaderContext extends
HoodieReaderContext<ArrayWritable>
return objectInspectorCache.serialize(record, schema);
}
+ @Override
+ public ArrayWritable getDeleteRow(ArrayWritable record, String recordKey) {
+ throw new UnsupportedOperationException("Not supported for " +
this.getClass().getSimpleName());
+ }
+
@Override
public Option<HoodieRecordMerger> getRecordMerger(RecordMergeMode mergeMode,
String mergeStrategyId, String mergeImplClasses) {
// TODO(HUDI-7843):
@@ -202,6 +207,11 @@ public class HiveHoodieReaderContext extends
HoodieReaderContext<ArrayWritable>
return StringUtils.isNullOrEmpty(fieldName) ? null :
objectInspectorCache.getValue(record, schema, fieldName);
}
+ @Override
+ public String getMetaFieldValue(ArrayWritable record, int pos) {
+ return record.get()[pos].toString();
+ }
+
@Override
public boolean castToBoolean(Object value) {
if (value instanceof BooleanWritable) {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
index 47e7f42d12d..4211b24f16c 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
@@ -138,7 +138,8 @@ public class TestPositionBasedFileGroupRecordBuffer extends
TestHoodieFileGroupR
baseFileInstantTime,
props,
readStats,
- Option.of("timestamp"));
+ Option.of("timestamp"),
+ false);
}
public Map<HoodieLogBlock.HeaderMetadataType, String> getHeader(boolean
shouldWriteRecordPositions,