This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 33dd7afcc7a [HUDI-9022] Handle records with custom delete markers in FG reader (#12843)
33dd7afcc7a is described below
commit 33dd7afcc7a23c68b7ce4bdfaeab6ec596d0c742
Author: Lin Liu <[email protected]>
AuthorDate: Tue Mar 25 10:27:50 2025 -0700
[HUDI-9022] Handle records with custom delete markers in FG reader (#12843)
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../SparkFileFormatInternalRowReaderContext.scala | 2 +-
.../SparkClientFunctionalTestHarness.java | 11 +-
.../hudi/common/engine/HoodieReaderContext.java | 21 +-
.../table/log/BaseHoodieLogRecordReader.java | 6 +-
.../table/log/HoodieMergedLogRecordReader.java | 8 +-
...dler.java => FileGroupReaderSchemaHandler.java} | 123 ++++++--
...ecordBuffer.java => FileGroupRecordBuffer.java} | 58 +++-
.../common/table/read/HoodieFileGroupReader.java | 34 +--
...fer.java => KeyBasedFileGroupRecordBuffer.java} | 42 ++-
...ava => PositionBasedFileGroupRecordBuffer.java} | 44 ++-
...andler.java => PositionBasedSchemaHandler.java} | 18 +-
...fer.java => UnmergedFileGroupRecordBuffer.java} | 4 +-
.../table/read/TestFileGroupRecordBuffer.java | 328 +++++++++++++++++++++
.../table/read/TestHoodieFileGroupReaderBase.java | 2 +-
.../read/TestHoodieFileGroupRecordBuffer.java | 141 ---------
.../hudi/hadoop/HiveHoodieReaderContext.java | 10 +
...stSparkFileFormatInternalRowReaderContext.scala | 2 +-
...=> TestPositionBasedFileGroupRecordBuffer.java} | 12 +-
.../read/TestHoodieFileGroupReaderOnSpark.scala | 145 ++++++++-
19 files changed, 747 insertions(+), 264 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index fb1913bf80e..7eb154ddd0a 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -25,7 +25,7 @@ import org.apache.hudi.avro.AvroSchemaUtils.isNullable
import org.apache.hudi.common.engine.HoodieReaderContext
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecord
-import org.apache.hudi.common.table.read.HoodiePositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME
+import org.apache.hudi.common.table.read.PositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.collection.{CachingIterator, ClosableIterator}
import org.apache.hudi.io.storage.{HoodieSparkFileReaderFactory, HoodieSparkParquetReader}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
index aba8573073c..97c79534787 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
@@ -447,7 +447,9 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe
* @param validateColumns columns to validate
* @return true if dataframes are equal, false otherwise
*/
- public static boolean areDataframesEqual(Dataset<Row> expectedDf, Dataset<Row> actualDf, Set<String> validateColumns) {
+ public static boolean areDataframesEqual(Dataset<Row> expectedDf,
+ Dataset<Row> actualDf,
+ Set<String> validateColumns) {
// Normalize schema order
String[] sortedColumnNames = Arrays.stream(expectedDf.columns())
.filter(validateColumns::contains).sorted().toArray(String[]::new);
@@ -456,11 +458,8 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe
Dataset<Row> df1Normalized = expectedDf.selectExpr(sortedColumnNames);
Dataset<Row> df2Normalized = actualDf.selectExpr(sortedColumnNames);
- // Sort rows
- Dataset<Row> df1Sorted = df1Normalized.sort("_row_key");
- Dataset<Row> df2Sorted = df2Normalized.sort("_row_key");
-
// Check for differences
- return df1Sorted.except(df2Sorted).isEmpty() && df2Sorted.except(df1Sorted).isEmpty();
+ return df1Normalized.except(df2Normalized).isEmpty()
+ && df2Normalized.except(df1Normalized).isEmpty();
}
}
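Side note on the test-harness change above: dropping the explicit sort on `_row_key` is safe because Dataset#except compares sets of rows regardless of physical order (and note it is EXCEPT DISTINCT, so duplicate rows are collapsed). A minimal, self-contained sketch of the same symmetric-difference check, assuming a local SparkSession; the class and variable names below are illustrative, not from the commit:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    final class DataframeEqualsSketch {
      // Equal iff neither side contains rows the other lacks.
      static boolean dataframesEqual(Dataset<Row> a, Dataset<Row> b) {
        return a.except(b).isEmpty() && b.except(a).isEmpty();
      }

      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[1]").getOrCreate();
        Dataset<Row> left = spark.sql("SELECT 1 AS id, 'a' AS val");
        // Same rows in a different physical order still compare equal.
        Dataset<Row> right = spark.sql("SELECT 1 AS id, 'a' AS val").orderBy("id");
        System.out.println(dataframesEqual(left, right)); // true
      }
    }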
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index d2103513f8d..2751998b111 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -22,7 +22,7 @@ package org.apache.hudi.common.engine;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
-import org.apache.hudi.common.table.read.HoodieFileGroupReaderSchemaHandler;
+import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
import org.apache.hudi.common.util.LocalAvroSchemaCache;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
@@ -59,7 +59,7 @@ import static org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIEL
*/
public abstract class HoodieReaderContext<T> implements Closeable {
- private HoodieFileGroupReaderSchemaHandler<T> schemaHandler = null;
+ private FileGroupReaderSchemaHandler<T> schemaHandler = null;
private String tablePath = null;
private String latestCommitTime = null;
private Option<HoodieRecordMerger> recordMerger = null;
@@ -72,11 +72,11 @@ public abstract class HoodieReaderContext<T> implements Closeable {
private final LocalAvroSchemaCache localAvroSchemaCache = LocalAvroSchemaCache.getInstance();
// Getter and Setter for schemaHandler
- public HoodieFileGroupReaderSchemaHandler<T> getSchemaHandler() {
+ public FileGroupReaderSchemaHandler<T> getSchemaHandler() {
return schemaHandler;
}
- public void setSchemaHandler(HoodieFileGroupReaderSchemaHandler<T> schemaHandler) {
+ public void setSchemaHandler(FileGroupReaderSchemaHandler<T> schemaHandler) {
this.schemaHandler = schemaHandler;
}
@@ -215,6 +215,19 @@ public abstract class HoodieReaderContext<T> implements Closeable {
*/
public abstract Object getValue(T record, Schema schema, String fieldName);
+ /**
+ * Cast to Java boolean value.
+ * If the object is not compatible with boolean type, throws.
+ */
+ public boolean castToBoolean(Object value) {
+ if (value instanceof Boolean) {
+ return (boolean) value;
+ } else {
+ throw new IllegalArgumentException(
+ "Input value type " + value.getClass() + ", cannot be cast to
boolean");
+ }
+ }
+
/**
* Gets the record key in String.
*
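The new castToBoolean default above only accepts java.lang.Boolean; engine-specific contexts are expected to override it when booleans arrive in an engine-native wrapper (the diffstat shows HiveHoodieReaderContext gaining lines, though that hunk is not in this excerpt). A hedged sketch of what such an override could look like for Hadoop's BooleanWritable, illustrative only and not the actual Hive-side code:

    import org.apache.hadoop.io.BooleanWritable;

    final class BooleanCastSketch {
      // Same contract as HoodieReaderContext#castToBoolean: unwrap known
      // boolean representations, throw on anything else.
      static boolean castToBoolean(Object value) {
        if (value instanceof BooleanWritable) {
          return ((BooleanWritable) value).get();
        }
        if (value instanceof Boolean) {
          return (boolean) value;
        }
        throw new IllegalArgumentException(
            "Input value type " + value.getClass() + ", cannot be cast to boolean");
      }

      public static void main(String[] args) {
        System.out.println(castToBoolean(new BooleanWritable(true))); // true
        System.out.println(castToBoolean(Boolean.FALSE));             // false
      }
    }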
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
index 346472411bb..b74a0c9b92f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
@@ -30,7 +30,7 @@ import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
-import org.apache.hudi.common.table.read.HoodieFileGroupRecordBuffer;
+import org.apache.hudi.common.table.read.FileGroupRecordBuffer;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -135,7 +135,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
private final List<String> validBlockInstants = new ArrayList<>();
// Use scanV2 method.
private final boolean enableOptimizedLogBlocksScan;
- protected HoodieFileGroupRecordBuffer<T> recordBuffer;
+ protected FileGroupRecordBuffer<T> recordBuffer;
protected BaseHoodieLogRecordReader(HoodieReaderContext readerContext,
HoodieStorage storage,
@@ -145,7 +145,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
Option<String> partitionNameOverride,
Option<String> keyFieldOverride,
boolean enableOptimizedLogBlocksScan,
- HoodieFileGroupRecordBuffer<T> recordBuffer) {
+ FileGroupRecordBuffer<T> recordBuffer) {
this.readerContext = readerContext;
this.readerSchema = readerContext.getSchemaHandler().getRequiredSchema();
this.latestInstantTime = readerContext.getLatestCommitTime();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
index 6c4dbf936f3..cffe48b6a24 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
@@ -21,7 +21,7 @@ package org.apache.hudi.common.table.log;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.table.read.HoodieFileGroupRecordBuffer;
+import org.apache.hudi.common.table.read.FileGroupRecordBuffer;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
@@ -70,7 +70,7 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
Option<String> partitionName,
Option<String> keyFieldOverride,
boolean enableOptimizedLogBlocksScan,
- HoodieFileGroupRecordBuffer<T> recordBuffer) {
+ FileGroupRecordBuffer<T> recordBuffer) {
super(readerContext, storage, logFilePaths, reverseReader, bufferSize, instantRange, withOperationField,
forceFullScan, partitionName, keyFieldOverride, enableOptimizedLogBlocksScan, recordBuffer);
this.scannedPrefixes = new HashSet<>();
@@ -219,7 +219,7 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
private boolean forceFullScan = true;
private boolean enableOptimizedLogBlocksScan = false;
- private HoodieFileGroupRecordBuffer<T> recordBuffer;
+ private FileGroupRecordBuffer<T> recordBuffer;
@Override
public Builder<T> withHoodieReaderContext(HoodieReaderContext<T> readerContext) {
@@ -287,7 +287,7 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
return this;
}
- public Builder<T> withRecordBuffer(HoodieFileGroupRecordBuffer<T> recordBuffer) {
+ public Builder<T> withRecordBuffer(FileGroupRecordBuffer<T> recordBuffer) {
this.recordBuffer = recordBuffer;
return this;
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
similarity index 65%
rename from hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
rename to hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
index 15f7b161518..a73398f2240 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
@@ -26,8 +26,8 @@ import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.table.HoodieTableConfig;
-import org.apache.hudi.common.util.LocalAvroSchemaCache;
import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.util.LocalAvroSchemaCache;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.VisibleForTesting;
@@ -41,9 +41,11 @@ import org.apache.avro.Schema;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -51,13 +53,16 @@ import java.util.stream.Stream;
import static org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchemaDedupNested;
import static org.apache.hudi.avro.AvroSchemaUtils.createNewSchemaFromFieldsWithReference;
import static org.apache.hudi.avro.AvroSchemaUtils.findNestedField;
+import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static org.apache.hudi.common.model.HoodieRecord.HOODIE_IS_DELETED_FIELD;
/**
* This class is responsible for handling the schema for the file group reader.
*/
-public class HoodieFileGroupReaderSchemaHandler<T> {
+public class FileGroupReaderSchemaHandler<T> {
- protected final Schema dataSchema;
+ protected final Schema tableSchema;
// requestedSchema: the schema that the caller requests
protected final Schema requestedSchema;
@@ -84,20 +89,26 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
private final LocalAvroSchemaCache localAvroSchemaCache;
- public HoodieFileGroupReaderSchemaHandler(HoodieReaderContext<T> readerContext,
- Schema dataSchema,
- Schema requestedSchema,
- Option<InternalSchema> internalSchemaOpt,
- HoodieTableConfig hoodieTableConfig,
- TypedProperties properties) {
+ private final Option<Pair<String, String>> customDeleteMarkerKeyValue;
+ private final boolean hasBuiltInDelete;
+
+ public FileGroupReaderSchemaHandler(HoodieReaderContext<T> readerContext,
+ Schema tableSchema,
+ Schema requestedSchema,
+ Option<InternalSchema> internalSchemaOpt,
+ HoodieTableConfig hoodieTableConfig,
+ TypedProperties properties) {
this.properties = properties;
this.readerContext = readerContext;
this.hasBootstrapBaseFile = readerContext.getHasBootstrapBaseFile();
this.needsMORMerge = readerContext.getHasLogFiles();
this.recordMerger = readerContext.getRecordMerger();
- this.dataSchema = dataSchema;
+ this.tableSchema = tableSchema;
this.requestedSchema = AvroSchemaCache.intern(requestedSchema);
this.hoodieTableConfig = hoodieTableConfig;
+ Pair<Option<Pair<String, String>>, Boolean> deleteConfigs = getDeleteConfigs(properties, tableSchema);
+ this.customDeleteMarkerKeyValue = deleteConfigs.getLeft();
+ this.hasBuiltInDelete = deleteConfigs.getRight();
this.requiredSchema = AvroSchemaCache.intern(prepareRequiredSchema());
this.internalSchema = pruneInternalSchema(requiredSchema, internalSchemaOpt);
this.internalSchemaOpt = getInternalSchemaOpt(internalSchemaOpt);
@@ -105,8 +116,8 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
this.localAvroSchemaCache = LocalAvroSchemaCache.getInstance();
}
- public Schema getDataSchema() {
- return this.dataSchema;
+ public Schema getTableSchema() {
+ return this.tableSchema;
}
public Schema getRequestedSchema() {
@@ -132,6 +143,14 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
return Option.empty();
}
+ public Option<Pair<String, String>> getCustomDeleteMarkerKeyValue() {
+ return customDeleteMarkerKeyValue;
+ }
+
+ public boolean hasBuiltInDelete() {
+ return hasBuiltInDelete;
+ }
+
private InternalSchema pruneInternalSchema(Schema requiredSchema, Option<InternalSchema> internalSchemaOption) {
if (!internalSchemaOption.isPresent()) {
return InternalSchema.getEmptyInternalSchema();
@@ -161,14 +180,16 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
if (hoodieTableConfig.getRecordMergeMode() == RecordMergeMode.CUSTOM) {
if (!recordMerger.get().isProjectionCompatible()) {
- return dataSchema;
+ return tableSchema;
}
}
List<Schema.Field> addedFields = new ArrayList<>();
- for (String field : getMandatoryFieldsForMerging(hoodieTableConfig, properties, dataSchema, recordMerger)) {
+ for (String field : getMandatoryFieldsForMerging(
+ hoodieTableConfig, properties, tableSchema, recordMerger,
+ hasBuiltInDelete, customDeleteMarkerKeyValue)) {
if (!findNestedField(requestedSchema, field).isPresent()) {
- Option<Schema.Field> foundFieldOpt = findNestedField(dataSchema, field);
+ Option<Schema.Field> foundFieldOpt = findNestedField(tableSchema, field);
if (!foundFieldOpt.isPresent()) {
throw new IllegalArgumentException("Field: " + field + " does not exist in the table schema");
}
}
@@ -184,8 +205,12 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
return appendFieldsToSchemaDedupNested(requestedSchema, addedFields);
}
- private static String[] getMandatoryFieldsForMerging(HoodieTableConfig cfg, TypedProperties props,
- Schema dataSchema, Option<HoodieRecordMerger> recordMerger) {
+ private static String[] getMandatoryFieldsForMerging(HoodieTableConfig cfg,
+ TypedProperties props,
+ Schema tableSchema,
+ Option<HoodieRecordMerger> recordMerger,
+ boolean hasBuiltInDelete,
+ Option<Pair<String, String>> customDeleteMarkerKeyAndValue) {
Triple<RecordMergeMode, String, String> mergingConfigs = HoodieTableConfig.inferCorrectMergingBehavior(
cfg.getRecordMergeMode(),
cfg.getPayloadClass(),
@@ -194,11 +219,12 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
HoodieTableVersion.current());
if (mergingConfigs.getLeft() == RecordMergeMode.CUSTOM) {
- return recordMerger.get().getMandatoryFieldsForMerging(dataSchema, cfg, props);
+ return recordMerger.get().getMandatoryFieldsForMerging(tableSchema, cfg, props);
}
- ArrayList<String> requiredFields = new ArrayList<>();
-
+ // Use Set to avoid duplicated fields.
+ Set<String> requiredFields = new HashSet<>();
+ // Add record key fields.
if (cfg.populateMetaFields()) {
requiredFields.add(HoodieRecord.RECORD_KEY_METADATA_FIELD);
} else {
@@ -207,17 +233,22 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
requiredFields.addAll(Arrays.asList(fields.get()));
}
}
-
+ // Add precombine field for event time ordering merge mode.
if (mergingConfigs.getLeft() == RecordMergeMode.EVENT_TIME_ORDERING) {
String preCombine = cfg.getPreCombineField();
if (!StringUtils.isNullOrEmpty(preCombine)) {
requiredFields.add(preCombine);
}
}
-
- if (dataSchema.getField(HoodieRecord.HOODIE_IS_DELETED_FIELD) != null) {
+ // Add `HOODIE_IS_DELETED_FIELD` field if exists.
+ if (hasBuiltInDelete) {
requiredFields.add(HoodieRecord.HOODIE_IS_DELETED_FIELD);
}
+ // Add custom delete key field if exists.
+ if (customDeleteMarkerKeyAndValue.isPresent()) {
+ requiredFields.add(customDeleteMarkerKeyAndValue.get().getLeft());
+ }
+
return requiredFields.toArray(new String[0]);
}
@@ -235,14 +266,14 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
}
public Pair<List<Schema.Field>,List<Schema.Field>> getBootstrapDataFields() {
- return getDataAndMetaCols(dataSchema);
+ return getDataAndMetaCols(tableSchema);
}
@VisibleForTesting
static Pair<List<Schema.Field>, List<Schema.Field>> getDataAndMetaCols(Schema schema) {
Map<Boolean, List<Schema.Field>> fieldsByMeta = schema.getFields().stream()
//if there are no data fields, then we don't want to think the temp col is a data col
- .filter(f -> !Objects.equals(f.name(), HoodiePositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME))
+ .filter(f -> !Objects.equals(f.name(), PositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME))
.collect(Collectors.partitioningBy(f -> HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(f.name())));
return Pair.of(fieldsByMeta.getOrDefault(true, Collections.emptyList()), fieldsByMeta.getOrDefault(false, Collections.emptyList()));
@@ -254,6 +285,46 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
Schema.Field curr = fields.get(i);
fields.set(i, new Schema.Field(curr.name(), curr.schema(), curr.doc(), curr.defaultVal()));
}
- return createNewSchemaFromFieldsWithReference(dataSchema, fields);
+ return createNewSchemaFromFieldsWithReference(tableSchema, fields);
+ }
+
+ /**
+ * Fetches the delete configs from the configs.
+ *
+ * @param props write and table configs that contain delete related properties
+ * @param tableSchema table schema
+ * @return a pair of custom delete marker key, value, and whether built-in delete marker
+ * (`_hoodie_is_deleted`) is included.
+ */
+ private static Pair<Option<Pair<String, String>>, Boolean> getDeleteConfigs(TypedProperties props,
+ Schema tableSchema) {
+ String deleteKey = props.getProperty(DELETE_KEY);
+ String deleteMarker = props.getProperty(DELETE_MARKER);
+ boolean deleteKeyExists = !StringUtils.isNullOrEmpty(deleteKey);
+ boolean deleteMarkerExists = !StringUtils.isNullOrEmpty(deleteMarker);
+
+ Option<Pair<String, String>> customDeleteMarkerKeyAndValue;
+ // DELETE_KEY and DELETE_MARKER both should be set.
+ if (deleteKeyExists && deleteMarkerExists) {
+ // DELETE_KEY field exists in the schema.
+ customDeleteMarkerKeyAndValue = Option.of(Pair.of(deleteKey, deleteMarker));
+ } else if (!deleteKeyExists && !deleteMarkerExists) {
+ // Normal case.
+ customDeleteMarkerKeyAndValue = Option.empty();
+ } else {
+ throw new IllegalArgumentException("Either custom delete key or marker
is not specified");
+ }
+ return Pair.of(customDeleteMarkerKeyAndValue,
hasBuiltInDeleteField(tableSchema));
+ }
+
+ /**
+ * Check if "_hoodie_is_deleted" field (built-in deletes) exists in the
schema.
+ * Assume the type of this column is boolean.
+ *
+ * @param schema table schema to check
+ * @return whether built-in delete field is included in the table schema
+ */
+ private static boolean hasBuiltInDeleteField(Schema schema) {
+ return schema.getField(HOODIE_IS_DELETED_FIELD) != null;
}
}
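For orientation, getDeleteConfigs above keys off the DefaultHoodieRecordPayload constants DELETE_KEY and DELETE_MARKER and insists they are set together. A small sketch of opting into custom delete markers via write properties; the literal property keys below are an assumption based on those constants' current values, so prefer referencing the constants directly:

    import org.apache.hudi.common.config.TypedProperties;

    final class CustomDeleteConfigSketch {
      public static void main(String[] args) {
        TypedProperties props = new TypedProperties();
        // Both must be present; setting only one makes the schema handler
        // throw "Either custom delete key or marker is not specified".
        props.setProperty("hoodie.payload.delete.field", "op"); // DELETE_KEY (assumed literal)
        props.setProperty("hoodie.payload.delete.marker", "d"); // DELETE_MARKER (assumed literal)
        // With these set, the file group reader treats any record whose "op"
        // column equals "d" as a delete, and projects "op" into the required
        // schema so the check can run during merging.
      }
    }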
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
similarity index 93%
rename from hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
rename to hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
index 528c2083136..38ac956e07b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
@@ -79,7 +79,7 @@ import static org.apache.hudi.common.model.HoodieRecord.OPERATION_METADATA_FIELD
import static org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
-public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGroupRecordBuffer<T> {
+public abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordBuffer<T> {
protected final HoodieReaderContext<T> readerContext;
protected final Schema readerSchema;
protected final Option<String> orderingFieldName;
@@ -91,6 +91,8 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
protected final TypedProperties props;
protected final ExternalSpillableMap<Serializable, Pair<Option<T>, Map<String, Object>>> records;
protected final HoodieReadStats readStats;
+ protected final boolean shouldCheckCustomDeleteMarker;
+ protected final boolean shouldCheckBuiltInDeleteMarker;
protected ClosableIterator<T> baseFileIterator;
protected Iterator<Pair<Option<T>, Map<String, Object>>> logRecordIterator;
protected T nextRecord;
@@ -98,13 +100,13 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
protected InternalSchema internalSchema;
protected HoodieTableMetaClient hoodieTableMetaClient;
- public HoodieBaseFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
- HoodieTableMetaClient hoodieTableMetaClient,
- RecordMergeMode recordMergeMode,
- Option<String> partitionNameOverrideOpt,
- Option<String[]> partitionPathFieldOpt,
- TypedProperties props,
- HoodieReadStats readStats) {
+ protected FileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
+ HoodieTableMetaClient hoodieTableMetaClient,
+ RecordMergeMode recordMergeMode,
+ Option<String> partitionNameOverrideOpt,
+ Option<String[]> partitionPathFieldOpt,
+ TypedProperties props,
+ HoodieReadStats readStats) {
this.readerContext = readerContext;
this.readerSchema = AvroSchemaCache.intern(readerContext.getSchemaHandler().getRequiredSchema());
this.partitionNameOverrideOpt = partitionNameOverrideOpt;
@@ -143,6 +145,40 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
} catch (IOException e) {
throw new HoodieIOException("IOException when creating
ExternalSpillableMap at " + spillableMapBasePath, e);
}
+ this.shouldCheckCustomDeleteMarker =
+ readerContext.getSchemaHandler().getCustomDeleteMarkerKeyValue().isPresent();
+ this.shouldCheckBuiltInDeleteMarker =
+ readerContext.getSchemaHandler().hasBuiltInDelete();
+ }
+
+ /**
+ * Here we assume that delete marker column type is of string.
+ * This should be sufficient for most cases.
+ */
+ protected final boolean isCustomDeleteRecord(T record) {
+ if (!shouldCheckCustomDeleteMarker) {
+ return false;
+ }
+
+ Pair<String, String> markerKeyValue =
+ readerContext.getSchemaHandler().getCustomDeleteMarkerKeyValue().get();
+ Object deleteMarkerValue =
+ readerContext.getValue(record, readerSchema, markerKeyValue.getLeft());
+ return deleteMarkerValue != null
+ && markerKeyValue.getRight().equals(deleteMarkerValue.toString());
+ }
+
+ /**
+ * Check if the value of column "_hoodie_is_deleted" is true.
+ */
+ protected final boolean isBuiltInDeleteRecord(T record) {
+ if (!shouldCheckBuiltInDeleteMarker) {
+ return false;
+ }
+
+ Object columnValue = readerContext.getValue(
+ record, readerSchema, HOODIE_IS_DELETED_FIELD);
+ return columnValue != null && readerContext.castToBoolean(columnValue);
}
@Override
@@ -244,8 +280,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
Comparable incomingOrderingValue = readerContext.getOrderingValue(
Option.of(record), metadata, readerSchema, orderingFieldName);
if (incomingOrderingValue.compareTo(existingOrderingValue) > 0) {
- return Option.of(Pair.of(isDeleteRecord(Option.of(record), readerContext.getSchemaFromMetadata(metadata))
- ? Option.empty() : Option.of(record), metadata));
+ return Option.of(Pair.of(Option.of(record), metadata));
}
return Option.empty();
case CUSTOM:
@@ -263,7 +298,6 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T>
implements HoodieFileGr
return Option.of(Pair.of(Option.ofNullable(combinedRecordData), metadata));
}
}
-
return Option.empty();
} else {
Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = recordMerger.get().merge(
@@ -285,7 +319,6 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
if (combinedRecord.getData() != existingRecordMetadataPair.getLeft().get()) {
return Option.of(Pair.of(Option.ofNullable(combinedRecord.getData()), metadata));
}
-
return Option.empty();
}
}
@@ -445,7 +478,6 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
}
return Option.ofNullable(readerContext.convertAvroRecord(indexedRecord));
}
-
return Option.empty();
} else {
Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.get().merge(
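The two predicates added to the record buffer above are deliberately simple: the built-in check reads the boolean `_hoodie_is_deleted` column, and the custom check compares the configured column's toString() against the configured marker. A standalone sketch of the same logic over an Avro GenericRecord, illustrative only; the real methods fetch values through HoodieReaderContext#getValue so they work for any engine's row type:

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;

    final class DeleteMarkerSketch {
      static boolean isDeleteRecord(GenericRecord r, String deleteKey, String deleteMarker) {
        // Built-in marker: a boolean column that is literally true.
        if (r.getSchema().getField("_hoodie_is_deleted") != null
            && Boolean.TRUE.equals(r.get("_hoodie_is_deleted"))) {
          return true;
        }
        // Custom marker: configured column's string form equals the marker value.
        Object v = r.getSchema().getField(deleteKey) == null ? null : r.get(deleteKey);
        return v != null && deleteMarker.equals(v.toString());
      }

      public static void main(String[] args) {
        Schema schema = SchemaBuilder.record("Event").fields()
            .requiredString("id").requiredString("op").requiredBoolean("_hoodie_is_deleted")
            .endRecord();
        GenericRecord rec = new GenericData.Record(schema);
        rec.put("id", "k1");
        rec.put("op", "d");
        rec.put("_hoodie_is_deleted", false);
        System.out.println(isDeleteRecord(rec, "op", "d")); // true, via the custom marker
      }
    }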
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index 3d6c23182c7..94d63aac81b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -78,7 +78,7 @@ public final class HoodieFileGroupReader<T> implements Closeable {
// Length of bytes to read from the base file
private final long length;
// Core structure to store and process records.
- private final HoodieFileGroupRecordBuffer<T> recordBuffer;
+ private final FileGroupRecordBuffer<T> recordBuffer;
private ClosableIterator<T> baseFileIterator;
private final Option<UnaryOperator<T>> outputConverter;
private final HoodieReadStats readStats;
@@ -127,8 +127,8 @@ public final class HoodieFileGroupReader<T> implements Closeable {
}
readerContext.setHasBootstrapBaseFile(hoodieBaseFileOption.isPresent() && hoodieBaseFileOption.get().getBootstrapBaseFile().isPresent());
readerContext.setSchemaHandler(readerContext.supportsParquetRowIndex()
- ? new HoodiePositionBasedSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, tableConfig, props)
- : new HoodieFileGroupReaderSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, tableConfig, props));
+ ? new PositionBasedSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, tableConfig, props)
+ : new FileGroupReaderSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, tableConfig, props));
this.outputConverter = readerContext.getSchemaHandler().getOutputConverter();
this.readStats = new HoodieReadStats();
this.recordBuffer = getRecordBuffer(readerContext, hoodieTableMetaClient,
@@ -139,26 +139,26 @@ public final class HoodieFileGroupReader<T> implements Closeable {
/**
* Initialize correct record buffer
*/
- private static HoodieFileGroupRecordBuffer getRecordBuffer(HoodieReaderContext readerContext,
- HoodieTableMetaClient hoodieTableMetaClient,
- RecordMergeMode recordMergeMode,
- TypedProperties props,
- Option<HoodieBaseFile> baseFileOption,
- boolean hasNoLogFiles,
- boolean isSkipMerge,
- boolean shouldUseRecordPosition,
- HoodieReadStats readStats) {
+ private static FileGroupRecordBuffer getRecordBuffer(HoodieReaderContext readerContext,
+ HoodieTableMetaClient hoodieTableMetaClient,
+ RecordMergeMode recordMergeMode,
+ TypedProperties props,
+ Option<HoodieBaseFile> baseFileOption,
+ boolean hasNoLogFiles,
+ boolean isSkipMerge,
+ boolean shouldUseRecordPosition,
+ HoodieReadStats readStats) {
if (hasNoLogFiles) {
return null;
} else if (isSkipMerge) {
- return new HoodieUnmergedFileGroupRecordBuffer<>(
+ return new UnmergedFileGroupRecordBuffer<>(
readerContext, hoodieTableMetaClient, recordMergeMode,
Option.empty(), Option.empty(), props, readStats);
} else if (shouldUseRecordPosition && baseFileOption.isPresent()) {
- return new HoodiePositionBasedFileGroupRecordBuffer<>(
+ return new PositionBasedFileGroupRecordBuffer<>(
readerContext, hoodieTableMetaClient, recordMergeMode, Option.empty(),
Option.empty(), baseFileOption.get().getCommitTime(), props, readStats);
} else {
- return new HoodieKeyBasedFileGroupRecordBuffer<>(
+ return new KeyBasedFileGroupRecordBuffer<>(
readerContext, hoodieTableMetaClient, recordMergeMode,
Option.empty(), Option.empty(), props, readStats);
}
}
@@ -191,12 +191,12 @@ public final class HoodieFileGroupReader<T> implements Closeable {
if (baseFileStoragePathInfo != null) {
return readerContext.getFileRecordIterator(
baseFileStoragePathInfo, start, length,
- readerContext.getSchemaHandler().getDataSchema(),
+ readerContext.getSchemaHandler().getTableSchema(),
readerContext.getSchemaHandler().getRequiredSchema(), storage);
} else {
return readerContext.getFileRecordIterator(
baseFile.getStoragePath(), start, length,
- readerContext.getSchemaHandler().getDataSchema(),
+ readerContext.getSchemaHandler().getTableSchema(),
readerContext.getSchemaHandler().getRequiredSchema(), storage);
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java
similarity index 78%
rename from hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
rename to hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java
index 144e1c764cf..d19519a3f86 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieKeyBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/KeyBasedFileGroupRecordBuffer.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.KeySpec;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
@@ -41,21 +42,23 @@ import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
+import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY;
+
/**
* A buffer that is used to store log records by {@link org.apache.hudi.common.table.log.HoodieMergedLogRecordReader}
* by calling the {@link #processDataBlock} and {@link #processDeleteBlock} methods into a record key based map.
* The records from the base file is accessed from an iterator object. These records are merged when the
* {@link #hasNext} method is called.
*/
-public class HoodieKeyBasedFileGroupRecordBuffer<T> extends HoodieBaseFileGroupRecordBuffer<T> {
-
- public HoodieKeyBasedFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
- HoodieTableMetaClient hoodieTableMetaClient,
- RecordMergeMode recordMergeMode,
- Option<String> partitionNameOverrideOpt,
- Option<String[]> partitionPathFieldOpt,
- TypedProperties props,
- HoodieReadStats readStats) {
+public class KeyBasedFileGroupRecordBuffer<T> extends FileGroupRecordBuffer<T> {
+
+ public KeyBasedFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
+ HoodieTableMetaClient hoodieTableMetaClient,
+ RecordMergeMode recordMergeMode,
+ Option<String> partitionNameOverrideOpt,
+ Option<String[]> partitionPathFieldOpt,
+ TypedProperties props,
+ HoodieReadStats readStats) {
super(readerContext, hoodieTableMetaClient, recordMergeMode, partitionNameOverrideOpt, partitionPathFieldOpt, props, readStats);
}
@@ -82,7 +85,12 @@ public class HoodieKeyBasedFileGroupRecordBuffer<T> extends HoodieBaseFileGroupR
Map<String, Object> metadata = readerContext.generateMetadataForRecord(
nextRecord, schema);
String recordKey = (String) metadata.get(HoodieReaderContext.INTERNAL_META_RECORD_KEY);
- processNextDataRecord(nextRecord, metadata, recordKey);
+
+ if (isBuiltInDeleteRecord(nextRecord) || isCustomDeleteRecord(nextRecord)) {
+ processDeleteRecord(nextRecord, metadata);
+ } else {
+ processNextDataRecord(nextRecord, metadata, recordKey);
+ }
}
}
}
@@ -123,6 +131,20 @@ public class HoodieKeyBasedFileGroupRecordBuffer<T> extends HoodieBaseFileGroupR
}
}
+ protected void processDeleteRecord(T record, Map<String, Object> metadata) {
+ DeleteRecord deleteRecord = DeleteRecord.create(
+ new HoodieKey(
+ (String) metadata.get(INTERNAL_META_RECORD_KEY),
+ // The partition path of the delete record is set to null because it is not
+ // used, and the delete record is never surfaced from the file group reader
+ null),
+ readerContext.getOrderingValue(
+ Option.of(record), metadata, readerSchema, orderingFieldName));
+ processNextDeletedRecord(
+ deleteRecord,
+ (String) metadata.get(INTERNAL_META_RECORD_KEY));
+ }
+
@Override
public boolean containsLogRecord(String recordKey) {
return records.containsKey(recordKey);
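Worth noting about processDeleteRecord above: an in-payload delete is converted into the same DeleteRecord shape that delete blocks produce, so the downstream merge path needs no new cases. A tiny sketch of that conversion using the Hudi model types, with made-up values for illustration:

    import org.apache.hudi.common.model.DeleteRecord;
    import org.apache.hudi.common.model.HoodieKey;

    final class DeleteRecordSketch {
      public static void main(String[] args) {
        // Partition path is null on purpose: the delete never leaves the file
        // group reader, while the ordering value keeps event-time merge
        // semantics for later records with the same key.
        DeleteRecord del = DeleteRecord.create(new HoodieKey("key-123", null), 42L);
        System.out.println(del.getRecordKey() + " @ " + del.getOrderingValue());
      }
    }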
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
similarity index 87%
rename from hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
rename to hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
index 74925592f22..b0e039a4b68 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.KeySpec;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
@@ -61,8 +62,8 @@ import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
* Here the position means that record position in the base file. The records from the base file is accessed from an iterator object. These records are merged when the
* {@link #hasNext} method is called.
*/
-public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieKeyBasedFileGroupRecordBuffer<T> {
- private static final Logger LOG = LoggerFactory.getLogger(HoodiePositionBasedFileGroupRecordBuffer.class);
+public class PositionBasedFileGroupRecordBuffer<T> extends KeyBasedFileGroupRecordBuffer<T> {
+ private static final Logger LOG = LoggerFactory.getLogger(PositionBasedFileGroupRecordBuffer.class);
private static final String ROW_INDEX_COLUMN_NAME = "row_index";
public static final String ROW_INDEX_TEMPORARY_COLUMN_NAME = "_tmp_metadata_" + ROW_INDEX_COLUMN_NAME;
@@ -70,14 +71,14 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieKeyBasedF
private long nextRecordPosition = 0L;
private boolean needToDoHybridStrategy = false;
- public HoodiePositionBasedFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
- HoodieTableMetaClient hoodieTableMetaClient,
- RecordMergeMode recordMergeMode,
- Option<String> partitionNameOverrideOpt,
- Option<String[]> partitionPathFieldOpt,
- String baseFileInstantTime,
- TypedProperties props,
- HoodieReadStats readStats) {
+ public PositionBasedFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
+ HoodieTableMetaClient hoodieTableMetaClient,
+ RecordMergeMode recordMergeMode,
+ Option<String> partitionNameOverrideOpt,
+ Option<String[]> partitionPathFieldOpt,
+ String baseFileInstantTime,
+ TypedProperties props,
+ HoodieReadStats readStats) {
super(readerContext, hoodieTableMetaClient, recordMergeMode, partitionNameOverrideOpt, partitionPathFieldOpt, props, readStats);
this.baseFileInstantTime = baseFileInstantTime;
}
@@ -135,13 +136,13 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieKeyBasedF
}
long recordPosition = recordPositions.get(recordIndex++);
-
T evolvedNextRecord = schemaTransformerWithEvolvedSchema.getLeft().apply(nextRecord);
- processNextDataRecord(
- evolvedNextRecord,
- readerContext.generateMetadataForRecord(evolvedNextRecord, schema),
- recordPosition
- );
+ Map<String, Object> metadata = readerContext.generateMetadataForRecord(evolvedNextRecord, schema);
+ if (isBuiltInDeleteRecord(evolvedNextRecord) || isCustomDeleteRecord(evolvedNextRecord)) {
+ processDeleteRecord(evolvedNextRecord, metadata, recordPosition);
+ } else {
+ processNextDataRecord(evolvedNextRecord, metadata, recordPosition);
+ }
}
}
}
@@ -202,6 +203,17 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieKeyBasedF
}
}
+ protected void processDeleteRecord(T record, Map<String, Object> metadata, long recordPosition) {
+ DeleteRecord deleteRecord = DeleteRecord.create(
+ new HoodieKey(
+ // The partition path of the delete record is set to null because it is not
+ // used, and the delete record is never surfaced from the file group reader
+ (String) metadata.get(INTERNAL_META_RECORD_KEY), null),
+ readerContext.getOrderingValue(
+ Option.of(record), metadata, readerSchema, orderingFieldName));
+ processNextDeletedRecord(deleteRecord, recordPosition);
+ }
+
@Override
public void processNextDeletedRecord(DeleteRecord deleteRecord, Serializable recordPosition) {
Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair = records.get(recordPosition);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedSchemaHandler.java
similarity index 83%
rename from hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java
rename to hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedSchemaHandler.java
index 71722f438a7..c91073b4f78 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedSchemaHandler.java
@@ -35,18 +35,18 @@ import java.util.Collections;
import java.util.List;
import static org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchemaDedupNested;
-import static org.apache.hudi.common.table.read.HoodiePositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME;
+import static org.apache.hudi.common.table.read.PositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME;
/**
* This class is responsible for handling the schema for the file group reader that supports positional merge.
*/
-public class HoodiePositionBasedSchemaHandler<T> extends HoodieFileGroupReaderSchemaHandler<T> {
- public HoodiePositionBasedSchemaHandler(HoodieReaderContext<T> readerContext,
- Schema dataSchema,
- Schema requestedSchema,
- Option<InternalSchema> internalSchemaOpt,
- HoodieTableConfig hoodieTableConfig,
- TypedProperties properties) {
+public class PositionBasedSchemaHandler<T> extends FileGroupReaderSchemaHandler<T> {
+ public PositionBasedSchemaHandler(HoodieReaderContext<T> readerContext,
+ Schema dataSchema,
+ Schema requestedSchema,
+ Option<InternalSchema> internalSchemaOpt,
+ HoodieTableConfig hoodieTableConfig,
+ TypedProperties properties) {
super(readerContext, dataSchema, requestedSchema, internalSchemaOpt, hoodieTableConfig, properties);
}
@@ -60,7 +60,7 @@ public class HoodiePositionBasedSchemaHandler<T> extends HoodieFileGroupReaderSc
@Override
protected Option<InternalSchema> getInternalSchemaOpt(Option<InternalSchema> internalSchemaOpt) {
- return internalSchemaOpt.map(HoodiePositionBasedSchemaHandler::addPositionalMergeCol);
+ return internalSchemaOpt.map(PositionBasedSchemaHandler::addPositionalMergeCol);
}
@Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieUnmergedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java
similarity index 97%
rename from hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieUnmergedFileGroupRecordBuffer.java
rename to hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java
index fec5c6fe2c4..7874b994c2a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieUnmergedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/UnmergedFileGroupRecordBuffer.java
@@ -41,12 +41,12 @@ import java.io.Serializable;
import java.util.Iterator;
import java.util.Map;
-public class HoodieUnmergedFileGroupRecordBuffer<T> extends HoodieBaseFileGroupRecordBuffer<T> {
+public class UnmergedFileGroupRecordBuffer<T> extends FileGroupRecordBuffer<T> {
// Used to order the records in the record map.
private Long putIndex = 0L;
private Long getIndex = 0L;
- public HoodieUnmergedFileGroupRecordBuffer(
+ public UnmergedFileGroupRecordBuffer(
HoodieReaderContext<T> readerContext,
HoodieTableMetaClient hoodieTableMetaClient,
RecordMergeMode recordMergeMode,
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
new file mode 100644
index 00000000000..a4b912e33b5
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupRecordBuffer.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_PARTITION_PATH;
+import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY;
+import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
+import static org.apache.hudi.common.table.read.FileGroupRecordBuffer.getOrderingValue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests {@link FileGroupRecordBuffer}
+ */
+class TestFileGroupRecordBuffer {
+ private String schemaString = "{"
+ + "\"type\": \"record\","
+ + "\"name\": \"EventRecord\","
+ + "\"namespace\": \"com.example.avro\","
+ + "\"fields\": ["
+ + "{\"name\": \"id\", \"type\": \"string\"},"
+ + "{\"name\": \"ts\", \"type\": \"long\"},"
+ + "{\"name\": \"op\", \"type\": \"string\"},"
+ + "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\"}"
+ + "]"
+ + "}";
+ private Schema schema = new Schema.Parser().parse(schemaString);
+ private final HoodieReaderContext readerContext = mock(HoodieReaderContext.class);
+ private final FileGroupReaderSchemaHandler schemaHandler =
+ mock(FileGroupReaderSchemaHandler.class);
+ private HoodieTableMetaClient hoodieTableMetaClient = mock(HoodieTableMetaClient.class);
+ private Option<String> partitionNameOverrideOpt = Option.empty();
+ private Option<String[]> partitionPathFieldOpt = Option.empty();
+ private TypedProperties props = new TypedProperties();
+ private HoodieReadStats readStats = mock(HoodieReadStats.class);
+
+ @BeforeEach
+ void setUp() {
+ when(readerContext.getSchemaHandler()).thenReturn(schemaHandler);
+ when(schemaHandler.getRequiredSchema()).thenReturn(schema);
+ when(readerContext.getRecordMerger()).thenReturn(Option.empty());
+ }
+
+ @Test
+ void testGetOrderingValueFromDeleteRecord() {
+ HoodieReaderContext readerContext = mock(HoodieReaderContext.class);
+ DeleteRecord deleteRecord = mock(DeleteRecord.class);
+ mockDeleteRecord(deleteRecord, null);
+ assertEquals(DEFAULT_ORDERING_VALUE, getOrderingValue(readerContext, deleteRecord));
+ mockDeleteRecord(deleteRecord, DEFAULT_ORDERING_VALUE);
+ assertEquals(DEFAULT_ORDERING_VALUE, getOrderingValue(readerContext, deleteRecord));
+ String orderingValue = "xyz";
+ String convertedValue = "_xyz";
+ mockDeleteRecord(deleteRecord, orderingValue);
+ when(readerContext.convertValueToEngineType(orderingValue)).thenReturn(convertedValue);
+ assertEquals(convertedValue, getOrderingValue(readerContext, deleteRecord));
+ }
+
+ @ParameterizedTest
+ @CsvSource({
+ "true, true, true, EVENT_TIME_ORDERING",
+ "true, false, false, EVENT_TIME_ORDERING",
+ "false, true, false, EVENT_TIME_ORDERING",
+ "false, false, true, EVENT_TIME_ORDERING",
+ "true, true, true, COMMIT_TIME_ORDERING",
+ "true, false, false, COMMIT_TIME_ORDERING",
+ "false, true, false, COMMIT_TIME_ORDERING",
+ "false, false, true, COMMIT_TIME_ORDERING",
+ "true, true, true, CUSTOM",
+ "true, false, false, CUSTOM",
+ "false, true, false, CUSTOM",
+ "false, false, true, CUSTOM",
+ "true, true, true,",
+ "true, false, false,",
+ "false, true, false,",
+ "false, false, true,"
+ })
+ public void testSchemaForMandatoryFields(boolean setPrecombine,
+ boolean addHoodieIsDeleted,
+ boolean addCustomDeleteMarker,
+ RecordMergeMode mergeMode) {
+ HoodieReaderContext readerContext = mock(HoodieReaderContext.class);
+ when(readerContext.getHasBootstrapBaseFile()).thenReturn(false);
+ when(readerContext.getHasLogFiles()).thenReturn(true);
+ HoodieRecordMerger recordMerger = mock(HoodieRecordMerger.class);
+ when(readerContext.getRecordMerger()).thenReturn(Option.of(recordMerger));
+ when(recordMerger.isProjectionCompatible()).thenReturn(false);
+
+ String preCombineField = "ts";
+ String customDeleteKey = "colC";
+ String customDeleteValue = "D";
+ List<String> dataSchemaFields = new ArrayList<>();
+ dataSchemaFields.addAll(Arrays.asList(
+ HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD, preCombineField,
+ "colA", "colB", "colC", "colD"));
+ if (addHoodieIsDeleted) {
+ dataSchemaFields.add(HoodieRecord.HOODIE_IS_DELETED_FIELD);
+ }
+
+ Schema dataSchema = getSchema(dataSchemaFields);
+ Schema requestedSchema = getSchema(Arrays.asList(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD));
+
+ HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+ when(tableConfig.getRecordMergeMode()).thenReturn(mergeMode);
+ when(tableConfig.populateMetaFields()).thenReturn(true);
+ when(tableConfig.getPreCombineField()).thenReturn(setPrecombine ? preCombineField : StringUtils.EMPTY_STRING);
+
+ TypedProperties props = new TypedProperties();
+ if (addCustomDeleteMarker) {
+ props.setProperty(DELETE_KEY, customDeleteKey);
+ props.setProperty(DELETE_MARKER, customDeleteValue);
+ }
+ FileGroupReaderSchemaHandler fileGroupReaderSchemaHandler = new FileGroupReaderSchemaHandler(readerContext,
+ dataSchema, requestedSchema, Option.empty(), tableConfig, props);
+ List<String> expectedFields = new ArrayList();
+ expectedFields.add(HoodieRecord.RECORD_KEY_METADATA_FIELD);
+ expectedFields.add(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
+ if (addCustomDeleteMarker) {
+ expectedFields.add(customDeleteKey);
+ }
+ if (setPrecombine && mergeMode != RecordMergeMode.COMMIT_TIME_ORDERING) { // commit time ordering does not project ordering field.
+ expectedFields.add(preCombineField);
+ }
+ if (addHoodieIsDeleted) {
+ expectedFields.add(HoodieRecord.HOODIE_IS_DELETED_FIELD);
+ }
+ Schema expectedSchema = mergeMode == RecordMergeMode.CUSTOM ? dataSchema : getSchema(expectedFields);
+ Schema actualSchema = fileGroupReaderSchemaHandler.generateRequiredSchema();
+ assertEquals(expectedSchema, actualSchema);
+ assertEquals(addHoodieIsDeleted, fileGroupReaderSchemaHandler.hasBuiltInDelete());
+ assertEquals(addCustomDeleteMarker
+ ? Option.of(Pair.of(customDeleteKey, customDeleteValue)) : Option.empty(),
+ fileGroupReaderSchemaHandler.getCustomDeleteMarkerKeyValue());
+ }
+
+ @ParameterizedTest
+ @CsvSource({"true,false", "false,true"})
+ void testInvalidCustomDeleteConfigs(boolean configureCustomDeleteKey,
+ boolean configureCustomDeleteMarker) {
+ HoodieReaderContext readerContext = mock(HoodieReaderContext.class);
+ when(readerContext.getHasBootstrapBaseFile()).thenReturn(false);
+ when(readerContext.getHasLogFiles()).thenReturn(true);
+ HoodieRecordMerger recordMerger = mock(HoodieRecordMerger.class);
+ when(readerContext.getRecordMerger()).thenReturn(Option.of(recordMerger));
+ when(recordMerger.isProjectionCompatible()).thenReturn(false);
+
+ String customDeleteKey = "colC";
+ String customDeleteValue = "D";
+ List<String> dataSchemaFields = new ArrayList<>(Arrays.asList(
+ HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD,
+ "colA", "colB", "colC", "colD"));
+
+ Schema dataSchema = getSchema(dataSchemaFields);
+ Schema requestedSchema = getSchema(Arrays.asList(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD));
+
+ HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+
+ TypedProperties props = new TypedProperties();
+ if (configureCustomDeleteKey) {
+ props.setProperty(DELETE_KEY, customDeleteKey);
+ }
+ if (configureCustomDeleteMarker) {
+ props.setProperty(DELETE_MARKER, customDeleteValue);
+ }
+ Throwable exception = assertThrows(IllegalArgumentException.class,
+ () -> new FileGroupReaderSchemaHandler(readerContext,
+ dataSchema, requestedSchema, Option.empty(), tableConfig, props));
+ assertEquals("Either custom delete key or marker is not specified",
+ exception.getMessage());
+ }
+
+ private Schema getSchema(List<String> fields) {
+ SchemaBuilder.FieldAssembler<Schema> schemaFieldAssembler = SchemaBuilder.builder().record("test_schema")
+ .namespace("test_namespace").fields();
+ for (String field : fields) {
+ schemaFieldAssembler = schemaFieldAssembler.name(field).type().stringType().noDefault();
+ }
+ return schemaFieldAssembler.endRecord();
+ }
+
+ private void mockDeleteRecord(DeleteRecord deleteRecord,
+ Comparable orderingValue) {
+ when(deleteRecord.getOrderingValue()).thenReturn(orderingValue);
+ }
+
+ @Test
+ void testIsCustomDeleteRecord() {
+ String customDeleteKey = "op";
+ String customDeleteValue = "d";
+ GenericRecord record = new GenericData.Record(schema);
+ record.put("id", "12345");
+ record.put("ts", System.currentTimeMillis());
+ record.put(customDeleteKey, "d");
+
+ when(schemaHandler.getCustomDeleteMarkerKeyValue())
+ .thenReturn(Option.of(Pair.of(customDeleteKey, customDeleteValue)));
+ KeyBasedFileGroupRecordBuffer keyBasedBuffer =
+ new KeyBasedFileGroupRecordBuffer(
+ readerContext,
+ hoodieTableMetaClient,
+ RecordMergeMode.COMMIT_TIME_ORDERING,
+ partitionNameOverrideOpt,
+ partitionPathFieldOpt,
+ props,
+ readStats);
+ when(readerContext.getValue(any(), any(), any())).thenReturn(null);
+ assertFalse(keyBasedBuffer.isCustomDeleteRecord(record));
+
+ props.setProperty(DELETE_KEY, customDeleteKey);
+ props.setProperty(DELETE_MARKER, customDeleteValue);
+ keyBasedBuffer = new KeyBasedFileGroupRecordBuffer(
+ readerContext,
+ hoodieTableMetaClient,
+ RecordMergeMode.COMMIT_TIME_ORDERING,
+ partitionNameOverrideOpt,
+ partitionPathFieldOpt,
+ props,
+ readStats);
+ when(readerContext.getValue(any(), any(), any())).thenReturn("i");
+ assertFalse(keyBasedBuffer.isCustomDeleteRecord(record));
+ when(readerContext.getValue(any(), any(), any())).thenReturn("d");
+ assertTrue(keyBasedBuffer.isCustomDeleteRecord(record));
+ }
+
+ @Test
+ void testProcessCustomDeleteRecord() {
+ String customDeleteKey = "op";
+ String customDeleteValue = "d";
+ when(schemaHandler.getCustomDeleteMarkerKeyValue())
+ .thenReturn(Option.of(Pair.of(customDeleteKey, customDeleteValue)));
+ when(schemaHandler.hasBuiltInDelete()).thenReturn(true);
+ KeyBasedFileGroupRecordBuffer keyBasedBuffer =
+ new KeyBasedFileGroupRecordBuffer(
+ readerContext,
+ hoodieTableMetaClient,
+ RecordMergeMode.COMMIT_TIME_ORDERING,
+ partitionNameOverrideOpt,
+ partitionPathFieldOpt,
+ props,
+ readStats);
+
+ // CASE 1: With custom delete marker.
+ GenericRecord record = new GenericData.Record(schema);
+ record.put("id", "12345");
+ record.put("ts", System.currentTimeMillis());
+ record.put("op", "d");
+ record.put("_hoodie_is_deleted", false);
+
+ Map<String, Object> metadata = new HashMap<>();
+ metadata.put(INTERNAL_META_RECORD_KEY, "12345");
+ metadata.put(INTERNAL_META_PARTITION_PATH, "partition1");
+ when(readerContext.getOrderingValue(any(), any(), any(), any())).thenReturn(1);
+ when(readerContext.generateMetadataForRecord(any(), any(), any())).thenReturn(metadata);
+ keyBasedBuffer.processDeleteRecord(record, metadata);
+ Map<Serializable, Pair<Option<GenericRecord>, Map<String, Object>>> records =
+ keyBasedBuffer.getLogRecords();
+ assertEquals(1, records.size());
+ assertEquals(Pair.of(Option.empty(), metadata), records.get("12345"));
+
+ // CASE 2: With _hoodie_is_deleted is true.
+ GenericRecord anotherRecord = new GenericData.Record(schema);
+ anotherRecord.put("id", "54321");
+ anotherRecord.put("ts", System.currentTimeMillis());
+ anotherRecord.put("op", "i");
+ anotherRecord.put("_hoodie_is_deleted", true);
+
+ Map<String, Object> anotherMetadata = new HashMap<>();
+ anotherMetadata.put(INTERNAL_META_RECORD_KEY, "54321");
+ anotherMetadata.put(INTERNAL_META_PARTITION_PATH, "partition2");
+ when(readerContext.generateMetadataForRecord(any(), any(), any())).thenReturn(anotherMetadata);
+ keyBasedBuffer.processDeleteRecord(anotherRecord, anotherMetadata);
+ records = keyBasedBuffer.getLogRecords();
+ assertEquals(2, records.size());
+ assertEquals(Pair.of(Option.empty(), anotherMetadata), records.get("54321"));
+ }
+}
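
The custom-delete check exercised by the tests above reduces to a field/value
comparison against the configured DELETE_KEY and DELETE_MARKER properties. A
minimal sketch of the idea, using a hypothetical helper rather than the actual
FileGroupRecordBuffer internals:

    // Sketch: a record counts as a custom delete when the configured delete
    // field holds the configured marker value (compared as strings here).
    static boolean isCustomDelete(GenericRecord record, String deleteField, String deleteMarker) {
      Object value = record.get(deleteField);  // null if the field is absent
      return value != null && deleteMarker.equals(value.toString());
    }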
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index e72d5dd40ce..3f1720734a2 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -359,7 +359,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
}
if (fileSlice.getBaseFile().get().getBootstrapBaseFile().isPresent()) {
//TODO: [HUDI-8169] this code path will not hit until we implement bootstrap tests
- Pair<List<Schema.Field>, List<Schema.Field>> dataAndMetaCols = HoodieFileGroupReaderSchemaHandler.getDataAndMetaCols(requestedSchema);
+ Pair<List<Schema.Field>, List<Schema.Field>> dataAndMetaCols = FileGroupReaderSchemaHandler.getDataAndMetaCols(requestedSchema);
return !dataAndMetaCols.getLeft().isEmpty() && !dataAndMetaCols.getRight().isEmpty();
}
return false;
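
For reference, getDataAndMetaCols partitions the requested schema's fields
into meta and data columns. An illustrative sketch of that split, assuming a
hand-built meta-column set rather than the handler's actual lookup:

    // Sketch: divide fields by membership in a set of Hudi meta columns.
    Set<String> metaCols = new HashSet<>(Arrays.asList(
        HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD));
    List<Schema.Field> meta = new ArrayList<>();
    List<Schema.Field> data = new ArrayList<>();
    for (Schema.Field f : requestedSchema.getFields()) {
      (metaCols.contains(f.name()) ? meta : data).add(f);
    }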
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupRecordBuffer.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupRecordBuffer.java
deleted file mode 100644
index ee369410c8e..00000000000
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupRecordBuffer.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi.common.table.read;
-
-import org.apache.hudi.common.config.RecordMergeMode;
-import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.engine.HoodieReaderContext;
-import org.apache.hudi.common.model.DeleteRecord;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordMerger;
-import org.apache.hudi.common.table.HoodieTableConfig;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.StringUtils;
-
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.CsvSource;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;
-import static org.apache.hudi.common.table.read.HoodieBaseFileGroupRecordBuffer.getOrderingValue;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests {@link HoodieBaseFileGroupRecordBuffer}
- */
-public class TestHoodieFileGroupRecordBuffer {
- @Test
- void testGetOrderingValueFromDeleteRecord() {
- HoodieReaderContext readerContext = mock(HoodieReaderContext.class);
- DeleteRecord deleteRecord = mock(DeleteRecord.class);
- mockDeleteRecord(deleteRecord, null);
- assertEquals(DEFAULT_ORDERING_VALUE, getOrderingValue(readerContext, deleteRecord));
- mockDeleteRecord(deleteRecord, DEFAULT_ORDERING_VALUE);
- assertEquals(DEFAULT_ORDERING_VALUE, getOrderingValue(readerContext, deleteRecord));
- String orderingValue = "xyz";
- String convertedValue = "_xyz";
- mockDeleteRecord(deleteRecord, orderingValue);
- when(readerContext.convertValueToEngineType(orderingValue)).thenReturn(convertedValue);
- assertEquals(convertedValue, getOrderingValue(readerContext, deleteRecord));
- }
-
- @ParameterizedTest
- @CsvSource({
- "true, true, EVENT_TIME_ORDERING",
- "true, false, EVENT_TIME_ORDERING",
- "false, true, EVENT_TIME_ORDERING",
- "false, false, EVENT_TIME_ORDERING",
- "true, true, COMMIT_TIME_ORDERING",
- "true, false, COMMIT_TIME_ORDERING",
- "false, true, COMMIT_TIME_ORDERING",
- "false, false, COMMIT_TIME_ORDERING",
- "true, true, CUSTOM",
- "true, false, CUSTOM",
- "false, true, CUSTOM",
- "false, false, CUSTOM",
- "true, true,",
- "true, false,",
- "false, true,",
- "false, false,"
- })
- public void testSchemaForMandatoryFields(boolean setPrecombine, boolean addHoodieIsDeleted, RecordMergeMode mergeMode) {
- HoodieReaderContext readerContext = mock(HoodieReaderContext.class);
- when(readerContext.getHasBootstrapBaseFile()).thenReturn(false);
- when(readerContext.getHasLogFiles()).thenReturn(true);
- HoodieRecordMerger recordMerger = mock(HoodieRecordMerger.class);
- when(readerContext.getRecordMerger()).thenReturn(Option.of(recordMerger));
- when(recordMerger.isProjectionCompatible()).thenReturn(false);
-
- String preCombineField = "ts";
- List<String> dataSchemaFields = new ArrayList<>();
- dataSchemaFields.addAll(Arrays.asList(
- HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD, preCombineField,
- "colA", "colB", "colC", "colD"));
- if (addHoodieIsDeleted) {
- dataSchemaFields.add(HoodieRecord.HOODIE_IS_DELETED_FIELD);
- }
-
- Schema dataSchema = getSchema(dataSchemaFields);
- Schema requestedSchema = getSchema(Arrays.asList(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD));
-
- HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
- when(tableConfig.getRecordMergeMode()).thenReturn(mergeMode);
- when(tableConfig.populateMetaFields()).thenReturn(true);
- when(tableConfig.getPreCombineField()).thenReturn(setPrecombine ? preCombineField : StringUtils.EMPTY_STRING);
-
- TypedProperties props = new TypedProperties();
- HoodieFileGroupReaderSchemaHandler fileGroupReaderSchemaHandler = new HoodieFileGroupReaderSchemaHandler(readerContext,
- dataSchema, requestedSchema, Option.empty(), tableConfig, props);
- List<String> expectedFields = new ArrayList();
- expectedFields.add(HoodieRecord.RECORD_KEY_METADATA_FIELD);
- expectedFields.add(HoodieRecord.PARTITION_PATH_METADATA_FIELD);
- if (setPrecombine && mergeMode != RecordMergeMode.COMMIT_TIME_ORDERING) { // commit time ordering does not project ordering field.
- expectedFields.add(preCombineField);
- }
- if (addHoodieIsDeleted) {
- expectedFields.add(HoodieRecord.HOODIE_IS_DELETED_FIELD);
- }
- Schema expectedSchema = mergeMode == RecordMergeMode.CUSTOM ? dataSchema : getSchema(expectedFields);
- Schema actualSchema = fileGroupReaderSchemaHandler.generateRequiredSchema();
- assertEquals(expectedSchema, actualSchema);
- }
-
- private Schema getSchema(List<String> fields) {
- SchemaBuilder.FieldAssembler<Schema> schemaFieldAssembler = SchemaBuilder.builder().record("test_schema")
- .namespace("test_namespace").fields();
- for (String field : fields) {
- schemaFieldAssembler = schemaFieldAssembler.name(field).type().stringType().noDefault();
- }
- return schemaFieldAssembler.endRecord();
- }
-
- private void mockDeleteRecord(DeleteRecord deleteRecord,
- Comparable orderingValue) {
- when(deleteRecord.getOrderingValue()).thenReturn(orderingValue);
- }
-}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
index fda3018e778..b0eaf6cb25c 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
@@ -208,6 +208,16 @@ public class HiveHoodieReaderContext extends HoodieReaderContext<ArrayWritable>
return StringUtils.isNullOrEmpty(fieldName) ? null : objectInspectorCache.getValue(record, schema, fieldName);
}
+ @Override
+ public boolean castToBoolean(Object value) {
+ if (value instanceof BooleanWritable) {
+ return ((BooleanWritable) value).get();
+ } else {
+ throw new IllegalArgumentException(
+ "Expected BooleanWritable but got " + value.getClass());
+ }
+ }
+
@Override
public HoodieRecord<ArrayWritable> constructHoodieRecord(Option<ArrayWritable> recordOption, Map<String, Object> metadataMap) {
if (!recordOption.isPresent()) {
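
The new castToBoolean hook lets each engine-specific reader context unwrap its
native boolean representation when evaluating delete markers. As a sketch, an
Avro-based context would likely reduce to a plain cast (illustrative only,
mirroring the Hive variant above, not code from this commit):

    @Override
    public boolean castToBoolean(Object value) {
      if (value instanceof Boolean) {
        return (Boolean) value;  // Avro booleans surface as java.lang.Boolean
      }
      throw new IllegalArgumentException("Expected Boolean but got " + value.getClass());
    }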
diff --git a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestSparkFileFormatInternalRowReaderContext.scala b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestSparkFileFormatInternalRowReaderContext.scala
index 336e1bbe83d..fec096920aa 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestSparkFileFormatInternalRowReaderContext.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestSparkFileFormatInternalRowReaderContext.scala
@@ -22,7 +22,7 @@ package org.apache.spark.execution.datasources.parquet
import org.apache.hudi.SparkFileFormatInternalRowReaderContext
import org.apache.hudi.SparkFileFormatInternalRowReaderContext.filterIsSafeForBootstrap
import org.apache.hudi.common.model.HoodieRecord
-import org.apache.hudi.common.table.read.HoodiePositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME
+import org.apache.hudi.common.table.read.PositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
import org.apache.spark.sql.execution.datasources.parquet.SparkParquetReader
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
similarity index 96%
rename from hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
rename to hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
index 3c80ef83919..c927893a115 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
@@ -33,9 +33,9 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
-import org.apache.hudi.common.table.read.HoodiePositionBasedFileGroupRecordBuffer;
-import org.apache.hudi.common.table.read.HoodiePositionBasedSchemaHandler;
import org.apache.hudi.common.table.read.HoodieReadStats;
+import org.apache.hudi.common.table.read.PositionBasedFileGroupRecordBuffer;
+import org.apache.hudi.common.table.read.PositionBasedSchemaHandler;
import org.apache.hudi.common.table.read.TestHoodieFileGroupReaderOnSpark;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.SchemaTestUtil;
@@ -70,11 +70,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
-public class TestHoodiePositionBasedFileGroupRecordBuffer extends TestHoodieFileGroupReaderOnSpark {
+public class TestPositionBasedFileGroupRecordBuffer extends TestHoodieFileGroupReaderOnSpark {
private final HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEEF);
private HoodieTableMetaClient metaClient;
private Schema avroSchema;
- private HoodiePositionBasedFileGroupRecordBuffer<InternalRow> buffer;
+ private PositionBasedFileGroupRecordBuffer<InternalRow> buffer;
private String partitionPath;
private HoodieReadStats readStats;
@@ -123,7 +123,7 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer extends TestHoodieFile
} else {
ctx.setRecordMerger(Option.empty());
}
- ctx.setSchemaHandler(new HoodiePositionBasedSchemaHandler<>(ctx, avroSchema, avroSchema,
+ ctx.setSchemaHandler(new PositionBasedSchemaHandler<>(ctx, avroSchema, avroSchema,
Option.empty(), metaClient.getTableConfig(), new TypedProperties()));
TypedProperties props = new TypedProperties();
props.put("hoodie.write.record.merge.mode", mergeMode.name());
@@ -136,7 +136,7 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer extends TestHoodieFile
writeConfigs.put(HoodieTableConfig.RECORD_MERGE_STRATEGY_ID.key(), HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID);
}
readStats = new HoodieReadStats();
- buffer = new HoodiePositionBasedFileGroupRecordBuffer<>(
+ buffer = new PositionBasedFileGroupRecordBuffer<>(
ctx,
metaClient,
mergeMode,
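
The rename drops the Hoodie prefix without changing behavior: the
position-based buffer still associates log records with base-file rows by row
position (via ROW_INDEX_TEMPORARY_COLUMN_NAME) rather than by record key. A
schematic contrast, with hypothetical map types standing in for the buffers'
actual internal state:

    // Sketch: key-based buffering indexes pending log records by record key,
    // while position-based buffering indexes them by base-file row position.
    Map<String, Object> byKey = new HashMap<>();    // cf. KeyBasedFileGroupRecordBuffer
    Map<Long, Object> byPosition = new HashMap<>(); // cf. PositionBasedFileGroupRecordBuffer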
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
index ae5d45d7dfa..4120956b4ce 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
@@ -19,15 +19,22 @@
package org.apache.hudi.common.table.read
-import org.apache.hudi.{SparkAdapterSupport, SparkFileFormatInternalRowReaderContext}
-import org.apache.hudi.common.config.HoodieReaderConfig
+import org.apache.hudi.{DataSourceWriteOptions, SparkAdapterSupport, SparkFileFormatInternalRowReaderContext}
+import org.apache.hudi.DataSourceWriteOptions.{OPERATION, PRECOMBINE_FIELD, RECORDKEY_FIELD, TABLE_TYPE}
+import org.apache.hudi.common.config.{HoodieReaderConfig, RecordMergeMode}
import org.apache.hudi.common.config.HoodieReaderConfig.FILE_GROUP_READER_ENABLED
import org.apache.hudi.common.engine.HoodieReaderContext
+import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{FileSlice, HoodieRecord, WriteOperationType}
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload.{DELETE_KEY, DELETE_MARKER}
import org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.read.TestHoodieFileGroupReaderOnSpark.getFileCount
import org.apache.hudi.common.testutils.{HoodieTestUtils, RawTripTestPayload}
import org.apache.hudi.common.util.{Option => HOption}
-import org.apache.hudi.storage.StorageConfiguration
+import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
+import org.apache.hudi.storage.{StorageConfiguration, StoragePath}
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
import org.apache.avro.Schema
import org.apache.hadoop.conf.Configuration
@@ -39,7 +46,9 @@ import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
import org.mockito.Mockito
import java.util
@@ -165,6 +174,111 @@ class TestHoodieFileGroupReaderOnSpark extends TestHoodieFileGroupReaderBase[Int
sparkReaderContext, row, avroSchema, "non_existent_col", DEFAULT_ORDERING_VALUE)
}
+ val expectedEventTimeBased: Seq[(Int, String, String, String, Double, String)] = Seq(
+ (10, "5", "rider-E", "driver-E", 17.85, "i"),
+ (10, "3", "rider-C", "driver-C", 33.9, "i"),
+ (10, "2", "rider-B", "driver-B", 27.7, "i"),
+ (20, "1", "rider-Z", "driver-Z", 27.7, "i"))
+ val expectedCommitTimeBased: Seq[(Int, String, String, String, Double, String)] = Seq(
+ (10, "5", "rider-E", "driver-E", 17.85, "i"),
+ (10, "3", "rider-C", "driver-C", 33.9, "i"),
+ (20, "1", "rider-Z", "driver-Z", 27.7, "i"))
+
+ @ParameterizedTest
+ @MethodSource(Array("customDeleteTestParams"))
+ def testCustomDelete(useFgReader: String,
+ tableType: String,
+ positionUsed: String,
+ mergeMode: String): Unit = {
+ val payloadClass = "org.apache.hudi.common.table.read.CustomPayloadForTesting"
+ val fgReaderOpts: Map[String, String] = Map(
+ HoodieWriteConfig.MERGE_SMALL_FILE_GROUP_CANDIDATES_LIMIT.key -> "0",
+ HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key -> useFgReader,
+ HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key -> positionUsed,
+ HoodieWriteConfig.RECORD_MERGE_MODE.key -> mergeMode
+ )
+ val deleteOpts: Map[String, String] = Map(
+ DELETE_KEY -> "op", DELETE_MARKER -> "d")
+ val readOpts = if (mergeMode.equals("CUSTOM")) {
+ fgReaderOpts ++ deleteOpts ++ Map(
+ HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key -> payloadClass)
+ } else {
+ fgReaderOpts ++ deleteOpts
+ }
+ val opts = readOpts
+ val columns = Seq("ts", "key", "rider", "driver", "fare", "op")
+
+ val data = Seq(
+ (10, "1", "rider-A", "driver-A", 19.10, "i"),
+ (10, "2", "rider-B", "driver-B", 27.70, "i"),
+ (10, "3", "rider-C", "driver-C", 33.90, "i"),
+ (10, "4", "rider-D", "driver-D", 34.15, "i"),
+ (10, "5", "rider-E", "driver-E", 17.85, "i"))
+ val inserts = spark.createDataFrame(data).toDF(columns: _*)
+ inserts.write.format("hudi").
+ option(RECORDKEY_FIELD.key(), "key").
+ option(PRECOMBINE_FIELD.key(), "ts").
+ option(TABLE_TYPE.key(), tableType).
+ option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
+ option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+ options(opts).
+ mode(SaveMode.Overwrite).
+ save(getBasePath)
+ val metaClient = HoodieTableMetaClient
+ .builder().setConf(getStorageConf).setBasePath(getBasePath).build
+ assertEquals((1, 0), getFileCount(metaClient, getBasePath))
+
+ // Delete using delete markers.
+ val updateData = Seq(
+ (11, "1", "rider-X", "driver-X", 19.10, "d"),
+ (9, "2", "rider-Y", "driver-Y", 27.70, "d"))
+ val updates = spark.createDataFrame(updateData).toDF(columns: _*)
+ updates.write.format("hudi").
+ option(OPERATION.key(), "upsert").
+ option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+ options(opts).
+ mode(SaveMode.Append).
+ save(getBasePath)
+ assertEquals((1, 1), getFileCount(metaClient, getBasePath))
+
+ // Delete using the DELETE operation.
+ val deletesData = Seq((-5, "4", "rider-D", "driver-D", 34.15, 6))
+ val deletes = spark.createDataFrame(deletesData).toDF(columns: _*)
+ deletes.write.format("hudi").
+ option(OPERATION.key(), "DELETE").
+ option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+ options(opts).
+ mode(SaveMode.Append).
+ save(getBasePath)
+ assertEquals((1, 2), getFileCount(metaClient, getBasePath))
+
+ // Add a record back to ensure event time ordering works.
+ val updateDataSecond = Seq(
+ (20, "1", "rider-Z", "driver-Z", 27.70, "i"))
+ val updatesSecond = spark.createDataFrame(updateDataSecond).toDF(columns: _*)
+ updatesSecond.write.format("hudi").
+ option(OPERATION.key(), "upsert").
+ option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+ options(opts).
+ mode(SaveMode.Append).
+ save(getBasePath)
+ // Validate data file number.
+ assertEquals((1, 3), getFileCount(metaClient, getBasePath))
+
+ // Validate in the end.
+ val columnsToCompare = Set("ts", "key", "rider", "driver", "fare", "op")
+ val df = spark.read.options(readOpts).format("hudi").load(getBasePath)
+ val finalDf = df.select("ts", "key", "rider", "driver", "fare", "op").sort("key")
+ val expected = if (mergeMode == RecordMergeMode.EVENT_TIME_ORDERING.name()) {
+ expectedEventTimeBased
+ } else {
+ expectedCommitTimeBased
+ }
+ val expectedDf = spark.createDataFrame(expected).toDF(columns: _*).sort("key")
+ assertTrue(
+ SparkClientFunctionalTestHarness.areDataframesEqual(expectedDf, finalDf, columnsToCompare.asJava))
+ }
+
private def testGetOrderingValue(sparkReaderContext: HoodieReaderContext[InternalRow],
row: InternalRow,
avroSchema: Schema,
@@ -177,3 +291,26 @@ class TestHoodieFileGroupReaderOnSpark extends TestHoodieFileGroupReaderBase[Int
metadataMap.get(HoodieReaderContext.INTERNAL_META_ORDERING_FIELD))
}
}
+
+object TestHoodieFileGroupReaderOnSpark {
+ def customDeleteTestParams(): java.util.List[Arguments] = {
+ java.util.Arrays.asList(
+ Arguments.of("true", "MERGE_ON_READ", "false", "EVENT_TIME_ORDERING"),
+ Arguments.of("true", "MERGE_ON_READ", "true", "EVENT_TIME_ORDERING"),
+ Arguments.of("true", "MERGE_ON_READ", "false", "COMMIT_TIME_ORDERING"),
+ Arguments.of("true", "MERGE_ON_READ", "true", "COMMIT_TIME_ORDERING"),
+ Arguments.of("true", "MERGE_ON_READ", "false", "CUSTOM"),
+ Arguments.of("true", "MERGE_ON_READ", "true", "CUSTOM"))
+ }
+
+ def getFileCount(metaClient: HoodieTableMetaClient, basePath: String): (Long, Long) = {
+ val newMetaClient = HoodieTableMetaClient.reload(metaClient)
+ val files = newMetaClient.getStorage.listFiles(new StoragePath(basePath))
+ (files.stream().filter(f => f.getPath.getParent.equals(new StoragePath(basePath))
+ && FSUtils.isBaseFile(f.getPath)).count(),
+ files.stream().filter(f => f.getPath.getParent.equals(new StoragePath(basePath))
+ && FSUtils.isLogFile(f.getPath)).count())
+ }
+}
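
As a usage note, the delete semantics exercised in testCustomDelete hinge on
the two payload properties imported from DefaultHoodieRecordPayload. A minimal
sketch of wiring them up, with the column name and marker value taken from the
test above:

    // Sketch: treat rows whose "op" column equals "d" as deletes.
    TypedProperties props = new TypedProperties();
    props.setProperty(DefaultHoodieRecordPayload.DELETE_KEY, "op");   // column carrying the marker
    props.setProperty(DefaultHoodieRecordPayload.DELETE_MARKER, "d"); // value that signals a delete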