This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c0576131759 [HUDI-6798] Add record merging mode and implement event-time ordering in the new file group reader (#9894)
c0576131759 is described below
commit c05761317596585a3c0c3cc69a34b4407843351c
Author: Y Ethan Guo <[email protected]>
AuthorDate: Sun Jun 9 20:48:09 2024 -0700
[HUDI-6798] Add record merging mode and implement event-time ordering
in the new file group reader (#9894)

This PR adds a new table config, `hoodie.record.merge.mode`, which
controls how records are merged in the new file group reader
(`HoodieFileGroupReader`), and implements event-time ordering in that
reader. From release 1.0 onward, `hoodie.record.merge.mode` is intended
to be the single config that determines how record merging happens.
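As a usage sketch only (not part of this change): with the
`PropertyBuilder` API added in this commit, a table could be
initialized with an explicit merge mode. The table name, `storageConf`,
and `basePath` below are placeholders, not values from this commit:

    // Sketch only: "trips", storageConf, and basePath are placeholders.
    HoodieTableMetaClient.withPropertyBuilder()
        .setTableType(HoodieTableType.MERGE_ON_READ)
        .setTableName("trips")
        .setRecordMergeMode(RecordMergeMode.EVENT_TIME_ORDERING)
        .initTable(storageConf, basePath);

When the mode is not set explicitly, it is inferred from the payload
class, payload type, and record merger strategy (see
inferRecordMergeMode in HoodieTableMetaClient below).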
---------
Co-authored-by: Sagar Sumit <[email protected]>
---
.../hudi/client/TestTableSchemaEvolution.java | 3 +
.../hudi/common/config/HoodieCommonConfig.java | 3 +
.../apache/hudi/common/config/RecordMergeMode.java | 36 ++++
.../hudi/common/table/HoodieTableConfig.java | 13 +-
.../hudi/common/table/HoodieTableMetaClient.java | 114 ++++++++++-
.../table/log/BaseHoodieLogRecordReader.java | 7 +
.../table/log/HoodieMergedLogRecordReader.java | 13 +-
.../read/HoodieBaseFileGroupRecordBuffer.java | 209 ++++++++++++++++-----
.../common/table/read/HoodieFileGroupReader.java | 26 ++-
.../table/read/TestHoodieFileGroupReaderBase.java | 77 ++++++--
.../common/table/TestHoodieTableMetaClient.java | 144 ++++++++++++++
.../hudi/common/table/read/TestCustomMerger.java | 4 +
.../common/table/read/TestEventTimeMerging.java | 4 +
...stHoodiePositionBasedFileGroupRecordBuffer.java | 6 +-
.../read/TestHoodieFileGroupReaderOnSpark.scala | 11 +-
15 files changed, 588 insertions(+), 82 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
index f5fa70c6668..496b42c13d6 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
@@ -20,6 +20,7 @@ package org.apache.hudi.client;
import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -48,6 +49,7 @@ import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
+import static org.apache.hudi.common.config.HoodieCommonConfig.RECORD_MERGE_MODE;
import static org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion.VERSION_1;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.EXTRA_TYPE_SCHEMA;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.FARE_NESTED_SCHEMA;
@@ -165,6 +167,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
HoodieTableMetaClient.withPropertyBuilder()
.fromMetaClient(metaClient)
.setTableType(HoodieTableType.MERGE_ON_READ)
+        .setRecordMergeMode(RecordMergeMode.valueOf(RECORD_MERGE_MODE.defaultValue()))
.setTimelineLayoutVersion(VERSION_1)
.initTable(metaClient.getStorageConf().newInstance(), metaClient.getBasePath());
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
index 1a4c2e31780..c96b07ee4f0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
@@ -18,6 +18,7 @@
package org.apache.hudi.common.config;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
@@ -81,6 +82,8 @@ public class HoodieCommonConfig extends HoodieConfig {
+ " operation will fail schema compatibility check. Set this option
to true will make the missing "
+ " column be filled with null values to successfully complete the
write operation.");
+ public static final ConfigProperty<String> RECORD_MERGE_MODE =
HoodieTableConfig.RECORD_MERGE_MODE;
+
public static final ConfigProperty<ExternalSpillableMap.DiskMapType>
SPILLABLE_DISK_MAP_TYPE = ConfigProperty
.key("hoodie.common.spillable.diskmap.type")
.defaultValue(ExternalSpillableMap.DiskMapType.BITCASK)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/RecordMergeMode.java b/hudi-common/src/main/java/org/apache/hudi/common/config/RecordMergeMode.java
new file mode 100644
index 00000000000..641f3514ad6
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/RecordMergeMode.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.config;
+
+@EnumDescription("Determines the logic of merging updates")
+public enum RecordMergeMode {
+ @EnumFieldDescription("Using transaction time to merge records, i.e., the
record from later "
+ + "transaction overwrites the earlier record with the same key.")
+ OVERWRITE_WITH_LATEST,
+
+ @EnumFieldDescription("Using event time as the ordering to merge records,
i.e., the record "
+ + "with the larger event time overwrites the record with the smaller
event time on the "
+ + "same key, regardless of transaction time. The event time or
preCombine field needs "
+ + "to be specified by the user.")
+ EVENT_TIME_ORDERING,
+
+ @EnumFieldDescription("Using custom merging logic specified by the user.")
+ CUSTOM
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 87263a13f9d..b3bf9668d93 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.OrderedProperties;
+import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.BootstrapIndexType;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
@@ -45,8 +46,8 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.hudi.metadata.MetadataPartitionType;
-import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
import org.apache.avro.Schema;
import org.slf4j.Logger;
@@ -175,6 +176,12 @@ public class HoodieTableConfig extends HoodieConfig {
.noDefaultValue()
.withDocumentation("Version of timeline used, by the table.");
+ public static final ConfigProperty<String> RECORD_MERGE_MODE = ConfigProperty
+ .key("hoodie.record.merge.mode")
+ .defaultValue(RecordMergeMode.EVENT_TIME_ORDERING.name())
+ .sinceVersion("1.0.0")
+ .withDocumentation(RecordMergeMode.class);
+
public static final ConfigProperty<String> PAYLOAD_CLASS_NAME =
ConfigProperty
.key("hoodie.compaction.payload.class")
.defaultValue(DefaultHoodieRecordPayload.class.getName())
@@ -532,6 +539,10 @@ public class HoodieTableConfig extends HoodieConfig {
setValue(VERSION, Integer.toString(tableVersion.versionCode()));
}
+ public RecordMergeMode getRecordMergeMode() {
+    return RecordMergeMode.valueOf(getStringOrDefault(RECORD_MERGE_MODE).toUpperCase());
+ }
+
/**
* Read the payload class for HoodieRecords from the table properties.
*/
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index e8e99ff9a0c..250091ecec6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -22,17 +22,20 @@ import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieMetaserverConfig;
import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
+import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.fs.ConsistencyGuard;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FailSafeConsistencyGuard;
import org.apache.hudi.common.fs.FileSystemRetryConfig;
import org.apache.hudi.common.fs.NoOpConsistencyGuard;
import org.apache.hudi.common.model.BootstrapIndexType;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieIndexDefinition;
import org.apache.hudi.common.model.HoodieIndexMetadata;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieTimelineTimeZone;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.RecordPayloadType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
@@ -47,7 +50,6 @@ import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
-import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.TableNotFoundException;
@@ -76,10 +78,14 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
import static org.apache.hudi.common.table.HoodieTableConfig.INITIAL_VERSION;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
import static org.apache.hudi.common.util.ConfigUtils.containsConfigProperty;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
import static org.apache.hudi.io.storage.HoodieIOFactory.getIOFactory;
/**
@@ -161,7 +167,7 @@ public class HoodieTableMetaClient implements Serializable {
Option<TimelineLayoutVersion> tableConfigVersion =
tableConfig.getTimelineLayoutVersion();
if (layoutVersion.isPresent() && tableConfigVersion.isPresent()) {
// Ensure layout version passed in config is not lower than the one seen
in hoodie.properties
-
ValidationUtils.checkArgument(layoutVersion.get().compareTo(tableConfigVersion.get())
>= 0,
+ checkArgument(layoutVersion.get().compareTo(tableConfigVersion.get()) >=
0,
"Layout Version defined in hoodie properties has higher version (" +
tableConfigVersion.get()
+ ") than the one passed in config (" + layoutVersion.get() +
")");
}
@@ -196,7 +202,7 @@ public class HoodieTableMetaClient implements Serializable {
String indexType,
Map<String, Map<String, String>> columns,
Map<String, String> options) {
- ValidationUtils.checkState(
+ checkState(
        !indexMetadataOpt.isPresent() || !indexMetadataOpt.get().getIndexDefinitions().containsKey(indexName),
"Index metadata is already present");
List<String> columnNames = new ArrayList<>(columns.keySet());
@@ -892,9 +898,9 @@ public class HoodieTableMetaClient implements Serializable {
}
public HoodieTableMetaClient build() {
- ValidationUtils.checkArgument(conf != null || storage != null,
+ checkArgument(conf != null || storage != null,
"Storage configuration or HoodieStorage needs to be set to init
HoodieTableMetaClient");
- ValidationUtils.checkArgument(basePath != null, "basePath needs to be
set to init HoodieTableMetaClient");
+ checkArgument(basePath != null, "basePath needs to be set to init
HoodieTableMetaClient");
if (timeGeneratorConfig == null) {
timeGeneratorConfig =
HoodieTimeGeneratorConfig.newBuilder().withPath(basePath).build();
}
@@ -923,6 +929,7 @@ public class HoodieTableMetaClient implements Serializable {
private String recordKeyFields;
private String secondaryKeyFields;
private String archiveLogFolder;
+ private RecordMergeMode recordMergeMode;
private String payloadClassName;
private String payloadType;
private String recordMergerStrategy;
@@ -999,6 +1006,11 @@ public class HoodieTableMetaClient implements Serializable {
return this;
}
+  public PropertyBuilder setRecordMergeMode(RecordMergeMode recordMergeMode) {
+ this.recordMergeMode = recordMergeMode;
+ return this;
+ }
+
public PropertyBuilder setPayloadClassName(String payloadClassName) {
this.payloadClassName = payloadClassName;
return this;
@@ -1144,6 +1156,7 @@ public class HoodieTableMetaClient implements Serializable {
return setTableType(metaClient.getTableType())
.setTableName(metaClient.getTableConfig().getTableName())
.setArchiveLogFolder(metaClient.getTableConfig().getArchivelogFolder())
+ .setRecordMergeMode(metaClient.getTableConfig().getRecordMergeMode())
.setPayloadClassName(metaClient.getTableConfig().getPayloadClass())
.setRecordMergerStrategy(metaClient.getTableConfig().getRecordMergerStrategy());
}
@@ -1173,6 +1186,10 @@ public class HoodieTableMetaClient implements Serializable {
      setArchiveLogFolder(hoodieConfig.getString(HoodieTableConfig.ARCHIVELOG_FOLDER));
}
+ if (hoodieConfig.contains(HoodieTableConfig.RECORD_MERGE_MODE)) {
+      setRecordMergeMode(
+          RecordMergeMode.valueOf(hoodieConfig.getString(HoodieTableConfig.RECORD_MERGE_MODE).toUpperCase()));
+ }
if (hoodieConfig.contains(HoodieTableConfig.PAYLOAD_CLASS_NAME)) {
setPayloadClassName(hoodieConfig.getString(HoodieTableConfig.PAYLOAD_CLASS_NAME));
} else if (hoodieConfig.contains(HoodieTableConfig.PAYLOAD_TYPE)) {
@@ -1262,8 +1279,8 @@ public class HoodieTableMetaClient implements Serializable {
}
public Properties build() {
- ValidationUtils.checkArgument(tableType != null, "tableType is null");
- ValidationUtils.checkArgument(tableName != null, "tableName is null");
+ checkArgument(tableType != null, "tableType is null");
+ checkArgument(tableName != null, "tableName is null");
HoodieTableConfig tableConfig = new HoodieTableConfig();
@@ -1285,6 +1302,11 @@ public class HoodieTableMetaClient implements Serializable {
if (recordMergerStrategy != null) {
        tableConfig.setValue(HoodieTableConfig.RECORD_MERGER_STRATEGY, recordMergerStrategy);
}
+ inferRecordMergeMode();
+ validateMergeConfigs();
+ if (recordMergeMode != null) {
+ tableConfig.setValue(RECORD_MERGE_MODE, recordMergeMode.name());
+ }
}
if (null != tableCreateSchema) {
@@ -1385,5 +1407,83 @@ public class HoodieTableMetaClient implements Serializable {
throws IOException {
      return HoodieTableMetaClient.initTableAndGetMetaClient(configuration, basePath, build());
}
+
+ private void inferRecordMergeMode() {
+ if (null == recordMergeMode) {
+ boolean payloadClassNameSet = null != payloadClassName;
+ boolean payloadTypeSet = null != payloadType;
+ boolean recordMergerStrategySet = null != recordMergerStrategy;
+
+ if (!recordMergerStrategySet
+ || recordMergerStrategy.equals(DEFAULT_MERGER_STRATEGY_UUID)) {
+ if (payloadClassNameSet) {
+          if (payloadClassName.equals(OverwriteWithLatestAvroPayload.class.getName())) {
+            recordMergeMode = RecordMergeMode.OVERWRITE_WITH_LATEST;
+          } else if (payloadClassName.equals(DefaultHoodieRecordPayload.class.getName())) {
+            recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING;
+          } else {
+            recordMergeMode = RecordMergeMode.CUSTOM;
+          }
+        } else if (payloadTypeSet) {
+          if (payloadType.equals(RecordPayloadType.OVERWRITE_LATEST_AVRO.name())) {
+            recordMergeMode = RecordMergeMode.OVERWRITE_WITH_LATEST;
+          } else if (payloadType.equals(RecordPayloadType.HOODIE_AVRO_DEFAULT.name())) {
+            recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING;
+          } else {
+            recordMergeMode = RecordMergeMode.CUSTOM;
+          }
+        } else {
+          LOG.warn("One of the payload class name or payload type must be set for the MERGE_ON_READ table");
+          recordMergeMode = RecordMergeMode.valueOf(RECORD_MERGE_MODE.defaultValue());
+          LOG.warn("Setting the record merge mode to the default: {}", recordMergeMode);
+        }
+ }
+ } else {
+ // Custom merger strategy is set
+ recordMergeMode = RecordMergeMode.CUSTOM;
+ }
+ }
+ }
+
+ private void validateMergeConfigs() {
+ boolean payloadClassNameSet = null != payloadClassName;
+ boolean payloadTypeSet = null != payloadType;
+ boolean recordMergerStrategySet = null != recordMergerStrategy;
+ boolean recordMergeModeSet = null != recordMergeMode;
+
+ checkArgument(recordMergeModeSet,
+ "Record merge mode " + HoodieTableConfig.RECORD_MERGE_MODE.key() + "
should be set");
+ switch (recordMergeMode) {
+ case OVERWRITE_WITH_LATEST:
+ checkArgument((!payloadClassNameSet && !payloadTypeSet)
+ || (payloadClassNameSet &&
payloadClassName.equals(OverwriteWithLatestAvroPayload.class.getName()))
+ || (payloadTypeSet &&
payloadType.equals(RecordPayloadType.OVERWRITE_LATEST_AVRO.name())),
+ constructMergeConfigErrorMessage());
+ break;
+ case EVENT_TIME_ORDERING:
+ checkArgument((!payloadClassNameSet && !payloadTypeSet)
+ || (payloadClassNameSet &&
payloadClassName.equals(DefaultHoodieRecordPayload.class.getName()))
+ || (payloadTypeSet &&
payloadType.equals(RecordPayloadType.HOODIE_AVRO_DEFAULT.name())),
+ constructMergeConfigErrorMessage());
+ checkArgument(!recordMergerStrategySet
+ || recordMergerStrategy.equals(DEFAULT_MERGER_STRATEGY_UUID),
+ "Record merger strategy (" + (recordMergerStrategySet ?
recordMergerStrategy : "null")
+ + ") should be consistent with the record merging mode
EVENT_TIME_ORDERING");
+ break;
+ case CUSTOM:
+ default:
+ // No op
+ }
+ }
+
+ private String constructMergeConfigErrorMessage() {
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder.append("Payload class name (");
+    stringBuilder.append(payloadClassName != null ? payloadClassName : "null");
+    stringBuilder.append(") or type (");
+    stringBuilder.append(payloadType != null ? payloadType : "null");
+    stringBuilder.append(") should be consistent with the record merge mode ");
+ stringBuilder.append(recordMergeMode);
+ return stringBuilder.toString();
+ }
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
index d58c54a929f..2f38dc9b258 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
@@ -19,6 +19,7 @@
package org.apache.hudi.common.table.log;
+import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.HoodieLogFile;
@@ -97,6 +98,8 @@ public abstract class BaseHoodieLogRecordReader<T> {
protected final String preCombineField;
// Stateless component for merging records
protected final HoodieRecordMerger recordMerger;
+ // Record merge mode
+ protected final RecordMergeMode recordMergeMode;
private final TypedProperties payloadProps;
// Log File Paths
protected final List<String> logFilePaths;
@@ -148,6 +151,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
Option<String> keyFieldOverride,
boolean enableOptimizedLogBlocksScan,
HoodieRecordMerger recordMerger,
+ RecordMergeMode recordMergeMode,
HoodieFileGroupRecordBuffer<T>
recordBuffer) {
this.readerContext = readerContext;
this.readerSchema = readerContext.getSchemaHandler().getRequiredSchema();
@@ -166,6 +170,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
}
this.payloadProps = props;
this.recordMerger = recordMerger;
+ this.recordMergeMode = recordMergeMode;
this.totalLogFiles.addAndGet(logFilePaths.size());
this.logFilePaths = logFilePaths;
this.reverseReader = reverseReader;
@@ -866,6 +871,8 @@ public abstract class BaseHoodieLogRecordReader<T> {
throw new UnsupportedOperationException();
}
+  public abstract Builder withRecordMergeMode(RecordMergeMode recordMergeMode);
+
  public Builder withOptimizedLogBlocksScan(boolean enableOptimizedLogBlocksScan) {
throw new UnsupportedOperationException();
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
index bcc821a34a1..c79de61536c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
@@ -19,6 +19,7 @@
package org.apache.hudi.common.table.log;
+import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger;
@@ -75,9 +76,10 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
Option<String> keyFieldOverride,
boolean enableOptimizedLogBlocksScan,
HoodieRecordMerger recordMerger,
+ RecordMergeMode recordMergeMode,
HoodieFileGroupRecordBuffer<T>
recordBuffer) {
super(readerContext, storage, logFilePaths, reverseReader, bufferSize,
instantRange, withOperationField,
- forceFullScan, partitionName, keyFieldOverride,
enableOptimizedLogBlocksScan, recordMerger, recordBuffer);
+ forceFullScan, partitionName, keyFieldOverride,
enableOptimizedLogBlocksScan, recordMerger, recordMergeMode, recordBuffer);
this.scannedPrefixes = new HashSet<>();
if (forceFullScan) {
@@ -228,6 +230,7 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
private boolean forceFullScan = true;
private boolean enableOptimizedLogBlocksScan = false;
    private HoodieRecordMerger recordMerger = HoodiePreCombineAvroRecordMerger.INSTANCE;
+ private RecordMergeMode recordMergeMode;
private HoodieFileGroupRecordBuffer<T> recordBuffer;
@@ -293,6 +296,12 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
return this;
}
+ @Override
+ public Builder<T> withRecordMergeMode(RecordMergeMode recordMergeMode) {
+ this.recordMergeMode = recordMergeMode;
+ return this;
+ }
+
public Builder<T> withKeyFiledOverride(String keyFieldOverride) {
this.keyFieldOverride = Objects.requireNonNull(keyFieldOverride);
return this;
@@ -324,7 +333,7 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
withOperationField, forceFullScan,
Option.ofNullable(partitionName),
Option.ofNullable(keyFieldOverride),
- enableOptimizedLogBlocksScan, recordMerger,
+ enableOptimizedLogBlocksScan, recordMerger, recordMergeMode,
recordBuffer);
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
index ed8d643b215..984d9740ceb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
@@ -19,6 +19,7 @@
package org.apache.hudi.common.table.read;
+import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.DeleteRecord;
@@ -33,11 +34,13 @@ import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.InternalSchemaCache;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieCorruptedDataException;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.hudi.exception.HoodieValidationException;
@@ -60,12 +63,14 @@ import java.util.function.Function;
import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_SCHEMA;
import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
+import static org.apache.hudi.common.table.read.HoodieFileGroupReader.getRecordMergeMode;
public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGroupRecordBuffer<T> {
protected final HoodieReaderContext<T> readerContext;
protected final Schema readerSchema;
protected final Option<String> partitionNameOverrideOpt;
protected final Option<String[]> partitionPathFieldOpt;
+ protected final RecordMergeMode recordMergeMode;
protected final HoodieRecordMerger recordMerger;
protected final TypedProperties payloadProps;
protected final ExternalSpillableMap<Serializable, Pair<Option<T>,
Map<String, Object>>> records;
@@ -90,6 +95,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T>
implements HoodieFileGr
this.readerSchema = readerContext.getSchemaHandler().getRequiredSchema();
this.partitionNameOverrideOpt = partitionNameOverrideOpt;
this.partitionPathFieldOpt = partitionPathFieldOpt;
+ this.recordMergeMode = getRecordMergeMode(payloadProps);
this.recordMerger = recordMerger;
this.payloadProps = payloadProps;
this.internalSchema = readerContext.getSchemaHandler().getInternalSchema();
@@ -147,6 +153,37 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
records.clear();
}
+  /**
+   * Compares two {@link Comparable}s. If both are numbers, converts them to {@link Long} for comparison.
+   * If one of the {@link Comparable}s is a String, assumes that both are String values for comparison.
+   *
+   * @param o1 {@link Comparable} object.
+   * @param o2 other {@link Comparable} object to compare to.
+   * @return comparison result.
+   */
+  @VisibleForTesting
+  static int compareTo(Comparable o1, Comparable o2) {
+    // TODO(HUDI-7848): fix the delete records to contain the correct ordering value type
+    // so this util with the number comparison is not necessary.
+    try {
+      return o1.compareTo(o2);
+    } catch (ClassCastException e) {
+      if (o1 instanceof Number && o2 instanceof Number) {
+        Long o1LongValue = ((Number) o1).longValue();
+        Long o2LongValue = ((Number) o2).longValue();
+        return o1LongValue.compareTo(o2LongValue);
+      } else if (o1 instanceof String || o2 instanceof String) {
+        return o1.toString().compareTo(o2.toString());
+      } else {
+        throw new IllegalArgumentException("Cannot compare values in different types: "
+            + o1 + "(" + o1.getClass() + "), " + o2 + "(" + o2.getClass() + ")");
+      }
+    } catch (Throwable e) {
+      throw new HoodieException("Cannot compare values: "
+          + o1 + "(" + o1.getClass() + "), " + o2 + "(" + o2.getClass() + ")", e);
+    }
+  }
+
/**
* Merge two log data records if needed.
*
@@ -160,42 +197,76 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
      Map<String, Object> metadata,
      Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair) throws IOException {
if (existingRecordMetadataPair != null) {
-      // Merge and store the combined record
-      // Note that the incoming `record` is from an older commit, so it should be put as
-      // the `older` in the merge API
-      Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = enablePartialMerging
-          ? recordMerger.partialMerge(
-          readerContext.constructHoodieRecord(Option.of(record), metadata),
-          (Schema) metadata.get(INTERNAL_META_SCHEMA),
-          readerContext.constructHoodieRecord(
-              existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()),
-          (Schema) existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA),
-          readerSchema,
-          payloadProps)
-          : recordMerger.merge(
-          readerContext.constructHoodieRecord(Option.of(record), metadata),
-          (Schema) metadata.get(INTERNAL_META_SCHEMA),
-          readerContext.constructHoodieRecord(
-              existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()),
-          (Schema) existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA),
-          payloadProps);
-
- if (!combinedRecordAndSchemaOpt.isPresent()) {
+      if (enablePartialMerging) {
+        // TODO(HUDI-7843): decouple the merging logic from the merger
+        // and use the record merge mode to control how to merge partial updates
+        // Merge and store the combined record
+        // Note that the incoming `record` is from an older commit, so it should be put as
+        // the `older` in the merge API
+        Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = recordMerger.partialMerge(
+            readerContext.constructHoodieRecord(Option.of(record), metadata),
+            (Schema) metadata.get(INTERNAL_META_SCHEMA),
+            readerContext.constructHoodieRecord(
+                existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()),
+            (Schema) existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA),
+            readerSchema,
+            payloadProps);
+        if (!combinedRecordAndSchemaOpt.isPresent()) {
+          return Option.empty();
+        }
+        Pair<HoodieRecord, Schema> combinedRecordAndSchema = combinedRecordAndSchemaOpt.get();
+        HoodieRecord<T> combinedRecord = combinedRecordAndSchema.getLeft();
+
+        // If pre-combine returns existing record, no need to update it
+        if (combinedRecord.getData() != existingRecordMetadataPair.getLeft().get()) {
+          return Option.of(Pair.of(
+              combinedRecord.getData(),
+              readerContext.updateSchemaAndResetOrderingValInMetadata(metadata, combinedRecordAndSchema.getRight())));
+        }
+        return Option.empty();
+      } else {
+        switch (recordMergeMode) {
+          case OVERWRITE_WITH_LATEST:
+            return Option.empty();
+          case EVENT_TIME_ORDERING:
+            Comparable existingOrderingValue = readerContext.getOrderingValue(
+                existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight(), readerSchema, payloadProps);
+            if (isDeleteRecordWithNaturalOrder(existingRecordMetadataPair.getLeft(), existingOrderingValue)) {
+              return Option.empty();
+            }
+            Comparable incomingOrderingValue = readerContext.getOrderingValue(
+                Option.of(record), metadata, readerSchema, payloadProps);
+            if (compareTo(incomingOrderingValue, existingOrderingValue) > 0) {
+              return Option.of(Pair.of(record, metadata));
+            }
+            return Option.empty();
+          case CUSTOM:
+          default:
+            // Merge and store the combined record
+            // Note that the incoming `record` is from an older commit, so it should be put as
+            // the `older` in the merge API
+            Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = recordMerger.merge(
+                readerContext.constructHoodieRecord(Option.of(record), metadata),
+                (Schema) metadata.get(INTERNAL_META_SCHEMA),
+                readerContext.constructHoodieRecord(
+                    existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight()),
+                (Schema) existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA),
+                payloadProps);
+
+            if (!combinedRecordAndSchemaOpt.isPresent()) {
+              return Option.empty();
+            }
+
+            Pair<HoodieRecord, Schema> combinedRecordAndSchema = combinedRecordAndSchemaOpt.get();
+            HoodieRecord<T> combinedRecord = combinedRecordAndSchema.getLeft();
+
+            // If pre-combine returns existing record, no need to update it
+            if (combinedRecord.getData() != existingRecordMetadataPair.getLeft().get()) {
+              return Option.of(Pair.of(combinedRecord.getData(), metadata));
+            }
+            return Option.empty();
+        }
+      }
-
-      Pair<HoodieRecord, Schema> combinedRecordAndSchema = combinedRecordAndSchemaOpt.get();
-      HoodieRecord<T> combinedRecord = combinedRecordAndSchema.getLeft();
-
-      // If pre-combine returns existing record, no need to update it
-      if (combinedRecord.getData() != existingRecordMetadataPair.getLeft().get()) {
-        return Option.of(Pair.of(
-            combinedRecord.getData(),
-            enablePartialMerging
-                ? readerContext.updateSchemaAndResetOrderingValInMetadata(metadata, combinedRecordAndSchema.getRight())
-                : metadata));
-      }
-      return Option.empty();
} else {
// Put the record as is
      // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
@@ -265,7 +336,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
* @param dataBlock current processed block
* @return final read schema.
*/
- protected Option<Pair<Function<T,T>, Schema>>
composeEvolvedSchemaTransformer(
+ protected Option<Pair<Function<T, T>, Schema>>
composeEvolvedSchemaTransformer(
HoodieDataBlock dataBlock) {
if (internalSchema.isEmptySchema()) {
return Option.empty();
@@ -274,7 +345,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
    long currentInstantTime = Long.parseLong(dataBlock.getLogBlockHeader().get(INSTANT_TIME));
    InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(currentInstantTime, hoodieTableMetaClient, false);
-    Pair<InternalSchema, Map<String,String>> mergedInternalSchema = new InternalSchemaMerger(fileSchema, internalSchema,
+    Pair<InternalSchema, Map<String, String>> mergedInternalSchema = new InternalSchemaMerger(fileSchema, internalSchema,
        true, false, false).mergeSchemaGetRenamed();
    Schema mergedAvroSchema = AvroInternalSchemaConverter.convert(mergedInternalSchema.getLeft(), readerSchema.getFullName());
assert mergedAvroSchema.equals(readerSchema);
@@ -297,32 +368,63 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
return newer;
}
- Option<Pair<HoodieRecord, Schema>> mergedRecord;
if (enablePartialMerging) {
- mergedRecord = recordMerger.partialMerge(
+ // TODO(HUDI-7843): decouple the merging logic from the merger
+ // and use the record merge mode to control how to merge partial updates
+ Option<Pair<HoodieRecord, Schema>> mergedRecord =
recordMerger.partialMerge(
readerContext.constructHoodieRecord(older, olderInfoMap), (Schema)
olderInfoMap.get(INTERNAL_META_SCHEMA),
readerContext.constructHoodieRecord(newer, newerInfoMap), (Schema)
newerInfoMap.get(INTERNAL_META_SCHEMA),
readerSchema, payloadProps);
- } else {
- mergedRecord = recordMerger.merge(
- readerContext.constructHoodieRecord(older, olderInfoMap), (Schema)
olderInfoMap.get(INTERNAL_META_SCHEMA),
- readerContext.constructHoodieRecord(newer, newerInfoMap), (Schema)
newerInfoMap.get(INTERNAL_META_SCHEMA), payloadProps);
- }
- if (mergedRecord.isPresent()
- &&
!mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(),
payloadProps)) {
- if (!mergedRecord.get().getRight().equals(readerSchema)) {
- return Option.ofNullable((T)
mergedRecord.get().getLeft().rewriteRecordWithNewSchema(mergedRecord.get().getRight(),
null, readerSchema).getData());
+ if (mergedRecord.isPresent()
+ &&
!mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(),
payloadProps)) {
+ if (!mergedRecord.get().getRight().equals(readerSchema)) {
+ return Option.ofNullable((T)
mergedRecord.get().getLeft().rewriteRecordWithNewSchema(mergedRecord.get().getRight(),
null, readerSchema).getData());
+ }
+ return Option.ofNullable((T) mergedRecord.get().getLeft().getData());
+ }
+ return Option.empty();
+ } else {
+ switch (recordMergeMode) {
+ case OVERWRITE_WITH_LATEST:
+ return newer;
+ case EVENT_TIME_ORDERING:
+ Comparable oldOrderingValue = readerContext.getOrderingValue(
+ older, olderInfoMap, readerSchema, payloadProps);
+ if (isDeleteRecordWithNaturalOrder(older, oldOrderingValue)) {
+ return newer;
+ }
+ Comparable newOrderingValue = readerContext.getOrderingValue(
+ newer, newerInfoMap, readerSchema, payloadProps);
+ if (isDeleteRecordWithNaturalOrder(newer, newOrderingValue)) {
+ return Option.empty();
+ }
+ if (compareTo(oldOrderingValue, newOrderingValue) > 0) {
+ return older;
+ }
+ return newer;
+ case CUSTOM:
+ default:
+ Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.merge(
+ readerContext.constructHoodieRecord(older, olderInfoMap),
(Schema) olderInfoMap.get(INTERNAL_META_SCHEMA),
+ readerContext.constructHoodieRecord(newer, newerInfoMap),
(Schema) newerInfoMap.get(INTERNAL_META_SCHEMA), payloadProps);
+
+ if (mergedRecord.isPresent()
+ &&
!mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(),
payloadProps)) {
+ if (!mergedRecord.get().getRight().equals(readerSchema)) {
+ return Option.ofNullable((T)
mergedRecord.get().getLeft().rewriteRecordWithNewSchema(mergedRecord.get().getRight(),
null, readerSchema).getData());
+ }
+ return Option.ofNullable((T)
mergedRecord.get().getLeft().getData());
+ }
+ return Option.empty();
}
- return Option.ofNullable((T) mergedRecord.get().getLeft().getData());
}
- return Option.empty();
}
/**
* Filter a record for downstream processing when:
- * 1. A set of pre-specified keys exists.
- * 2. The key of the record is not contained in the set.
+ * 1. A set of pre-specified keys exists.
+ * 2. The key of the record is not contained in the set.
*/
  protected boolean shouldSkip(T record, String keyFieldName, boolean isFullKey, Set<String> keys, Schema writerSchema) {
    String recordKey = readerContext.getValue(record, writerSchema, keyFieldName).toString();
@@ -419,4 +521,9 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
.orElseGet(dataBlock::getSchema);
return Pair.of(transformer, evolvedSchema);
}
+
+ private boolean isDeleteRecordWithNaturalOrder(Option<T> rowOption,
+ Comparable orderingValue) {
+ return rowOption.isEmpty() && orderingValue.equals(0);
+ }
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index 396da4166a7..8661a91a12f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -19,7 +19,9 @@
package org.apache.hudi.common.table.read;
+import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMemoryConfig;
+import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.BaseFile;
@@ -46,11 +48,13 @@ import org.apache.avro.Schema;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
+import java.util.Properties;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
+import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
/**
 * A file group reader that iterates through the records in a single file group.
@@ -73,6 +77,7 @@ public final class HoodieFileGroupReader<T> implements Closeable {
private final long length;
// Core structure to store and process records.
private final HoodieFileGroupRecordBuffer<T> recordBuffer;
+ private final RecordMergeMode recordMergeMode;
private ClosableIterator<T> baseFileIterator;
private final HoodieRecordMerger recordMerger;
private final Option<UnaryOperator<T>> outputConverter;
@@ -102,6 +107,7 @@ public final class HoodieFileGroupReader<T> implements Closeable {
this.props = props;
this.start = start;
this.length = length;
+ this.recordMergeMode = getRecordMergeMode(props);
    this.recordMerger = readerContext.getRecordMerger(tableConfig.getRecordMergerStrategy());
readerContext.setRecordMerger(this.recordMerger);
readerContext.setTablePath(tablePath);
@@ -154,11 +160,11 @@ public final class HoodieFileGroupReader<T> implements Closeable {
  private ClosableIterator<T> makeBootstrapBaseFileIterator(HoodieBaseFile baseFile) throws IOException {
BaseFile dataFile = baseFile.getBootstrapBaseFile().get();
- Pair<List<Schema.Field>,List<Schema.Field>> requiredFields =
readerContext.getSchemaHandler().getBootstrapRequiredFields();
- Pair<List<Schema.Field>,List<Schema.Field>> allFields =
readerContext.getSchemaHandler().getBootstrapDataFields();
- Option<Pair<ClosableIterator<T>,Schema>> dataFileIterator =
+ Pair<List<Schema.Field>, List<Schema.Field>> requiredFields =
readerContext.getSchemaHandler().getBootstrapRequiredFields();
+ Pair<List<Schema.Field>, List<Schema.Field>> allFields =
readerContext.getSchemaHandler().getBootstrapDataFields();
+ Option<Pair<ClosableIterator<T>, Schema>> dataFileIterator =
makeBootstrapBaseFileIteratorHelper(requiredFields.getRight(),
allFields.getRight(), dataFile);
- Option<Pair<ClosableIterator<T>,Schema>> skeletonFileIterator =
+ Option<Pair<ClosableIterator<T>, Schema>> skeletonFileIterator =
makeBootstrapBaseFileIteratorHelper(requiredFields.getLeft(),
allFields.getLeft(), baseFile);
if (!dataFileIterator.isPresent() && !skeletonFileIterator.isPresent()) {
throw new IllegalStateException("should not be here if only partition
cols are required");
@@ -180,9 +186,9 @@ public final class HoodieFileGroupReader<T> implements Closeable {
   * @param file file to be read
   * @return pair of the record iterator of the file, and the schema of the data being read
*/
- private Option<Pair<ClosableIterator<T>,Schema>>
makeBootstrapBaseFileIteratorHelper(List<Schema.Field> requiredFields,
-
List<Schema.Field> allFields,
-
BaseFile file) throws IOException {
+ private Option<Pair<ClosableIterator<T>, Schema>>
makeBootstrapBaseFileIteratorHelper(List<Schema.Field> requiredFields,
+
List<Schema.Field> allFields,
+
BaseFile file) throws IOException {
if (requiredFields.isEmpty()) {
return Option.empty();
}
@@ -225,6 +231,7 @@ public final class HoodieFileGroupReader<T> implements Closeable {
.withPartition(getRelativePartitionPath(
new StoragePath(path), logFiles.get(0).getPath().getParent()))
.withRecordMerger(recordMerger)
+ .withRecordMergeMode(recordMergeMode)
.withRecordBuffer(recordBuffer)
.build();
logRecordReader.close();
@@ -244,6 +251,11 @@ public final class HoodieFileGroupReader<T> implements Closeable {
return new HoodieFileGroupReaderIterator<>(this);
}
+ public static RecordMergeMode getRecordMergeMode(Properties props) {
+    String mergeMode = getStringWithAltKeys(props, HoodieCommonConfig.RECORD_MERGE_MODE, true).toUpperCase();
+ return RecordMergeMode.valueOf(mergeMode);
+ }
+
public static class HoodieFileGroupReaderIterator<T> implements
ClosableIterator<T> {
private HoodieFileGroupReader<T> reader;
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index a8a95887c18..9f3f8acf81c 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -22,6 +22,7 @@ package org.apache.hudi.common.table.read;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
@@ -42,22 +43,28 @@ import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.avro.Schema;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Stream;
import static org.apache.hudi.common.model.WriteOperationType.INSERT;
import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
import static org.apache.hudi.common.table.HoodieTableConfig.PARTITION_FIELDS;
import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGER_STRATEGY;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static org.apache.hudi.common.table.read.HoodieBaseFileGroupRecordBuffer.compareTo;
import static org.apache.hudi.common.testutils.HoodieTestUtils.getLogFileListFromFileSlice;
import static org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
/**
* Tests {@link HoodieFileGroupReader} with different engines
@@ -80,9 +87,55 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
Schema schema,
String fileGroupId);
+ public abstract Comparable getComparableUTF8String(String value);
+
+ @Test
+ public void testCompareToComparable() {
+ // Test same type
+ assertEquals(1, compareTo(Boolean.TRUE, Boolean.FALSE));
+ assertEquals(0, compareTo(Boolean.TRUE, Boolean.TRUE));
+ assertEquals(-1, compareTo(Boolean.FALSE, Boolean.TRUE));
+ assertEquals(1, compareTo(20, 15));
+ assertEquals(0, compareTo(15, 15));
+ assertEquals(-1, compareTo(10, 15));
+ assertEquals(1, compareTo(1.1f, 1.0f));
+ assertEquals(0, compareTo(1.0f, 1.0f));
+ assertEquals(-1, compareTo(0.9f, 1.0f));
+ assertEquals(1, compareTo(1.1, 1.0));
+ assertEquals(0, compareTo(1.0, 1.0));
+ assertEquals(-1, compareTo(0.9, 1.0));
+ assertEquals(1, compareTo("value2", "value1"));
+ assertEquals(0, compareTo("value1", "value1"));
+ assertEquals(-1, compareTo("value1", "value2"));
+ // Test different types which are comparable
+ assertEquals(1, compareTo(Long.MAX_VALUE / 2L, 10));
+ assertEquals(1, compareTo(20, 10L));
+ assertEquals(0, compareTo(10L, 10));
+ assertEquals(0, compareTo(10, 10L));
+ assertEquals(-1, compareTo(10, Long.MAX_VALUE));
+ assertEquals(-1, compareTo(10L, 20));
+ assertEquals(1, compareTo(getComparableUTF8String("value2"), "value1"));
+ assertEquals(1, compareTo("value2", getComparableUTF8String("value1")));
+ assertEquals(0, compareTo(getComparableUTF8String("value1"), "value1"));
+ assertEquals(0, compareTo("value1", getComparableUTF8String("value1")));
+ assertEquals(-1, compareTo(getComparableUTF8String("value1"), "value2"));
+ assertEquals(-1, compareTo("value1", getComparableUTF8String("value2")));
+ }
+
+ private static Stream<Arguments> testArguments() {
+ return Stream.of(
+ arguments(RecordMergeMode.OVERWRITE_WITH_LATEST, "avro"),
+ arguments(RecordMergeMode.OVERWRITE_WITH_LATEST, "parquet"),
+ arguments(RecordMergeMode.EVENT_TIME_ORDERING, "avro"),
+ arguments(RecordMergeMode.EVENT_TIME_ORDERING, "parquet"),
+ arguments(RecordMergeMode.CUSTOM, "avro"),
+ arguments(RecordMergeMode.CUSTOM, "parquet")
+ );
+ }
+
@ParameterizedTest
- @ValueSource(strings = {"avro", "parquet"})
-  public void testReadFileGroupInMergeOnReadTable(String logDataBlockFormat) throws Exception {
+  @MethodSource("testArguments")
+  public void testReadFileGroupInMergeOnReadTable(RecordMergeMode recordMergeMode, String logDataBlockFormat) throws Exception {
Map<String, String> writeConfigs = new HashMap<>(getCommonConfigs());
    writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), logDataBlockFormat);
@@ -90,23 +143,23 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
      // One commit; reading one file group containing a base file only
      commitToTable(recordsToStrings(dataGen.generateInserts("001", 100)), INSERT.value(), writeConfigs);
      validateOutputFromFileGroupReader(
-          getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), true, 0);
+          getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), true, 0, recordMergeMode);
      // Two commits; reading one file group containing a base file and a log file
      commitToTable(recordsToStrings(dataGen.generateUpdates("002", 100)), UPSERT.value(), writeConfigs);
      validateOutputFromFileGroupReader(
-          getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), true, 1);
+          getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), true, 1, recordMergeMode);
      // Three commits; reading one file group containing a base file and two log files
      commitToTable(recordsToStrings(dataGen.generateUpdates("003", 100)), UPSERT.value(), writeConfigs);
      validateOutputFromFileGroupReader(
-          getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), true, 2);
+          getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), true, 2, recordMergeMode);
}
}
@ParameterizedTest
- @ValueSource(strings = {"avro", "parquet"})
-  public void testReadLogFilesOnlyInMergeOnReadTable(String logDataBlockFormat) throws Exception {
+  @MethodSource("testArguments")
+  public void testReadLogFilesOnlyInMergeOnReadTable(RecordMergeMode recordMergeMode, String logDataBlockFormat) throws Exception {
Map<String, String> writeConfigs = new HashMap<>(getCommonConfigs());
    writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), logDataBlockFormat);
// Use InMemoryIndex to generate log only mor table
@@ -116,12 +169,12 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
      // One commit; reading one file group containing a base file only
      commitToTable(recordsToStrings(dataGen.generateInserts("001", 100)), INSERT.value(), writeConfigs);
      validateOutputFromFileGroupReader(
-          getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), false, 1);
+          getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), false, 1, recordMergeMode);
      // Two commits; reading one file group containing a base file and a log file
      commitToTable(recordsToStrings(dataGen.generateUpdates("002", 100)), UPSERT.value(), writeConfigs);
      validateOutputFromFileGroupReader(
-          getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), false, 2);
+          getStorageConf(), getBasePath(), dataGen.getPartitionPaths(), false, 2, recordMergeMode);
}
}
@@ -145,7 +198,8 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
String tablePath,
String[] partitionPaths,
boolean containsBaseFile,
-                                                  int expectedLogFileNum) throws Exception {
+                                                  int expectedLogFileNum,
+                                                  RecordMergeMode recordMergeMode) throws Exception {
    HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(storageConf, tablePath);
    Schema avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
    HoodieEngineContext engineContext = new HoodieLocalEngineContext(storageConf);
@@ -165,6 +219,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
props.setProperty("hoodie.datasource.write.precombine.field", "timestamp");
props.setProperty("hoodie.payload.ordering.field", "timestamp");
    props.setProperty(RECORD_MERGER_STRATEGY.key(), RECORD_MERGER_STRATEGY.defaultValue());
+ props.setProperty(RECORD_MERGE_MODE.key(), recordMergeMode.name());
if (metaClient.getTableConfig().contains(PARTITION_FIELDS)) {
      props.setProperty(PARTITION_FIELDS.key(), metaClient.getTableConfig().getString(PARTITION_FIELDS));
}
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
index b4e2fca80d3..60358872fc8 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
@@ -18,6 +18,12 @@
package org.apache.hudi.common.table;
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.model.RecordPayloadType;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -28,16 +34,24 @@ import org.apache.hudi.common.util.Option;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
+import java.util.stream.Stream;
+import static org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
/**
* Tests hoodie table meta client {@link HoodieTableMetaClient}.
@@ -109,6 +123,136 @@ public class TestHoodieTableMetaClient extends HoodieCommonTestHarness {
        "Commit value should be \"test-detail\"");
+ private static Stream<Arguments> argumentsForInferringRecordMergeMode() {
+ Stream<Arguments> arguments = Stream.of(
+ // Record merger strategy is not set
+ // Payload class is set, payload type is not set
+ arguments(Option.of(OverwriteWithLatestAvroPayload.class.getName()),
+ Option.empty(), Option.empty(),
RecordMergeMode.OVERWRITE_WITH_LATEST),
+ arguments(Option.of(DefaultHoodieRecordPayload.class.getName()),
+ Option.empty(), Option.empty(),
RecordMergeMode.EVENT_TIME_ORDERING),
+ arguments(Option.of(PostgresDebeziumAvroPayload.class.getName()),
+ Option.empty(), Option.empty(), RecordMergeMode.CUSTOM),
+ // Record merger strategy is not set
+ // Payload class is set, payload type is set; payload class takes
precedence
+ arguments(Option.of(OverwriteWithLatestAvroPayload.class.getName()),
+ Option.of(RecordPayloadType.OVERWRITE_LATEST_AVRO.name()),
+ Option.empty(), RecordMergeMode.OVERWRITE_WITH_LATEST),
+ arguments(Option.of(DefaultHoodieRecordPayload.class.getName()),
+ Option.of(RecordPayloadType.OVERWRITE_LATEST_AVRO.name()),
+ Option.empty(), RecordMergeMode.EVENT_TIME_ORDERING),
+ arguments(Option.of(PostgresDebeziumAvroPayload.class.getName()),
+ Option.of(RecordPayloadType.OVERWRITE_LATEST_AVRO.name()),
+ Option.empty(), RecordMergeMode.CUSTOM),
+ // Record merger strategy is set to default
+ // Payload class is set, payload type is not set
+ arguments(Option.of(OverwriteWithLatestAvroPayload.class.getName()),
+ Option.empty(), Option.of(DEFAULT_MERGER_STRATEGY_UUID),
+ RecordMergeMode.OVERWRITE_WITH_LATEST),
+ arguments(Option.of(DefaultHoodieRecordPayload.class.getName()),
+ Option.empty(), Option.of(DEFAULT_MERGER_STRATEGY_UUID),
+ RecordMergeMode.EVENT_TIME_ORDERING),
+ arguments(Option.of(PostgresDebeziumAvroPayload.class.getName()),
+ Option.empty(), Option.of(DEFAULT_MERGER_STRATEGY_UUID),
+ RecordMergeMode.CUSTOM),
+ // Record merger strategy is set to default
+ // Payload class is not set, payload type is set
+      arguments(Option.empty(), Option.of(RecordPayloadType.OVERWRITE_LATEST_AVRO.name()),
+ Option.of(DEFAULT_MERGER_STRATEGY_UUID),
+ RecordMergeMode.OVERWRITE_WITH_LATEST),
+      arguments(Option.empty(), Option.of(RecordPayloadType.HOODIE_AVRO_DEFAULT.name()),
+ Option.of(DEFAULT_MERGER_STRATEGY_UUID),
+ RecordMergeMode.EVENT_TIME_ORDERING),
+      arguments(Option.empty(), Option.of(RecordPayloadType.HOODIE_METADATA.name()),
+ Option.of(DEFAULT_MERGER_STRATEGY_UUID),
+ RecordMergeMode.CUSTOM),
+ // Record merger strategy is set to default
+      // Neither payload class nor payload type is set
+      arguments(Option.empty(), Option.empty(), Option.of(DEFAULT_MERGER_STRATEGY_UUID),
+ RecordMergeMode.valueOf(RECORD_MERGE_MODE.defaultValue())),
+ // Record merger strategy is set to custom
+      arguments(Option.empty(), Option.empty(), Option.of("custom_merge_strategy"),
+ RecordMergeMode.CUSTOM),
+ arguments(Option.of(DefaultHoodieRecordPayload.class.getName()),
+ Option.of(RecordPayloadType.OVERWRITE_LATEST_AVRO.name()),
+ Option.of("custom_merge_strategy"), RecordMergeMode.CUSTOM)
+ );
+ return arguments;
+ }
+
+ @ParameterizedTest
+ @MethodSource("argumentsForInferringRecordMergeMode")
+ public void testInferRecordMergeMode(Option<String> payloadClassName,
+ Option<String> payloadType,
+ Option<String> recordMergerStrategy,
+                                       RecordMergeMode expectedRecordMergeMode) {
+    HoodieTableMetaClient.PropertyBuilder builder = HoodieTableMetaClient.withPropertyBuilder()
+ .setTableType(HoodieTableType.MERGE_ON_READ.name())
+ .setTableName("table_name");
+ if (payloadClassName.isPresent()) {
+ builder.setPayloadClassName(payloadClassName.get());
+ }
+ if (payloadType.isPresent()) {
+ builder.setPayloadType(payloadType.get());
+ }
+ if (recordMergerStrategy.isPresent()) {
+ builder.setRecordMergerStrategy(recordMergerStrategy.get());
+ }
+ assertEquals(expectedRecordMergeMode,
+        RecordMergeMode.valueOf(builder.build().getProperty(RECORD_MERGE_MODE.key())));
+ }
+
+  private static Stream<Arguments> argumentsForValidationFailureOnMergeConfigs() {
+ Stream<Arguments> arguments = Stream.of(
+      arguments(Option.of(DefaultHoodieRecordPayload.class.getName()), Option.empty(),
+ Option.of(DEFAULT_MERGER_STRATEGY_UUID),
+ RecordMergeMode.OVERWRITE_WITH_LATEST,
+ "Payload class name
(org.apache.hudi.common.model.DefaultHoodieRecordPayload) or type "
+ + "(null) should be consistent with the record merge mode
OVERWRITE_WITH_LATEST"),
+      arguments(Option.empty(), Option.of(RecordPayloadType.HOODIE_AVRO_DEFAULT.name()),
+ Option.of(DEFAULT_MERGER_STRATEGY_UUID),
+ RecordMergeMode.OVERWRITE_WITH_LATEST,
+ "Payload class name (null) or type (HOODIE_AVRO_DEFAULT) "
+ + "should be consistent with the record merge mode
OVERWRITE_WITH_LATEST"),
+      arguments(Option.of(OverwriteWithLatestAvroPayload.class.getName()), Option.empty(),
+ Option.of(DEFAULT_MERGER_STRATEGY_UUID),
+ RecordMergeMode.EVENT_TIME_ORDERING,
+ "Payload class name
(org.apache.hudi.common.model.OverwriteWithLatestAvroPayload) or type "
+ + "(null) should be consistent with the record merge mode
EVENT_TIME_ORDERING"),
+      arguments(Option.empty(), Option.of(RecordPayloadType.OVERWRITE_LATEST_AVRO.name()),
+ Option.of(DEFAULT_MERGER_STRATEGY_UUID),
+ RecordMergeMode.EVENT_TIME_ORDERING,
+ "Payload class name (null) or type (OVERWRITE_LATEST_AVRO) "
+ + "should be consistent with the record merge mode
EVENT_TIME_ORDERING")
+ );
+ return arguments;
+ }
+
+ @ParameterizedTest
+ @MethodSource("argumentsForValidationFailureOnMergeConfigs")
+  public void testValidationFailureOnMergeConfigs(Option<String> payloadClassName,
+                                                  Option<String> payloadType,
+                                                  Option<String> recordMergerStrategy,
+                                                  RecordMergeMode recordMergeMode,
+                                                  String expectedErrorMessage) {
+    HoodieTableMetaClient.PropertyBuilder builder = HoodieTableMetaClient.withPropertyBuilder()
+ .setTableType(HoodieTableType.MERGE_ON_READ.name())
+ .setTableName("table_name")
+ .setRecordMergeMode(recordMergeMode);
+ if (payloadClassName.isPresent()) {
+ builder.setPayloadClassName(payloadClassName.get());
+ }
+ if (payloadType.isPresent()) {
+ builder.setPayloadType(payloadType.get());
+ }
+ if (recordMergerStrategy.isPresent()) {
+ builder.setRecordMergerStrategy(recordMergerStrategy.get());
+ }
+ IllegalArgumentException exception = assertThrows(
+ IllegalArgumentException.class, builder::build);
+ assertEquals(expectedErrorMessage, exception.getMessage());
+ }
+
@Test
public void testEquals() throws IOException {
HoodieTableMetaClient metaClient1 = HoodieTestUtils.init(tempDir.toAbsolutePath().toString(), getTableType());
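
A note for readers scanning the parameterized matrix above: the inference rule it
pins down can be summarized as below. This is a hedged, illustrative-only sketch of
the mapping the tests assert, not the actual HoodieTableMetaClient code; the helper
name inferMode is made up for illustration:

    // Illustrative summary of the inference the test matrix asserts.
    static RecordMergeMode inferMode(String payloadClass, String payloadType, String mergerStrategy) {
      if (mergerStrategy != null && !DEFAULT_MERGER_STRATEGY_UUID.equals(mergerStrategy)) {
        return RecordMergeMode.CUSTOM;                  // a non-default strategy always wins
      }
      if (payloadClass != null) {                       // payload class takes precedence over type
        if (payloadClass.equals(OverwriteWithLatestAvroPayload.class.getName())) {
          return RecordMergeMode.OVERWRITE_WITH_LATEST;
        }
        if (payloadClass.equals(DefaultHoodieRecordPayload.class.getName())) {
          return RecordMergeMode.EVENT_TIME_ORDERING;
        }
        return RecordMergeMode.CUSTOM;                  // e.g. PostgresDebeziumAvroPayload
      }
      if (payloadType != null) {
        switch (RecordPayloadType.valueOf(payloadType)) {
          case OVERWRITE_LATEST_AVRO: return RecordMergeMode.OVERWRITE_WITH_LATEST;
          case HOODIE_AVRO_DEFAULT:   return RecordMergeMode.EVENT_TIME_ORDERING;
          default:                    return RecordMergeMode.CUSTOM;  // e.g. HOODIE_METADATA
        }
      }
      return RecordMergeMode.valueOf(RECORD_MERGE_MODE.defaultValue());  // nothing set
    }

testValidationFailureOnMergeConfigs checks the inverse direction: an explicitly set
mode that contradicts the configured payload class or type fails fast with an
IllegalArgumentException at build().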
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java
index 3e80d4bee56..4ec1c0556b0 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomMerger.java
@@ -19,6 +19,8 @@
package org.apache.hudi.common.table.read;
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieRecord;
@@ -58,6 +60,8 @@ public class TestCustomMerger extends HoodieFileGroupReaderTestHarness {
readerContext = new HoodieTestReaderContext(
Option.of(new CustomAvroMerger()),
Option.of(HoodieRecordTestPayload.class.getName()));
+ properties.setProperty(
+        HoodieCommonConfig.RECORD_MERGE_MODE.key(), RecordMergeMode.CUSTOM.name());
// -------------------------------------------------------------
// The test logic is as follows:
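
A brief note on the TestCustomMerger change above: in CUSTOM mode the new file group
reader is expected to delegate merging to the HoodieRecordMerger wired into the
reader context (CustomAvroMerger here), so the harness now pins the merge mode next
to the merger. A minimal sketch of the pairing, using only names already present in
this diff:

    // Mode and merger travel together: the mode via properties,
    // the merger via the reader context.
    TypedProperties properties = new TypedProperties();
    properties.setProperty(HoodieCommonConfig.RECORD_MERGE_MODE.key(),
        RecordMergeMode.CUSTOM.name());
    // The merger itself is supplied separately, via the reader context:
    //   new HoodieTestReaderContext(Option.of(new CustomAvroMerger()), ...)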
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
index bf0fac19c67..3b3fc3c4359 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestEventTimeMerging.java
@@ -19,6 +19,8 @@
package org.apache.hudi.common.table.read;
+import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.reader.HoodieAvroRecordTestMerger;
@@ -55,6 +57,8 @@ public class TestEventTimeMerging extends HoodieFileGroupReaderTestHarness {
readerContext = new HoodieTestReaderContext(
Option.of(merger),
Option.of(HoodieRecordTestPayload.class.getName()));
+ properties.setProperty(
+        HoodieCommonConfig.RECORD_MERGE_MODE.key(), RecordMergeMode.EVENT_TIME_ORDERING.name());
// -------------------------------------------------------------
// The test logic is as follows:
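
TestEventTimeMerging gets the same treatment for the other built-in mode. As a
hedged sketch (this is not the HoodieBaseFileGroupRecordBuffer implementation, just
the decision rule the two modes imply), the built-in orderings differ only in which
record survives a key collision:

    // Illustrative decision rule for the two built-in merge modes.
    static <T, O extends Comparable<O>> T pickWinner(RecordMergeMode mode,
                                                     T older, O olderOrderingValue,
                                                     T newer, O newerOrderingValue) {
      if (mode == RecordMergeMode.EVENT_TIME_ORDERING
          && olderOrderingValue.compareTo(newerOrderingValue) > 0) {
        return older;  // a late-arriving record with a smaller event time loses
      }
      return newer;    // OVERWRITE_WITH_LATEST, or newer carries the larger event time
    }

Under EVENT_TIME_ORDERING the ordering-field value decides the winner; under
OVERWRITE_WITH_LATEST arrival order alone decides.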
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
index e59e65bea3e..f61db4ee247 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
@@ -19,7 +19,9 @@
package org.apache.hudi;
+import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.DeleteRecord;
@@ -112,13 +114,15 @@ public class TestHoodiePositionBasedFileGroupRecordBuffer extends TestHoodieFile
ctx.setRecordMerger(useCustomMerger ? new CustomMerger() : new HoodieSparkRecordMerger());
ctx.setSchemaHandler(new HoodiePositionBasedSchemaHandler<>(ctx, avroSchema, avroSchema,
Option.empty(), metaClient.getTableConfig()));
+ TypedProperties props = new TypedProperties();
+    props.put(HoodieCommonConfig.RECORD_MERGE_MODE.key(), RecordMergeMode.CUSTOM.name());
buffer = new HoodiePositionBasedFileGroupRecordBuffer<>(
ctx,
metaClient,
partitionNameOpt,
partitionFields,
ctx.getRecordMerger(),
- new TypedProperties(),
+ props,
1024 * 1024 * 1000,
metaClient.getTempFolderPath(),
ExternalSpillableMap.DiskMapType.ROCKS_DB,
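
The TypedProperties change above is the consequential part of this hunk: the
position-based record buffer now resolves its merging behavior from the properties
it is constructed with, so a bare new TypedProperties() no longer describes a test
that runs a custom merger. The setup reduces to the following (names as in the hunk
above):

    TypedProperties props = new TypedProperties();
    props.put(HoodieCommonConfig.RECORD_MERGE_MODE.key(), RecordMergeMode.CUSTOM.name());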
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
index 747fcb9a2eb..e20104858b6 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
@@ -19,8 +19,6 @@
package org.apache.hudi.common.table.read
-import org.apache.avro.Schema
-import org.apache.hadoop.conf.Configuration
import org.apache.hudi.common.config.HoodieReaderConfig.FILE_GROUP_READER_ENABLED
import org.apache.hudi.common.engine.HoodieReaderContext
import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
@@ -28,14 +26,19 @@ import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.testutils.HoodieTestUtils
import org.apache.hudi.storage.StorageConfiguration
import org.apache.hudi.{HoodieSparkRecordMerger, SparkAdapterSupport, SparkFileFormatInternalRowReaderContext}
+
+import org.apache.avro.Schema
+import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{Dataset, HoodieInternalRowUtils, HoodieUnsafeUtils, Row, SaveMode, SparkSession}
+import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.{HoodieSparkKryoRegistrar, SparkConf}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.{AfterEach, BeforeEach}
import java.util
+
import scala.collection.JavaConverters._
/**
@@ -114,4 +117,8 @@ class TestHoodieFileGroupReaderOnSpark extends TestHoodieFileGroupReaderBase[Int
assertEquals(0, expectedDf.except(actualDf).count())
assertEquals(0, actualDf.except(expectedDf).count())
}
+
+ override def getComparableUTF8String(value: String): Comparable[_] = {
+ UTF8String.fromString(value)
+ }
}
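
A closing note on the Scala override above: Spark keeps string data inside
InternalRow as org.apache.spark.unsafe.types.UTF8String, which is not
interchangeable with java.lang.String in comparisons, so the shared test base asks
each engine for an engine-native comparable when checking ordering-field values.
A one-line Java illustration, assuming only the UTF8String API already imported in
the diff:

    // Engine-native comparable for an ordering value in the Spark tests.
    Comparable<?> orderingValue = UTF8String.fromString("2023-10-01");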