This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e0fa459f1516 [HUDI-9661] Fix `Hoodie#getOrderingValue()` to avoid extracting ordering fields at record level (#13645)
e0fa459f1516 is described below

commit e0fa459f1516faf86ff8f718bd347561a8c4bc25
Author: Shuo Cheng <[email protected]>
AuthorDate: Thu Jul 31 19:04:54 2025 +0800

    [HUDI-9661] Fix `Hoodie#getOrderingValue()` to avoid extracting ordering fields at record level (#13645)
---
 .../org/apache/hudi/io/HoodieAppendHandle.java     |  5 ++++-
 .../client/model/CommitTimeFlinkRecordMerger.java  |  2 --
 .../client/model/EventTimeFlinkRecordMerger.java   |  8 ++++++--
 .../hudi/client/model/HoodieFlinkRecord.java       |  4 +---
 .../model/PartialUpdateFlinkRecordMerger.java      | 10 ++++++++--
 .../org/apache/hudi/DefaultSparkRecordMerger.java  | 13 ++++++++++--
 .../hudi/common/model/HoodieSparkRecord.java       |  4 +---
 .../hudi/common/model/HoodieAvroIndexedRecord.java |  3 +--
 .../apache/hudi/common/model/HoodieAvroRecord.java |  2 +-
 .../hudi/common/model/HoodieEmptyRecord.java       |  2 +-
 .../org/apache/hudi/common/model/HoodieRecord.java |  8 +++++---
 .../table/log/HoodieMergedLogRecordScanner.java    |  5 ++++-
 .../hudi/common/table/read/BufferedRecord.java     |  4 ++--
 .../table/read/BufferedRecordMergerFactory.java    | 15 ++++++++++----
 .../apache/hudi/common/util/HoodieRecordUtils.java | 23 +---------------------
 .../table/read/TestHoodieFileGroupReaderBase.java  |  2 +-
 .../reader/HoodieAvroRecordTestMerger.java         | 12 +++++++++--
 .../testutils/reader/HoodieFileSliceTestUtils.java |  8 ++------
 .../hudi/common/util/TestHoodieRecordUtils.java    |  2 +-
 .../table/format/FlinkRowDataReaderContext.java    |  4 ++--
 .../common/table/read/TestCustomRecordMerger.java  |  9 +++++++--
 .../hudi/hadoop/DefaultHiveRecordMerger.java       |  9 ++++++++-
 .../org/apache/hudi/hadoop/HoodieHiveRecord.java   |  4 +---
 23 files changed, 89 insertions(+), 69 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index e6707a8688ed..63b79102fd16 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -48,6 +48,7 @@ import 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
 import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock;
 import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
 import org.apache.hudi.common.table.view.TableFileSystemView;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.DefaultSizeEstimator;
 import org.apache.hudi.common.util.HoodieRecordUtils;
 import org.apache.hudi.common.util.Option;
@@ -134,6 +135,7 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
   private boolean useWriterSchema = false;
 
   private final Properties recordProperties = new Properties();
+  private final String[] orderingFields;
 
   /**
    * This is used by log compaction only.
@@ -162,6 +164,7 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
     this.sizeEstimator = getSizeEstimator();
     this.statuses = new ArrayList<>();
     this.recordProperties.putAll(config.getProps());
+    this.orderingFields = ConfigUtils.getOrderingFields(recordProperties);
     boolean shouldWriteRecordPositions = config.shouldWriteRecordPositions()
         // record positions supported only from table version 8
         && 
config.getWriteVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT);
@@ -646,7 +649,7 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
     hoodieRecord.seal();
     recordsDeleted++;
 
-    final Comparable<?> orderingVal = 
hoodieRecord.getOrderingValue(writeSchema, recordProperties);
+    final Comparable<?> orderingVal = 
hoodieRecord.getOrderingValue(writeSchema, recordProperties, orderingFields);
     long position = baseFileInstantTimeOfPositions.isPresent() ? 
hoodieRecord.getCurrentPosition() : -1L;
     
recordsToDeleteWithPositions.add(Pair.of(DeleteRecord.create(hoodieRecord.getKey(),
 orderingVal), position));
   }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/CommitTimeFlinkRecordMerger.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/CommitTimeFlinkRecordMerger.java
index 012d632e2a7d..d4150e78ca65 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/CommitTimeFlinkRecordMerger.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/CommitTimeFlinkRecordMerger.java
@@ -33,8 +33,6 @@ import java.io.IOException;
  */
 public class CommitTimeFlinkRecordMerger extends HoodieFlinkRecordMerger {
 
-  public static final CommitTimeFlinkRecordMerger INSTANCE = new 
CommitTimeFlinkRecordMerger();
-
   @Override
   public String getMergingStrategy() {
     return COMMIT_TIME_BASED_MERGE_STRATEGY_UUID;
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/EventTimeFlinkRecordMerger.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/EventTimeFlinkRecordMerger.java
index 0bdab63b3caf..e8c77eceb834 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/EventTimeFlinkRecordMerger.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/EventTimeFlinkRecordMerger.java
@@ -21,6 +21,7 @@ package org.apache.hudi.client.model;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
@@ -34,7 +35,7 @@ import java.io.IOException;
  */
 public class EventTimeFlinkRecordMerger extends HoodieFlinkRecordMerger {
 
-  public static final EventTimeFlinkRecordMerger INSTANCE = new 
EventTimeFlinkRecordMerger();
+  private String[] orderingFields;
 
   @Override
   public String getMergingStrategy() {
@@ -52,7 +53,10 @@ public class EventTimeFlinkRecordMerger extends 
HoodieFlinkRecordMerger {
     ValidationUtils.checkArgument(older.getRecordType() == 
HoodieRecord.HoodieRecordType.FLINK);
     ValidationUtils.checkArgument(newer.getRecordType() == 
HoodieRecord.HoodieRecordType.FLINK);
 
-    if (older.getOrderingValue(oldSchema, 
props).compareTo(newer.getOrderingValue(newSchema, props)) > 0) {
+    if (orderingFields == null) {
+      orderingFields = ConfigUtils.getOrderingFields(props);
+    }
+    if (older.getOrderingValue(oldSchema, props, 
orderingFields).compareTo(newer.getOrderingValue(newSchema, props, 
orderingFields)) > 0) {
       return Option.of(Pair.of(older, oldSchema));
     } else {
       return Option.of(Pair.of(newer, newSchema));
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
index f942d4cf8714..40e8872847c2 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.MetadataValues;
-import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.OrderingValues;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -101,8 +100,7 @@ public class HoodieFlinkRecord extends 
HoodieRecord<RowData> {
   }
 
   @Override
-  protected Comparable<?> doGetOrderingValue(Schema recordSchema, Properties 
props) {
-    String[] orderingFields = ConfigUtils.getOrderingFields(props);
+  protected Comparable<?> doGetOrderingValue(Schema recordSchema, Properties 
props, String[] orderingFields) {
     if (orderingFields == null) {
       return OrderingValues.getDefault();
     } else {
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/PartialUpdateFlinkRecordMerger.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/PartialUpdateFlinkRecordMerger.java
index 07082ce40ddc..e9e74e05ce0a 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/PartialUpdateFlinkRecordMerger.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/PartialUpdateFlinkRecordMerger.java
@@ -21,6 +21,7 @@ package org.apache.hudi.client.model;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
@@ -81,6 +82,8 @@ import java.io.IOException;
  */
 public class PartialUpdateFlinkRecordMerger extends HoodieFlinkRecordMerger {
 
+  private String[] orderingFields;
+
   @Override
   public String getMergingStrategy() {
     return CUSTOM_MERGE_STRATEGY_UUID;
@@ -97,7 +100,10 @@ public class PartialUpdateFlinkRecordMerger extends 
HoodieFlinkRecordMerger {
     ValidationUtils.checkArgument(older.getRecordType() == 
HoodieRecord.HoodieRecordType.FLINK);
     ValidationUtils.checkArgument(newer.getRecordType() == 
HoodieRecord.HoodieRecordType.FLINK);
 
-    if (older.getOrderingValue(oldSchema, 
props).compareTo(newer.getOrderingValue(newSchema, props)) > 0) {
+    if (orderingFields == null) {
+      orderingFields = ConfigUtils.getOrderingFields(props);
+    }
+    if (older.getOrderingValue(oldSchema, props, 
orderingFields).compareTo(newer.getOrderingValue(newSchema, props, 
orderingFields)) > 0) {
       if (older.isDelete(oldSchema, props) || newer.isDelete(newSchema, 
props)) {
         return Option.of(Pair.of(older, oldSchema));
       } else {
@@ -161,7 +167,7 @@ public class PartialUpdateFlinkRecordMerger extends 
HoodieFlinkRecordMerger {
     return new HoodieFlinkRecord(
         highOrderRecord.getKey(),
         highOrderRecord.getOperation(),
-        highOrderRecord.getOrderingValue(highOrderSchema, props),
+        highOrderRecord.getOrderingValue(highOrderSchema, props, 
orderingFields),
         mergedRow);
   }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/DefaultSparkRecordMerger.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/DefaultSparkRecordMerger.java
index 7ac55ea81e44..385f57f21e46 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/DefaultSparkRecordMerger.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/DefaultSparkRecordMerger.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.merge.SparkRecordMergingUtils;
@@ -36,6 +37,8 @@ import java.io.IOException;
  */
 public class DefaultSparkRecordMerger extends HoodieSparkRecordMerger {
 
+  private String[] orderingFields;
+
   @Override
   public String getMergingStrategy() {
     return HoodieRecordMerger.EVENT_TIME_BASED_MERGE_STRATEGY_UUID;
@@ -48,7 +51,10 @@ public class DefaultSparkRecordMerger extends 
HoodieSparkRecordMerger {
       return deleteHandlingResult;
     }
 
-    if (older.getOrderingValue(oldSchema, 
props).compareTo(newer.getOrderingValue(newSchema, props)) > 0) {
+    if (orderingFields == null) {
+      orderingFields = ConfigUtils.getOrderingFields(props);
+    }
+    if (older.getOrderingValue(oldSchema, props, 
orderingFields).compareTo(newer.getOrderingValue(newSchema, props, 
orderingFields)) > 0) {
       return Option.of(Pair.of(older, oldSchema));
     } else {
       return Option.of(Pair.of(newer, newSchema));
@@ -62,7 +68,10 @@ public class DefaultSparkRecordMerger extends 
HoodieSparkRecordMerger {
       return deleteHandlingResult;
     }
 
-    if (older.getOrderingValue(oldSchema, 
props).compareTo(newer.getOrderingValue(newSchema, props)) > 0) {
+    if (orderingFields == null) {
+      orderingFields = ConfigUtils.getOrderingFields(props);
+    }
+    if (older.getOrderingValue(oldSchema, props, 
orderingFields).compareTo(newer.getOrderingValue(newSchema, props, 
orderingFields)) > 0) {
       return Option.of(SparkRecordMergingUtils.mergePartialRecords(
           (HoodieSparkRecord) newer, newSchema, (HoodieSparkRecord) older, 
oldSchema, readerSchema, props));
     } else {
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
index be4f3a9c9268..04060831a112 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
@@ -21,7 +21,6 @@ package org.apache.hudi.common.model;
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.SparkAdapterSupport$;
 import org.apache.hudi.client.model.HoodieInternalRow;
-import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.OrderingValues;
 import org.apache.hudi.common.util.StringUtils;
@@ -345,9 +344,8 @@ public class HoodieSparkRecord extends 
HoodieRecord<InternalRow> {
   }
 
   @Override
-  protected Comparable<?> doGetOrderingValue(Schema recordSchema, Properties 
props) {
+  protected Comparable<?> doGetOrderingValue(Schema recordSchema, Properties 
props, String[] orderingFields) {
     StructType structType = 
HoodieInternalRowUtils.getCachedSchema(recordSchema);
-    String[] orderingFields = ConfigUtils.getOrderingFields(props);
     if (orderingFields != null) {
       return OrderingValues.create(orderingFields, field -> {
         scala.Option<NestedFieldPath> cachedNestedFieldPath =
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
index 1ff5dde705b9..01d6145aca7a 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
@@ -213,8 +213,7 @@ public class HoodieAvroIndexedRecord extends 
HoodieRecord<IndexedRecord> {
   }
 
   @Override
-  public Comparable<?> doGetOrderingValue(Schema recordSchema, Properties 
props) {
-    String[] orderingFields = ConfigUtils.getOrderingFields(props);
+  public Comparable<?> doGetOrderingValue(Schema recordSchema, Properties 
props, String[] orderingFields) {
     if (orderingFields == null) {
       return OrderingValues.getDefault();
     }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
index 44a3983546c4..1f8f1f2fa624 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
@@ -98,7 +98,7 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload> 
extends HoodieRecor
   }
 
   @Override
-  public Comparable<?> doGetOrderingValue(Schema recordSchema, Properties 
props) {
+  public Comparable<?> doGetOrderingValue(Schema recordSchema, Properties 
props, String[] orderingFields) {
     return this.getData().getOrderingValue();
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
index a7d0d4903e3d..1f5145ee47e8 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieEmptyRecord.java
@@ -58,7 +58,7 @@ public class HoodieEmptyRecord<T> extends HoodieRecord<T> {
   }
 
   @Override
-  public Comparable<?> doGetOrderingValue(Schema recordSchema, Properties 
props) {
+  public Comparable<?> doGetOrderingValue(Schema recordSchema, Properties 
props, String[] orderingFields) {
     return orderingVal;
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index 48b0042b5aaa..78a2cd3d9493 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -218,11 +218,12 @@ public abstract class HoodieRecord<T> implements 
HoodieRecordCompatibilityInterf
    *
    * @param recordSchema Avro schema for the record
    * @param props Properties containing the necessary configurations
+   * @param orderingFields name of the ordering fields
    * @return The ordering value for the record
    */
-  public Comparable<?> getOrderingValue(Schema recordSchema, Properties props) 
{
+  public Comparable<?> getOrderingValue(Schema recordSchema, Properties props, 
String[] orderingFields) {
     if (orderingValue == null) {
-      orderingValue = doGetOrderingValue(recordSchema, props);
+      orderingValue = doGetOrderingValue(recordSchema, props, orderingFields);
     }
     return orderingValue;
   }
@@ -232,9 +233,10 @@ public abstract class HoodieRecord<T> implements 
HoodieRecordCompatibilityInterf
    *
    * @param recordSchema Avro schema for the record
    * @param props Properties containing the necessary configurations
+   * @param orderingFields name of the ordering fields
    * @return The ordering value for the record
    */
-  protected abstract Comparable<?> doGetOrderingValue(Schema recordSchema, 
Properties props);
+  protected abstract Comparable<?> doGetOrderingValue(Schema recordSchema, 
Properties props, String[] orderingFields);
 
   public T getData() {
     if (data == null) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index 8f89dd9bcf09..0a8b8268d0f5 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.serialization.DefaultSerializer;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.DefaultSizeEstimator;
 import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
 import org.apache.hudi.common.util.HoodieRecordUtils;
@@ -90,6 +91,7 @@ public class HoodieMergedLogRecordScanner extends 
AbstractHoodieLogRecordScanner
   private final long maxMemorySizeInBytes;
   // Stores the total time taken to perform reading and merging of log blocks
   private long totalTimeTakenToReadAndMergeBlocks;
+  private final String[] orderingFields;
 
   @SuppressWarnings("unchecked")
   protected HoodieMergedLogRecordScanner(HoodieStorage storage, String 
basePath, List<String> logFilePaths, Schema readerSchema,
@@ -115,6 +117,7 @@ public class HoodieMergedLogRecordScanner extends 
AbstractHoodieLogRecordScanner
           new HoodieRecordSizeEstimator(readerSchema), diskMapType, new 
DefaultSerializer<>(), isBitCaskDiskMapCompressionEnabled, 
getClass().getSimpleName());
       this.scannedPrefixes = new HashSet<>();
       this.allowInflightInstants = allowInflightInstants;
+      this.orderingFields = 
ConfigUtils.getOrderingFields(this.hoodieTableMetaClient.getTableConfig().getProps());
     } catch (IOException e) {
       throw new HoodieIOException("IOException when creating 
ExternalSpillableMap at " + spillableMapBasePath, e);
     }
@@ -275,7 +278,7 @@ public class HoodieMergedLogRecordScanner extends 
AbstractHoodieLogRecordScanner
       // should be deleted or be kept. The old record is kept only if the 
DELETE record has smaller ordering val.
       // For same ordering values, uses the natural order(arrival time 
semantics).
 
-      Comparable curOrderingVal = 
oldRecord.getOrderingValue(this.readerSchema, 
this.hoodieTableMetaClient.getTableConfig().getProps());
+      Comparable curOrderingVal = 
oldRecord.getOrderingValue(this.readerSchema, 
this.hoodieTableMetaClient.getTableConfig().getProps(), orderingFields);
       Comparable deleteOrderingVal = deleteRecord.getOrderingValue();
       // Checks the ordering value does not equal to 0
       // because we use 0 as the default value which means natural order
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java
index bf47163db780..5f205431b24f 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecord.java
@@ -54,7 +54,7 @@ public class BufferedRecord<T> implements Serializable {
     this.isDelete = isDelete;
   }
 
-  public static <T> BufferedRecord<T> forRecordWithContext(HoodieRecord<T> 
record, Schema schema, RecordContext<T> recordContext, Properties props) {
+  public static <T> BufferedRecord<T> forRecordWithContext(HoodieRecord<T> 
record, Schema schema, RecordContext<T> recordContext, Properties props, 
String[] orderingFields) {
     HoodieKey hoodieKey = record.getKey();
     String recordKey = hoodieKey == null ? 
recordContext.getRecordKey(record.getData(), schema) : hoodieKey.getRecordKey();
     Integer schemaId = recordContext.encodeAvroSchema(schema);
@@ -64,7 +64,7 @@ public class BufferedRecord<T> implements Serializable {
     } catch (IOException e) {
       throw new HoodieException("Failed to get isDelete from record.", e);
     }
-    return new BufferedRecord<>(recordKey, record.getOrderingValue(schema, 
props), record.getData(), schemaId, isDelete);
+    return new BufferedRecord<>(recordKey, record.getOrderingValue(schema, 
props, orderingFields), record.getData(), schemaId, isDelete);
   }
 
   public static <T> BufferedRecord<T> forRecordWithContext(T record, Schema 
schema, RecordContext<T> recordContext, List<String> orderingFieldNames, 
boolean isDelete) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
index 2a6f5b02b4bc..46feee0e67e6 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java
@@ -65,7 +65,7 @@ public class BufferedRecordMergerFactory {
     if (enablePartialMerging) {
       BufferedRecordMerger<T> deleteRecordMerger = create(
           readerContext, recordMergeMode, false, recordMerger, 
orderingFieldNames, payloadClass, readerSchema, props, partialUpdateMode);
-      return new 
PartialUpdateBufferedRecordMerger<>(readerContext.getRecordContext(), 
recordMerger, deleteRecordMerger, readerSchema, props);
+      return new 
PartialUpdateBufferedRecordMerger<>(readerContext.getRecordContext(), 
recordMerger, deleteRecordMerger, orderingFieldNames, readerSchema, props);
     }
 
     switch (recordMergeMode) {
@@ -84,7 +84,7 @@ public class BufferedRecordMergerFactory {
           return new CustomPayloadRecordMerger<>(
               readerContext.getRecordContext(), recordMerger, 
orderingFieldNames, payloadClass.get(), readerSchema, props);
         } else {
-          return new CustomRecordMerger<>(readerContext.getRecordContext(), 
recordMerger, readerSchema, props);
+          return new CustomRecordMerger<>(readerContext.getRecordContext(), 
recordMerger, orderingFieldNames, readerSchema, props);
         }
     }
   }
@@ -259,11 +259,13 @@ public class BufferedRecordMergerFactory {
     private final BufferedRecordMerger<T> deleteRecordMerger;
     private final Schema readerSchema;
     private final TypedProperties props;
+    private final String[] orderingFields;
 
     public PartialUpdateBufferedRecordMerger(
         RecordContext<T> recordContext,
         Option<HoodieRecordMerger> recordMerger,
         BufferedRecordMerger<T> deleteRecordMerger,
+        List<String> orderingFieldNames,
         Schema readerSchema,
         TypedProperties props) {
       this.recordContext = recordContext;
@@ -271,6 +273,7 @@ public class BufferedRecordMergerFactory {
       this.deleteRecordMerger = deleteRecordMerger;
       this.readerSchema = readerSchema;
       this.props = props;
+      this.orderingFields = orderingFieldNames.toArray(new String[0]);
     }
 
     @Override
@@ -296,7 +299,7 @@ public class BufferedRecordMergerFactory {
 
       // If pre-combine returns existing record, no need to update it
       if (combinedRecord.getData() != existingRecord.getRecord()) {
-        return Option.of(BufferedRecord.forRecordWithContext(combinedRecord, 
combinedRecordAndSchema.getRight(), recordContext, props));
+        return Option.of(BufferedRecord.forRecordWithContext(combinedRecord, 
combinedRecordAndSchema.getRight(), recordContext, props, orderingFields));
       }
       return Option.empty();
     }
@@ -333,12 +336,16 @@ public class BufferedRecordMergerFactory {
    * based on {@code CUSTOM} merge mode.
    */
   private static class CustomRecordMerger<T> extends BaseCustomMerger<T> {
+    private final String[] orderingFields;
+
     public CustomRecordMerger(
         RecordContext<T> recordContext,
         Option<HoodieRecordMerger> recordMerger,
+        List<String> orderingFieldNames,
         Schema readerSchema,
         TypedProperties props) {
       super(recordContext, recordMerger, readerSchema, props);
+      this.orderingFields = orderingFieldNames.toArray(new String[0]);
     }
 
     @Override
@@ -360,7 +367,7 @@ public class BufferedRecordMergerFactory {
 
       // If pre-combine returns existing record, no need to update it
       if (combinedRecord.getData() != existingRecord.getRecord()) {
-        return Option.of(BufferedRecord.forRecordWithContext(combinedRecord, 
combinedRecordAndSchema.getRight(), recordContext, props));
+        return Option.of(BufferedRecord.forRecordWithContext(combinedRecord, 
combinedRecordAndSchema.getRight(), recordContext, props, orderingFields));
       }
       return Option.empty();
     }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
index 4cc59c2927f1..6d3c3b9eadca 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java
@@ -32,7 +32,6 @@ import org.apache.avro.generic.GenericRecord;
 
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -42,33 +41,13 @@ import java.util.concurrent.ConcurrentHashMap;
  * A utility class for HoodieRecord.
  */
 public class HoodieRecordUtils {
-  private static final Map<String, Object> INSTANCE_CACHE = new HashMap<>();
   private static final Map<String, Constructor<?>> CONSTRUCTOR_CACHE = new 
ConcurrentHashMap<>();
 
-  static {
-    INSTANCE_CACHE.put(HoodieAvroRecordMerger.class.getName(), 
HoodieAvroRecordMerger.INSTANCE);
-  }
-
   /**
    * Instantiate a given class with a record merge.
    */
   public static HoodieRecordMerger loadRecordMerger(String mergerClass) {
-    try {
-      HoodieRecordMerger recordMerger = (HoodieRecordMerger) 
INSTANCE_CACHE.get(mergerClass);
-      if (null == recordMerger) {
-        synchronized (HoodieRecordMerger.class) {
-          recordMerger = (HoodieRecordMerger) INSTANCE_CACHE.get(mergerClass);
-          if (null == recordMerger) {
-            recordMerger = (HoodieRecordMerger) 
ReflectionUtils.loadClass(mergerClass,
-                new Object[] {});
-            INSTANCE_CACHE.put(mergerClass, recordMerger);
-          }
-        }
-      }
-      return recordMerger;
-    } catch (HoodieException e) {
-      throw new HoodieException("Unable to instantiate hoodie merge class ", 
e);
-    }
+    return (HoodieRecordMerger) ReflectionUtils.loadClass(mergerClass, new 
Object[] {});
   }
 
   /**
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index 870ad694b652..248d54c82f0d 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -904,7 +904,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
         .map(record -> new HoodieTestDataGenerator.RecordIdentifier(
             record.getRecordKey(),
             removeHiveStylePartition(record.getPartitionPath()),
-            record.getOrderingValue(schema, props).toString(),
+            record.getOrderingValue(schema, props, 
preCombineFields.toArray(new String[0])).toString(),
             readerContext.getRecordContext().getValue(record.getData(), 
schema, RIDER_FIELD_NAME).toString()))
         .collect(Collectors.toList());
   }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieAvroRecordTestMerger.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieAvroRecordTestMerger.java
index 9bb69b6ceb06..2914ff06c9b9 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieAvroRecordTestMerger.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieAvroRecordTestMerger.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 
@@ -34,6 +35,9 @@ import java.io.IOException;
 import java.util.Properties;
 
 public class HoodieAvroRecordTestMerger extends HoodieAvroRecordMerger {
+
+  private String[] orderingFields;
+
   @Override
   public Option<Pair<HoodieRecord, Schema>> merge(
       HoodieRecord older,
@@ -42,8 +46,12 @@ public class HoodieAvroRecordTestMerger extends HoodieAvroRecordMerger {
       Schema newSchema,
       TypedProperties props
   ) throws IOException {
-    Comparable oldOrderingVal = older.getOrderingValue(oldSchema, props);
-    Comparable newOrderingVal = newer.getOrderingValue(newSchema, props);
+    if (orderingFields == null) {
+      orderingFields = ConfigUtils.getOrderingFields(props);
+    }
+
+    Comparable oldOrderingVal = older.getOrderingValue(oldSchema, props, 
orderingFields);
+    Comparable newOrderingVal = newer.getOrderingValue(newSchema, props, 
orderingFields);
 
     // The record with higher ordering value is returned.
     if (oldOrderingVal == null || newOrderingVal.compareTo(oldOrderingVal) > 
0) {
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
index 594cbc1b8ca1..6cc8c0750dad 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
@@ -92,11 +92,7 @@ public class HoodieFileSliceTestUtils {
   public static final HoodieTestDataGenerator DATA_GEN =
       new HoodieTestDataGenerator(0xDEED);
   public static final TypedProperties PROPERTIES = new TypedProperties();
-
-  static {
-    PROPERTIES.setProperty(
-        "hoodie.datasource.write.precombine.field", "timestamp");
-  }
+  private static String[] orderingFields = new String[] {"timestamp"};
 
   // We use a number to represent a record key, and a (start, end) range
   // to represent a set of record keys between start <= k <= end.
@@ -235,7 +231,7 @@ public class HoodieFileSliceTestUtils {
     return new HoodieDeleteBlock(
         hoodieRecords.stream().map(
             r -> Pair.of(DeleteRecord.create(
-                r.getKey(), r.getOrderingValue(schema, props)), 
r.getCurrentLocation().getPosition()))
+                r.getKey(), r.getOrderingValue(schema, props, 
orderingFields)), r.getCurrentLocation().getPosition()))
             .collect(Collectors.toList()),
         header);
   }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestHoodieRecordUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestHoodieRecordUtils.java
index 9f693b01a536..466486158076 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestHoodieRecordUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestHoodieRecordUtils.java
@@ -37,7 +37,7 @@ class TestHoodieRecordUtils {
     HoodieRecordMerger recordMerger1 = 
HoodieRecordUtils.loadRecordMerger(mergeClassName);
     HoodieRecordMerger recordMerger2 = 
HoodieRecordUtils.loadRecordMerger(mergeClassName);
     assertEquals(recordMerger1.getClass().getName(), mergeClassName);
-    assertEquals(recordMerger1, recordMerger2);
+    assertEquals(recordMerger2.getClass().getName(), mergeClassName);
   }
 
   @Test
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
index a17286e9dfdc..5d9dca452fa9 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
@@ -115,9 +115,9 @@ public class FlinkRowDataReaderContext extends HoodieReaderContext<RowData> {
   public Option<HoodieRecordMerger> getRecordMerger(RecordMergeMode mergeMode, 
String mergeStrategyId, String mergeImplClasses) {
     switch (mergeMode) {
       case EVENT_TIME_ORDERING:
-        return Option.of(EventTimeFlinkRecordMerger.INSTANCE);
+        return Option.of(new EventTimeFlinkRecordMerger());
       case COMMIT_TIME_ORDERING:
-        return Option.of(CommitTimeFlinkRecordMerger.INSTANCE);
+        return Option.of(new CommitTimeFlinkRecordMerger());
       default:
         Option<HoodieRecordMerger> recordMerger = 
HoodieRecordUtils.createValidRecordMerger(EngineType.FLINK, mergeImplClasses, 
mergeStrategyId);
         if (recordMerger.isEmpty()) {
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomRecordMerger.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomRecordMerger.java
index 47d75a1f99e8..a1a73a3d20a9 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomRecordMerger.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/read/TestCustomRecordMerger.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 import 
org.apache.hudi.common.testutils.reader.HoodieFileGroupReaderTestHarness;
 import org.apache.hudi.common.testutils.reader.HoodieFileSliceTestUtils;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.Pair;
@@ -239,6 +240,7 @@ public class TestCustomRecordMerger extends HoodieFileGroupReaderTestHarness {
     public static final String KEEP_CERTAIN_TIMESTAMP_VALUE_ONLY =
         "KEEP_CERTAIN_TIMESTAMP_VALUE_ONLY";
     public static final String TIMESTAMP = "timestamp";
+    private String[] orderingFields;
 
     @Override
     public Option<Pair<HoodieRecord, Schema>> merge(
@@ -248,8 +250,11 @@ public class TestCustomRecordMerger extends HoodieFileGroupReaderTestHarness {
         Schema newSchema,
         TypedProperties props
     ) throws IOException {
-      if (newer.getOrderingValue(newSchema, props).compareTo(
-          older.getOrderingValue(oldSchema, props)) >= 0) {
+      if (orderingFields == null) {
+        this.orderingFields = ConfigUtils.getOrderingFields(props);
+      }
+      if (newer.getOrderingValue(newSchema, props, orderingFields).compareTo(
+          older.getOrderingValue(oldSchema, props, orderingFields)) >= 0) {
         if (newer.isDelete(newSchema, props)) {
           return Option.empty();
         }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/DefaultHiveRecordMerger.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/DefaultHiveRecordMerger.java
index dc03c755ee79..be3c28c91fc6 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/DefaultHiveRecordMerger.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/DefaultHiveRecordMerger.java
@@ -22,6 +22,7 @@ package org.apache.hudi.hadoop;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
@@ -34,6 +35,9 @@ import java.io.IOException;
  * Record merger for hive that implements the default merger strategy
  */
 public class DefaultHiveRecordMerger extends HoodieHiveRecordMerger {
+
+  private String[] orderingFields;
+
   @Override
   public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema 
oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws 
IOException {
     ValidationUtils.checkArgument(older.getRecordType() == 
HoodieRecord.HoodieRecordType.HIVE);
@@ -55,7 +59,10 @@ public class DefaultHiveRecordMerger extends HoodieHiveRecordMerger {
     } else if (older.getData() == null) {
       return Option.empty();
     }
-    if (older.getOrderingValue(oldSchema, 
props).compareTo(newer.getOrderingValue(newSchema, props)) > 0) {
+    if (orderingFields == null) {
+      orderingFields = ConfigUtils.getOrderingFields(props);
+    }
+    if (older.getOrderingValue(oldSchema, props, 
orderingFields).compareTo(newer.getOrderingValue(newSchema, props, 
orderingFields)) > 0) {
       return Option.of(Pair.of(older, oldSchema));
     } else {
       return Option.of(Pair.of(newer, newSchema));
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
index fd2b8572ce91..caf4b331f019 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
@@ -24,7 +24,6 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieOperation;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.MetadataValues;
-import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.OrderingValues;
 import org.apache.hudi.common.util.collection.Pair;
@@ -107,8 +106,7 @@ public class HoodieHiveRecord extends HoodieRecord<ArrayWritable> {
   }
 
   @Override
-  public Comparable<?> doGetOrderingValue(Schema recordSchema, Properties 
props) {
-    String[] orderingFields = ConfigUtils.getOrderingFields(props);
+  public Comparable<?> doGetOrderingValue(Schema recordSchema, Properties 
props, String[] orderingFields) {
     if (orderingFields == null) {
       return OrderingValues.getDefault();
     } else {


Reply via email to