This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e0fa459f1516 [HUDI-9661] Fix `Hoodie#getOrderingValue()` to avoid
extracting ordering fields at record level (#13645)
e0fa459f1516 is described below
commit e0fa459f1516faf86ff8f718bd347561a8c4bc25
Author: Shuo Cheng <[email protected]>
AuthorDate: Thu Jul 31 19:04:54 2025 +0800
[HUDI-9661] Fix `Hoodie#getOrderingValue()` to avoid extracting ordering
fields at record level (#13645)
---
.../org/apache/hudi/io/HoodieAppendHandle.java | 5 ++++-
.../client/model/CommitTimeFlinkRecordMerger.java | 2 --
.../client/model/EventTimeFlinkRecordMerger.java | 8 ++++++--
.../hudi/client/model/HoodieFlinkRecord.java | 4 +---
.../model/PartialUpdateFlinkRecordMerger.java | 10 ++++++++--
.../org/apache/hudi/DefaultSparkRecordMerger.java | 13 ++++++++++--
.../hudi/common/model/HoodieSparkRecord.java | 4 +---
.../hudi/common/model/HoodieAvroIndexedRecord.java | 3 +--
.../apache/hudi/common/model/HoodieAvroRecord.java | 2 +-
.../hudi/common/model/HoodieEmptyRecord.java | 2 +-
.../org/apache/hudi/common/model/HoodieRecord.java | 8 +++++---
.../table/log/HoodieMergedLogRecordScanner.java | 5 ++++-
.../hudi/common/table/read/BufferedRecord.java | 4 ++--
.../table/read/BufferedRecordMergerFactory.java | 15 ++++++++++----
.../apache/hudi/common/util/HoodieRecordUtils.java | 23 +---------------------
.../table/read/TestHoodieFileGroupReaderBase.java | 2 +-
.../reader/HoodieAvroRecordTestMerger.java | 12 +++++++++--
.../testutils/reader/HoodieFileSliceTestUtils.java | 8 ++------
.../hudi/common/util/TestHoodieRecordUtils.java | 2 +-
.../table/format/FlinkRowDataReaderContext.java | 4 ++--
.../common/table/read/TestCustomRecordMerger.java | 9 +++++++--
.../hudi/hadoop/DefaultHiveRecordMerger.java | 9 ++++++++-
.../org/apache/hudi/hadoop/HoodieHiveRecord.java | 4 +---
23 files changed, 89 insertions(+), 69 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index e6707a8688ed..63b79102fd16 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -48,6 +48,7 @@ import
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock;
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
import org.apache.hudi.common.table.view.TableFileSystemView;
+import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
@@ -134,6 +135,7 @@ public class HoodieAppendHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O
private boolean useWriterSchema = false;
private final Properties recordProperties = new Properties();
+ private final String[] orderingFields;
/**
* This is used by log compaction only.
@@ -162,6 +164,7 @@ public class HoodieAppendHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O
this.sizeEstimator = getSizeEstimator();
this.statuses = new ArrayList<>();
this.recordProperties.putAll(config.getProps());
+ this.orderingFields = ConfigUtils.getOrderingFields(recordProperties);
boolean shouldWriteRecordPositions = config.shouldWriteRecordPositions()
// record positions supported only from table version 8
&&
config.getWriteVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT);
@@ -646,7 +649,7 @@ public class HoodieAppendHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O
hoodieRecord.seal();
recordsDeleted++;
- final Comparable<?> orderingVal =
hoodieRecord.getOrderingValue(writeSchema, recordProperties);
+ final Comparable<?> orderingVal =
hoodieRecord.getOrderingValue(writeSchema, recordProperties, orderingFields);
long position = baseFileInstantTimeOfPositions.isPresent() ?
hoodieRecord.getCurrentPosition() : -1L;
recordsToDeleteWithPositions.add(Pair.of(DeleteRecord.create(hoodieRecord.getKey(),
orderingVal), position));
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/CommitTimeFlinkRecordMerger.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/CommitTimeFlinkRecordMerger.java
index 012d632e2a7d..d4150e78ca65 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/CommitTimeFlinkRecordMerger.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/CommitTimeFlinkRecordMerger.java
@@ -33,8 +33,6 @@ import java.io.IOException;
*/
public class CommitTimeFlinkRecordMerger extends HoodieFlinkRecordMerger {
- public static final CommitTimeFlinkRecordMerger INSTANCE = new
CommitTimeFlinkRecordMerger();
-
@Override
public String getMergingStrategy() {
return COMMIT_TIME_BASED_MERGE_STRATEGY_UUID;
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/EventTimeFlinkRecordMerger.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/EventTimeFlinkRecordMerger.java
index 0bdab63b3caf..e8c77eceb834 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/EventTimeFlinkRecordMerger.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/EventTimeFlinkRecordMerger.java
@@ -21,6 +21,7 @@ package org.apache.hudi.client.model;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
@@ -34,7 +35,7 @@ import java.io.IOException;
*/
public class EventTimeFlinkRecordMerger extends HoodieFlinkRecordMerger {
- public static final EventTimeFlinkRecordMerger INSTANCE = new
EventTimeFlinkRecordMerger();
+ private String[] orderingFields;
@Override
public String getMergingStrategy() {
@@ -52,7 +53,10 @@ public class EventTimeFlinkRecordMerger extends
HoodieFlinkRecordMerger {
ValidationUtils.checkArgument(older.getRecordType() ==
HoodieRecord.HoodieRecordType.FLINK);
ValidationUtils.checkArgument(newer.getRecordType() ==
HoodieRecord.HoodieRecordType.FLINK);
- if (older.getOrderingValue(oldSchema,
props).compareTo(newer.getOrderingValue(newSchema, props)) > 0) {
+ if (orderingFields == null) {
+ orderingFields = ConfigUtils.getOrderingFields(props);
+ }
+ if (older.getOrderingValue(oldSchema, props,
orderingFields).compareTo(newer.getOrderingValue(newSchema, props,
orderingFields)) > 0) {
return Option.of(Pair.of(older, oldSchema));
} else {
return Option.of(Pair.of(newer, newSchema));
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
index f942d4cf8714..40e8872847c2 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.MetadataValues;
-import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.OrderingValues;
import org.apache.hudi.common.util.ValidationUtils;
@@ -101,8 +100,7 @@ public class HoodieFlinkRecord extends
HoodieRecord<RowData> {
}
@Override
- protected Comparable<?> doGetOrderingValue(Schema recordSchema, Properties
props) {
- String[] orderingFields = ConfigUtils.getOrderingFields(props);
+ protected Comparable<?> doGetOrderingValue(Schema recordSchema, Properties
props, String[] orderingFields) {
if (orderingFields == null) {
return OrderingValues.getDefault();
} else {
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/PartialUpdateFlinkRecordMerger.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/PartialUpdateFlinkRecordMerger.java
index 07082ce40ddc..e9e74e05ce0a 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/PartialUpdateFlinkRecordMerger.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/PartialUpdateFlinkRecordMerger.java
@@ -21,6 +21,7 @@ package org.apache.hudi.client.model;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
@@ -81,6 +82,8 @@ import java.io.IOException;
*/
public class PartialUpdateFlinkRecordMerger extends HoodieFlinkRecordMerger {
+ private String[] orderingFields;
+
@Override
public String getMergingStrategy() {
return CUSTOM_MERGE_STRATEGY_UUID;
@@ -97,7 +100,10 @@ public class PartialUpdateFlinkRecordMerger extends
HoodieFlinkRecordMerger {
ValidationUtils.checkArgument(older.getRecordType() ==
HoodieRecord.HoodieRecordType.FLINK);
ValidationUtils.checkArgument(newer.getRecordType() ==
HoodieRecord.HoodieRecordType.FLINK);
- if (older.getOrderingValue(oldSchema,
props).compareTo(newer.getOrderingValue(newSchema, props)) > 0) {
+ if (orderingFields == null) {
+ orderingFields = ConfigUtils.getOrderingFields(props);
+ }
+ if (older.getOrderingValue(oldSchema, props,
orderingFields).compareTo(newer.getOrderingValue(newSchema, props,
orderingFields)) > 0) {
if (older.isDelete(oldSchema, props) || newer.isDelete(newSchema,
props)) {
return Option.of(Pair.of(older, oldSchema));
} else {
@@ -161,7 +167,7 @@ public class PartialUpdateFlinkRecordMerger extends
HoodieFlinkRecordMerger {
return new HoodieFlinkRecord(
highOrderRecord.getKey(),
highOrderRecord.getOperation(),
- highOrderRecord.getOrderingValue(highOrderSchema, props),
+ highOrderRecord.getOrderingValue(highOrderSchema, props,
orderingFields),
mergedRow);
}
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/DefaultSparkRecordMerger.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/DefaultSparkRecordMerger.java
index 7ac55ea81e44..385f57f21e46 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/DefaultSparkRecordMerger.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/DefaultSparkRecordMerger.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.merge.SparkRecordMergingUtils;
@@ -36,6 +37,8 @@ import java.io.IOException;
*/
public class DefaultSparkRecordMerger extends HoodieSparkRecordMerger {
+ private String[] orderingFields;
+
@Override
public String getMergingStrategy() {
return HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
@@ -48,7 +51,10 @@ public class DefaultSparkRecordMerger extends
HoodieSparkRecordMerger {
return deleteHandlingResult;
}
- if (older.getOrderingValue(oldSchema,
props).compareTo(newer.getOrderingValue(newSchema, props)) > 0) {
+ if (orderingFields == null) {
+ orderingFields = ConfigUtils.getOrderingFields(props);
+ }
+ if (older.getOrderingValue(oldSchema, props,
orderingFields).compareTo(newer.getOrderingValue(newSchema, props,
orderingFields)) > 0) {
return Option.of(Pair.of(older, oldSchema));
} else {
return Option.of(Pair.of(newer, newSchema));
@@ -62,7 +68,10 @@ public class DefaultSparkRecordMerger extends
HoodieSparkRecordMerger {
return deleteHandlingResult;
}
- if (older.getOrderingValue(oldSchema,
props).compareTo(newer.getOrderingValue(newSchema, props)) > 0) {
+ if (orderingFields == null) {
+ orderingFields = ConfigUtils.getOrderingFields(props);
+ }
+ if (older.getOrderingValue(oldSchema, props,
orderingFields).compareTo(newer.getOrderingValue(newSchema, props,
orderingFields)) > 0) {
return Option.of(SparkRecordMergingUtils.mergePartialRecords(
(HoodieSparkRecord) newer, newSchema, (HoodieSparkRecord) older,
oldSchema, readerSchema, props));
} else {
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
index be4f3a9c9268..04060831a112 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
@@ -21,7 +21,6 @@ package org.apache.hudi.common.model;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.client.model.HoodieInternalRow;
-import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.OrderingValues;
import org.apache.hudi.common.util.StringUtils;
@@ -345,9 +344,8 @@ public class HoodieSparkRecord extends
HoodieRecord<InternalRow> {
}
@Override
- protected Comparable<?> doGetOrderingValue(Schema recordSchema, Properties
props) {
+ protected Comparable<?> doGetOrderingValue(Schema recordSchema, Properties
props, String[] orderingFields) {
StructType structType =
HoodieInternalRowUtils.getCachedSchema(recordSchema);
- String[] orderingFields = ConfigUtils.getOrderingFields(props);
if (orderingFields != null) {
return OrderingValues.create(orderingFields, field -> {
scala.Option<NestedFieldPath> cachedNestedFieldPath =
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
index 1ff5dde705b9..01d6145aca7a 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
@@ -213,8 +213,7 @@ public class HoodieAvroIndexedRecord extends
HoodieRecord<IndexedRecord> {
}
@Override
- public Comparable<?> doGetOrderingValue(Schema recordSchema, Properties
props) {
- String[] orderingFields = ConfigUtils.getOrderingFields(props);
+ public Comparable<?> doGetOrderingValue(Schema recordSchema, Properties
props, String[] orderingFields) {
if (orderingFields == null) {
return OrderingValues.getDefault();
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
index 44a3983546c4..1f8f1f2fa624 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
@@ -98,7 +98,7 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload>
extends HoodieRecor
}
@Override
- public Comparable<?> doGetOrderingValue(Schema recordSchema, Properties
props) {
+ public Comparable<?> doGetOrderingValue(Schema recordSchema, Properties
props, String[] orderingFields) {
return this.getData().getOrderingValue();
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
index a7d0d4903e3d..1f5145ee47e8 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
@@ -58,7 +58,7 @@ public class HoodieEmptyRecord<T> extends HoodieRecord<T> {
}
@Override
- public Comparable<?> doGetOrderingValue(Schema recordSchema, Properties
props) {
+ public Comparable<?> doGetOrderingValue(Schema recordSchema, Properties
props, String[] orderingFields) {
return orderingVal;
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index 48b0042b5aaa..78a2cd3d9493 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -218,11 +218,12 @@ public abstract class HoodieRecord<T> implements
HoodieRecordCompatibilityInterf
*
* @param recordSchema Avro schema for the record
* @param props Properties containing the necessary configurations
+ * @param orderingFields name of the ordering fields
* @return The ordering value for the record
*/
- public Comparable<?> getOrderingValue(Schema recordSchema, Properties props)
{
+ public Comparable<?> getOrderingValue(Schema recordSchema, Properties props,
String[] orderingFields) {
if (orderingValue == null) {
- orderingValue = doGetOrderingValue(recordSchema, props);
+ orderingValue = doGetOrderingValue(recordSchema, props, orderingFields);
}
return orderingValue;
}
@@ -232,9 +233,10 @@ public abstract class HoodieRecord<T> implements
HoodieRecordCompatibilityInterf
*
* @param recordSchema Avro schema for the record
* @param props Properties containing the necessary configurations
+ * @param orderingFields name of the ordering fields
* @return The ordering value for the record
*/
- protected abstract Comparable<?> doGetOrderingValue(Schema recordSchema,
Properties props);
+ protected abstract Comparable<?> doGetOrderingValue(Schema recordSchema,
Properties props, String[] orderingFields);
public T getData() {
if (data == null) {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index 8f89dd9bcf09..0a8b8268d0f5 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.serialization.DefaultSerializer;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordUtils;
@@ -90,6 +91,7 @@ public class HoodieMergedLogRecordScanner extends
AbstractHoodieLogRecordScanner
private final long maxMemorySizeInBytes;
// Stores the total time taken to perform reading and merging of log blocks
private long totalTimeTakenToReadAndMergeBlocks;
+ private final String[] orderingFields;
@SuppressWarnings("unchecked")
protected HoodieMergedLogRecordScanner(HoodieStorage storage, String
basePath, List<String> logFilePaths, Schema readerSchema,
@@ -115,6 +117,7 @@ public class HoodieMergedLogRecordScanner extends
AbstractHoodieLogRecordScanner
new HoodieRecordSizeEstimator(readerSchema), diskMapType, new
DefaultSerializer<>(), isBitCaskDiskMapCompressionEnabled,
getClass().getSimpleName());
this.scannedPrefixes = new HashSet<>();
this.allowInflightInstants = allowInflightInstants;
+ this.orderingFields =
ConfigUtils.getOrderingFields(this.hoodieTableMetaClient.getTableConfig().getProps());
} catch (IOException e) {
throw new HoodieIOException("IOException when creating
ExternalSpillableMap at " + spillableMapBasePath, e);
}
@@ -275,7 +278,7 @@ public class HoodieMergedLogRecordScanner extends
AbstractHoodieLogRecordScanner
// should be deleted or be kept. The old record is kept only if the
DELETE record has smaller ordering val.
// For same ordering values, uses the natural order(arrival time
semantics).
- Comparable curOrderingVal =
oldRecord.getOrderingValue(this.readerSchema,
this.hoodieTableMetaClient.getTableConfig().getProps());
+ Comparable curOrderingVal =
oldRecord.getOrderingValue(this.readerSchema,
this.hoodieTableMetaClient.getTableConfig().getProps(), orderingFields);
Comparable deleteOrderingVal = deleteRecord.getOrderingValue();
// Checks the ordering value does not equal to 0
// because we use 0 as the default value which means natural order
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java
index bf47163db780..5f205431b24f 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java
@@ -54,7 +54,7 @@ public class BufferedRecord<T> implements Serializable {
this.isDelete = isDelete;
}
- public static <T> BufferedRecord<T> forRecordWithContext(HoodieRecord<T>
record, Schema schema, RecordContext<T> recordContext, Properties props) {
+ public static <T> BufferedRecord<T> forRecordWithContext(HoodieRecord<T>
record, Schema schema, RecordContext<T> recordContext, Properties props,
String[] orderingFields) {
HoodieKey hoodieKey = record.getKey();
String recordKey = hoodieKey == null ?
recordContext.getRecordKey(record.getData(), schema) : hoodieKey.getRecordKey();
Integer schemaId = recordContext.encodeAvroSchema(schema);
@@ -64,7 +64,7 @@ public class BufferedRecord<T> implements Serializable {
} catch (IOException e) {
throw new HoodieException("Failed to get isDelete from record.", e);
}
- return new BufferedRecord<>(recordKey, record.getOrderingValue(schema,
props), record.getData(), schemaId, isDelete);
+ return new BufferedRecord<>(recordKey, record.getOrderingValue(schema,
props, orderingFields), record.getData(), schemaId, isDelete);
}
public static <T> BufferedRecord<T> forRecordWithContext(T record, Schema
schema, RecordContext<T> recordContext, List<String> orderingFieldNames,
boolean isDelete) {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
index 2a6f5b02b4bc..46feee0e67e6 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
@@ -65,7 +65,7 @@ public class BufferedRecordMergerFactory {
if (enablePartialMerging) {
BufferedRecordMerger<T> deleteRecordMerger = create(
readerContext, recordMergeMode, false, recordMerger,
orderingFieldNames, payloadClass, readerSchema, props, partialUpdateMode);
- return new
PartialUpdateBufferedRecordMerger<>(readerContext.getRecordContext(),
recordMerger, deleteRecordMerger, readerSchema, props);
+ return new
PartialUpdateBufferedRecordMerger<>(readerContext.getRecordContext(),
recordMerger, deleteRecordMerger, orderingFieldNames, readerSchema, props);
}
switch (recordMergeMode) {
@@ -84,7 +84,7 @@ public class BufferedRecordMergerFactory {
return new CustomPayloadRecordMerger<>(
readerContext.getRecordContext(), recordMerger,
orderingFieldNames, payloadClass.get(), readerSchema, props);
} else {
- return new CustomRecordMerger<>(readerContext.getRecordContext(),
recordMerger, readerSchema, props);
+ return new CustomRecordMerger<>(readerContext.getRecordContext(),
recordMerger, orderingFieldNames, readerSchema, props);
}
}
}
@@ -259,11 +259,13 @@ public class BufferedRecordMergerFactory {
private final BufferedRecordMerger<T> deleteRecordMerger;
private final Schema readerSchema;
private final TypedProperties props;
+ private final String[] orderingFields;
public PartialUpdateBufferedRecordMerger(
RecordContext<T> recordContext,
Option<HoodieRecordMerger> recordMerger,
BufferedRecordMerger<T> deleteRecordMerger,
+ List<String> orderingFieldNames,
Schema readerSchema,
TypedProperties props) {
this.recordContext = recordContext;
@@ -271,6 +273,7 @@ public class BufferedRecordMergerFactory {
this.deleteRecordMerger = deleteRecordMerger;
this.readerSchema = readerSchema;
this.props = props;
+ this.orderingFields = orderingFieldNames.toArray(new String[0]);
}
@Override
@@ -296,7 +299,7 @@ public class BufferedRecordMergerFactory {
// If pre-combine returns existing record, no need to update it
if (combinedRecord.getData() != existingRecord.getRecord()) {
- return Option.of(BufferedRecord.forRecordWithContext(combinedRecord,
combinedRecordAndSchema.getRight(), recordContext, props));
+ return Option.of(BufferedRecord.forRecordWithContext(combinedRecord,
combinedRecordAndSchema.getRight(), recordContext, props, orderingFields));
}
return Option.empty();
}
@@ -333,12 +336,16 @@ public class BufferedRecordMergerFactory {
* based on {@code CUSTOM} merge mode.
*/
private static class CustomRecordMerger<T> extends BaseCustomMerger<T> {
+ private final String[] orderingFields;
+
public CustomRecordMerger(
RecordContext<T> recordContext,
Option<HoodieRecordMerger> recordMerger,
+ List<String> orderingFieldNames,
Schema readerSchema,
TypedProperties props) {
super(recordContext, recordMerger, readerSchema, props);
+ this.orderingFields = orderingFieldNames.toArray(new String[0]);
}
@Override
@@ -360,7 +367,7 @@ public class BufferedRecordMergerFactory {
// If pre-combine returns existing record, no need to update it
if (combinedRecord.getData() != existingRecord.getRecord()) {
- return Option.of(BufferedRecord.forRecordWithContext(combinedRecord,
combinedRecordAndSchema.getRight(), recordContext, props));
+ return Option.of(BufferedRecord.forRecordWithContext(combinedRecord,
combinedRecordAndSchema.getRight(), recordContext, props, orderingFields));
}
return Option.empty();
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
index 4cc59c2927f1..6d3c3b9eadca 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
@@ -32,7 +32,6 @@ import org.apache.avro.generic.GenericRecord;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -42,33 +41,13 @@ import java.util.concurrent.ConcurrentHashMap;
* A utility class for HoodieRecord.
*/
public class HoodieRecordUtils {
- private static final Map<String, Object> INSTANCE_CACHE = new HashMap<>();
private static final Map<String, Constructor<?>> CONSTRUCTOR_CACHE = new
ConcurrentHashMap<>();
- static {
- INSTANCE_CACHE.put(HoodieAvroRecordMerger.class.getName(),
HoodieAvroRecordMerger.INSTANCE);
- }
-
/**
* Instantiate a given class with a record merge.
*/
public static HoodieRecordMerger loadRecordMerger(String mergerClass) {
- try {
- HoodieRecordMerger recordMerger = (HoodieRecordMerger)
INSTANCE_CACHE.get(mergerClass);
- if (null == recordMerger) {
- synchronized (HoodieRecordMerger.class) {
- recordMerger = (HoodieRecordMerger) INSTANCE_CACHE.get(mergerClass);
- if (null == recordMerger) {
- recordMerger = (HoodieRecordMerger)
ReflectionUtils.loadClass(mergerClass,
- new Object[] {});
- INSTANCE_CACHE.put(mergerClass, recordMerger);
- }
- }
- }
- return recordMerger;
- } catch (HoodieException e) {
- throw new HoodieException("Unable to instantiate hoodie merge class ",
e);
- }
+ return (HoodieRecordMerger) ReflectionUtils.loadClass(mergerClass, new
Object[] {});
}
/**
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index 870ad694b652..248d54c82f0d 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -904,7 +904,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
.map(record -> new HoodieTestDataGenerator.RecordIdentifier(
record.getRecordKey(),
removeHiveStylePartition(record.getPartitionPath()),
- record.getOrderingValue(schema, props).toString(),
+ record.getOrderingValue(schema, props,
preCombineFields.toArray(new String[0])).toString(),
readerContext.getRecordContext().getValue(record.getData(),
schema, RIDER_FIELD_NAME).toString()))
.collect(Collectors.toList());
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieAvroRecordTestMerger.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieAvroRecordTestMerger.java
index 9bb69b6ceb06..2914ff06c9b9 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieAvroRecordTestMerger.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieAvroRecordTestMerger.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
@@ -34,6 +35,9 @@ import java.io.IOException;
import java.util.Properties;
public class HoodieAvroRecordTestMerger extends HoodieAvroRecordMerger {
+
+ private String[] orderingFields;
+
@Override
public Option<Pair<HoodieRecord, Schema>> merge(
HoodieRecord older,
@@ -42,8 +46,12 @@ public class HoodieAvroRecordTestMerger extends
HoodieAvroRecordMerger {
Schema newSchema,
TypedProperties props
) throws IOException {
- Comparable oldOrderingVal = older.getOrderingValue(oldSchema, props);
- Comparable newOrderingVal = newer.getOrderingValue(newSchema, props);
+ if (orderingFields == null) {
+ orderingFields = ConfigUtils.getOrderingFields(props);
+ }
+
+ Comparable oldOrderingVal = older.getOrderingValue(oldSchema, props,
orderingFields);
+ Comparable newOrderingVal = newer.getOrderingValue(newSchema, props,
orderingFields);
// The record with higher ordering value is returned.
if (oldOrderingVal == null || newOrderingVal.compareTo(oldOrderingVal) >
0) {
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
index 594cbc1b8ca1..6cc8c0750dad 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
@@ -92,11 +92,7 @@ public class HoodieFileSliceTestUtils {
public static final HoodieTestDataGenerator DATA_GEN =
new HoodieTestDataGenerator(0xDEED);
public static final TypedProperties PROPERTIES = new TypedProperties();
-
- static {
- PROPERTIES.setProperty(
- "hoodie.datasource.write.precombine.field", "timestamp");
- }
+ private static String[] orderingFields = new String[] {"timestamp"};
// We use a number to represent a record key, and a (start, end) range
// to represent a set of record keys between start <= k <= end.
@@ -235,7 +231,7 @@ public class HoodieFileSliceTestUtils {
return new HoodieDeleteBlock(
hoodieRecords.stream().map(
r -> Pair.of(DeleteRecord.create(
- r.getKey(), r.getOrderingValue(schema, props)), r.getCurrentLocation().getPosition()))
+ r.getKey(), r.getOrderingValue(schema, props, orderingFields)), r.getCurrentLocation().getPosition()))
.collect(Collectors.toList()),
header);
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestHoodieRecordUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestHoodieRecordUtils.java
index 9f693b01a536..466486158076 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestHoodieRecordUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestHoodieRecordUtils.java
@@ -37,7 +37,7 @@ class TestHoodieRecordUtils {
HoodieRecordMerger recordMerger1 = HoodieRecordUtils.loadRecordMerger(mergeClassName);
HoodieRecordMerger recordMerger2 = HoodieRecordUtils.loadRecordMerger(mergeClassName);
assertEquals(recordMerger1.getClass().getName(), mergeClassName);
- assertEquals(recordMerger1, recordMerger2);
+ assertEquals(recordMerger2.getClass().getName(), mergeClassName);
}
@Test
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
index a17286e9dfdc..5d9dca452fa9 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
@@ -115,9 +115,9 @@ public class FlinkRowDataReaderContext extends HoodieReaderContext<RowData> {
public Option<HoodieRecordMerger> getRecordMerger(RecordMergeMode mergeMode, String mergeStrategyId, String mergeImplClasses) {
switch (mergeMode) {
case EVENT_TIME_ORDERING:
- return Option.of(EventTimeFlinkRecordMerger.INSTANCE);
+ return Option.of(new EventTimeFlinkRecordMerger());
case COMMIT_TIME_ORDERING:
- return Option.of(CommitTimeFlinkRecordMerger.INSTANCE);
+ return Option.of(new CommitTimeFlinkRecordMerger());
default:
Option<HoodieRecordMerger> recordMerger = HoodieRecordUtils.createValidRecordMerger(EngineType.FLINK, mergeImplClasses, mergeStrategyId);
if (recordMerger.isEmpty()) {
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomRecordMerger.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomRecordMerger.java
index 47d75a1f99e8..a1a73a3d20a9 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomRecordMerger.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomRecordMerger.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.reader.HoodieFileGroupReaderTestHarness;
import org.apache.hudi.common.testutils.reader.HoodieFileSliceTestUtils;
+import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.Pair;
@@ -239,6 +240,7 @@ public class TestCustomRecordMerger extends HoodieFileGroupReaderTestHarness {
public static final String KEEP_CERTAIN_TIMESTAMP_VALUE_ONLY = "KEEP_CERTAIN_TIMESTAMP_VALUE_ONLY";
public static final String TIMESTAMP = "timestamp";
+ private String[] orderingFields;
@Override
public Option<Pair<HoodieRecord, Schema>> merge(
@@ -248,8 +250,11 @@ public class TestCustomRecordMerger extends HoodieFileGroupReaderTestHarness {
Schema newSchema,
TypedProperties props
) throws IOException {
- if (newer.getOrderingValue(newSchema, props).compareTo(
- older.getOrderingValue(oldSchema, props)) >= 0) {
+ if (orderingFields == null) {
+ this.orderingFields = ConfigUtils.getOrderingFields(props);
+ }
+ if (newer.getOrderingValue(newSchema, props, orderingFields).compareTo(
+ older.getOrderingValue(oldSchema, props, orderingFields)) >= 0) {
if (newer.isDelete(newSchema, props)) {
return Option.empty();
}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/DefaultHiveRecordMerger.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/DefaultHiveRecordMerger.java
index dc03c755ee79..be3c28c91fc6 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/DefaultHiveRecordMerger.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/DefaultHiveRecordMerger.java
@@ -22,6 +22,7 @@ package org.apache.hudi.hadoop;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
@@ -34,6 +35,9 @@ import java.io.IOException;
* Record merger for hive that implements the default merger strategy
*/
public class DefaultHiveRecordMerger extends HoodieHiveRecordMerger {
+
+ private String[] orderingFields;
+
@Override
public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException {
ValidationUtils.checkArgument(older.getRecordType() == HoodieRecord.HoodieRecordType.HIVE);
@@ -55,7 +59,10 @@ public class DefaultHiveRecordMerger extends HoodieHiveRecordMerger {
} else if (older.getData() == null) {
return Option.empty();
}
- if (older.getOrderingValue(oldSchema, props).compareTo(newer.getOrderingValue(newSchema, props)) > 0) {
+ if (orderingFields == null) {
+ orderingFields = ConfigUtils.getOrderingFields(props);
+ }
+ if (older.getOrderingValue(oldSchema, props, orderingFields).compareTo(newer.getOrderingValue(newSchema, props, orderingFields)) > 0) {
return Option.of(Pair.of(older, oldSchema));
} else {
return Option.of(Pair.of(newer, newSchema));
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
index fd2b8572ce91..caf4b331f019 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
@@ -24,7 +24,6 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.MetadataValues;
-import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.OrderingValues;
import org.apache.hudi.common.util.collection.Pair;
@@ -107,8 +106,7 @@ public class HoodieHiveRecord extends HoodieRecord<ArrayWritable> {
}
@Override
- public Comparable<?> doGetOrderingValue(Schema recordSchema, Properties props) {
- String[] orderingFields = ConfigUtils.getOrderingFields(props);
+ public Comparable<?> doGetOrderingValue(Schema recordSchema, Properties props, String[] orderingFields) {
if (orderingFields == null) {
return OrderingValues.getDefault();
} else {