This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c94d1b2c4b5 [HUDI-6006] Align hoodie.payload.ordering.field with 
preCombine field (#8326)
c94d1b2c4b5 is described below

commit c94d1b2c4b5246954d9d6aa51c9e220b162e10cb
Author: Lokesh Jain <[email protected]>
AuthorDate: Tue Apr 25 11:44:51 2023 +0530

    [HUDI-6006] Align hoodie.payload.ordering.field with preCombine field 
(#8326)
    
    Aligns `hoodie.payload.ordering.field` config with preCombine field so if 
`hoodie.payload.ordering.field` is not set, then we defer to 
`hoodie.datasource.write.precombine.field` and `hoodie.table.precombine.field`.
    
    ---------
    
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../apache/hudi/config/HoodiePayloadConfig.java    | 16 +++---
 .../common/model/DefaultHoodieRecordPayload.java   |  3 +-
 .../common/model/PartialUpdateAvroPayload.java     |  3 +-
 .../model/TestDefaultHoodieRecordPayload.java      | 14 ++++--
 .../hudi/common/testutils/PreCombineTestUtils.java | 57 ++++++++++++++++++++++
 .../org/apache/hudi/TestDataSourceDefaults.scala   | 12 +++--
 6 files changed, 88 insertions(+), 17 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
index a3e2e5aa7db..3929dcba047 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
@@ -41,13 +41,6 @@ import static 
org.apache.hudi.common.model.HoodiePayloadProps.PAYLOAD_ORDERING_F
         + "control merges based on specific business fields in the data.")
 public class HoodiePayloadConfig extends HoodieConfig {
 
-  public static final ConfigProperty<String> ORDERING_FIELD = ConfigProperty
-      .key(PAYLOAD_ORDERING_FIELD_PROP_KEY)
-      .defaultValue("ts")
-      .markAdvanced()
-      .withDocumentation("Table column/field name to order records that have 
the same key, before "
-          + "merging and writing to storage.");
-
   public static final ConfigProperty<String> EVENT_TIME_FIELD = ConfigProperty
       .key(PAYLOAD_EVENT_TIME_FIELD_PROP_KEY)
       .defaultValue("ts")
@@ -63,6 +56,15 @@ public class HoodiePayloadConfig extends HoodieConfig {
         + "the record payload class to merge records in the log against each 
other, merge again with the base file and "
         + "produce the final record to be written after compaction.");
 
+  /** @deprecated Use {@link HoodieWriteConfig#PRECOMBINE_FIELD_NAME} and its 
methods instead */
+  @Deprecated
+  public static final ConfigProperty<String> ORDERING_FIELD = ConfigProperty
+      .key(PAYLOAD_ORDERING_FIELD_PROP_KEY)
+      .defaultValue("ts")
+      .markAdvanced()
+      .withDocumentation("Table column/field name to order records that have 
the same key, before "
+          + "merging and writing to storage.");
+
   /** @deprecated Use {@link #PAYLOAD_CLASS_NAME} and its methods instead */
   @Deprecated
   public static final String DEFAULT_PAYLOAD_CLASS = 
PAYLOAD_CLASS_NAME.defaultValue();
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
index 76cdea48ef5..eae2f58af94 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.common.model;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -149,7 +150,7 @@ public class DefaultHoodieRecordPayload extends 
OverwriteWithLatestAvroPayload {
      * NOTE: Deletes sent via EmptyHoodieRecordPayload and/or Delete operation 
type do not hit this code path
      * and need to be dealt with separately.
      */
-    String orderField = 
properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY);
+    String orderField = ConfigUtils.getOrderingField(properties);
     if (orderField == null) {
       return true;
     }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java
index 7c09a38d56f..1c13a11ee59 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java
@@ -24,6 +24,7 @@ import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.generic.IndexedRecord;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
@@ -229,7 +230,7 @@ public class PartialUpdateAvroPayload extends 
OverwriteNonDefaultsWithLatestAvro
    * @return true if the given record is newer
    */
   private static boolean isRecordNewer(Comparable orderingVal, IndexedRecord 
record, Properties prop) {
-    String orderingField = 
prop.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY);
+    String orderingField = ConfigUtils.getOrderingField(prop);
     if (!StringUtils.isNullOrEmpty(orderingField)) {
       boolean consistentLogicalTimestampEnabled = 
Boolean.parseBoolean(prop.getProperty(
           
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java
index 6bc0783e652..1cb146ec97e 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.common.model;
 
+import org.apache.hudi.common.testutils.PreCombineTestUtils;
 import org.apache.hudi.common.util.Option;
 
 import org.apache.avro.Schema;
@@ -27,6 +28,7 @@ import org.apache.avro.generic.GenericRecord;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
@@ -61,8 +63,10 @@ public class TestDefaultHoodieRecordPayload {
     props.setProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY, 
"ts");
   }
 
-  @Test
-  public void testActiveRecords() throws IOException {
+  @ParameterizedTest
+  
@MethodSource("org.apache.hudi.common.testutils.PreCombineTestUtils#configurePreCombine")
+  public void testActiveRecords(String key) throws IOException {
+    PreCombineTestUtils.setPreCombineConfig(props, key, "ts");
     GenericRecord record1 = new GenericData.Record(schema);
     record1.put("id", "1");
     record1.put("partition", "partition0");
@@ -87,8 +91,10 @@ public class TestDefaultHoodieRecordPayload {
     assertEquals(payload2.combineAndGetUpdateValue(record1, schema, 
props).get(), record2);
   }
 
-  @Test
-  public void testDeletedRecord() throws IOException {
+  @ParameterizedTest
+  
@MethodSource("org.apache.hudi.common.testutils.PreCombineTestUtils#configurePreCombine")
+  public void testDeletedRecord(String key) throws IOException {
+    PreCombineTestUtils.setPreCombineConfig(props, key, "ts");
     GenericRecord record1 = new GenericData.Record(schema);
     record1.put("id", "1");
     record1.put("partition", "partition0");
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/PreCombineTestUtils.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/PreCombineTestUtils.java
new file mode 100644
index 00000000000..84488f6ac99
--- /dev/null
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/PreCombineTestUtils.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.testutils;
+
+import org.apache.hudi.common.model.HoodiePayloadProps;
+import org.apache.hudi.common.table.HoodieTableConfig;
+
+import org.junit.jupiter.params.provider.Arguments;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.stream.Stream;
+
+public class PreCombineTestUtils {
+  private static final String[] preCombineConfigs = new String[] {
+      HoodieTableConfig.PRECOMBINE_FIELD.key(),
+      "hoodie.datasource.write.precombine.field",
+      HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY
+  };
+
+  public static Stream<Arguments> configurePreCombine() {
+    return Stream.of(
+        Arrays.stream(preCombineConfigs).map(Arguments::of).toArray(Arguments[]::new)
+    );
+  }
+
+  /**
+   * Sets the specified preCombine related key to the value provided. Every other
+   * preCombine related config is removed from properties, so only {@code key} stays set.
+   */
+  public static void setPreCombineConfig(Properties props, String key, String value) {
+    for (String config : preCombineConfigs) {
+      if (key.equals(config)) {
+        props.setProperty(key, value);
+      } else {
+        props.remove(config);
+      }
+    }
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
index 1bb81f7f92e..c9b58fdff01 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
@@ -22,7 +22,7 @@ import org.apache.avro.generic.GenericRecord
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.model._
-import org.apache.hudi.common.testutils.SchemaTestUtil
+import org.apache.hudi.common.testutils.{SchemaTestUtil, PreCombineTestUtils}
 import org.apache.hudi.common.util.Option
 import 
org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH
 import org.apache.hudi.config.HoodiePayloadConfig
@@ -35,6 +35,8 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.types.UTF8String
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.MethodSource
 
 /**
  * Tests on the default key generator, payload classes.
@@ -552,11 +554,13 @@ class TestDataSourceDefaults extends 
ScalaAssertionSupport {
     assertEquals("field2", combinedGR21.get("field1").toString)
   }
 
-  @Test def testOverwriteWithLatestAvroPayloadCombineAndGetUpdateValue(): Unit 
= {
+  @ParameterizedTest
+  
@MethodSource(Array("org.apache.hudi.common.testutils.PreCombineTestUtils#configurePreCombine"))
+  def testOverwriteWithLatestAvroPayloadCombineAndGetUpdateValue(key: String): 
Unit = {
+    val props = new TypedProperties()
+    PreCombineTestUtils.setPreCombineConfig(props, key, "favoriteIntNumber")
     val baseOrderingVal: Object = baseRecord.get("favoriteIntNumber")
     val fieldSchema: Schema = 
baseRecord.getSchema().getField("favoriteIntNumber").schema()
-    val props = new TypedProperties()
-    props.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, 
"favoriteIntNumber");
 
     val basePayload = new OverwriteWithLatestAvroPayload(baseRecord, 
HoodieAvroUtils.convertValueForSpecificDataTypes(fieldSchema, baseOrderingVal, 
false).asInstanceOf[Comparable[_]])
 

Reply via email to