This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c94d1b2c4b5 [HUDI-6006] Align hoodie.payload.ordering.field with
preCombine field (#8326)
c94d1b2c4b5 is described below
commit c94d1b2c4b5246954d9d6aa51c9e220b162e10cb
Author: Lokesh Jain <[email protected]>
AuthorDate: Tue Apr 25 11:44:51 2023 +0530
[HUDI-6006] Align hoodie.payload.ordering.field with preCombine field
(#8326)
Aligns `hoodie.payload.ordering.field` config with preCombine field so if
`hoodie.payload.ordering.field` is not set, then we defer to
`hoodie.datasource.write.precombine.field` and `hoodie.table.precombine.field`.
---------
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../apache/hudi/config/HoodiePayloadConfig.java | 16 +++---
.../common/model/DefaultHoodieRecordPayload.java | 3 +-
.../common/model/PartialUpdateAvroPayload.java | 3 +-
.../model/TestDefaultHoodieRecordPayload.java | 14 ++++--
.../hudi/common/testutils/PreCombineTestUtils.java | 57 ++++++++++++++++++++++
.../org/apache/hudi/TestDataSourceDefaults.scala | 12 +++--
6 files changed, 88 insertions(+), 17 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
index a3e2e5aa7db..3929dcba047 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePayloadConfig.java
@@ -41,13 +41,6 @@ import static
org.apache.hudi.common.model.HoodiePayloadProps.PAYLOAD_ORDERING_F
+ "control merges based on specific business fields in the data.")
public class HoodiePayloadConfig extends HoodieConfig {
- public static final ConfigProperty<String> ORDERING_FIELD = ConfigProperty
- .key(PAYLOAD_ORDERING_FIELD_PROP_KEY)
- .defaultValue("ts")
- .markAdvanced()
- .withDocumentation("Table column/field name to order records that have
the same key, before "
- + "merging and writing to storage.");
-
public static final ConfigProperty<String> EVENT_TIME_FIELD = ConfigProperty
.key(PAYLOAD_EVENT_TIME_FIELD_PROP_KEY)
.defaultValue("ts")
@@ -63,6 +56,15 @@ public class HoodiePayloadConfig extends HoodieConfig {
+ "the record payload class to merge records in the log against each
other, merge again with the base file and "
+ "produce the final record to be written after compaction.");
+ /** @deprecated Use {@link HoodieWriteConfig#PRECOMBINE_FIELD_NAME} and its
methods instead */
+ @Deprecated
+ public static final ConfigProperty<String> ORDERING_FIELD = ConfigProperty
+ .key(PAYLOAD_ORDERING_FIELD_PROP_KEY)
+ .defaultValue("ts")
+ .markAdvanced()
+ .withDocumentation("Table column/field name to order records that have
the same key, before "
+ + "merging and writing to storage.");
+
/** @deprecated Use {@link #PAYLOAD_CLASS_NAME} and its methods instead */
@Deprecated
public static final String DEFAULT_PAYLOAD_CLASS =
PAYLOAD_CLASS_NAME.defaultValue();
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
index 76cdea48ef5..eae2f58af94 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
@@ -19,6 +19,7 @@
package org.apache.hudi.common.model;
import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
@@ -149,7 +150,7 @@ public class DefaultHoodieRecordPayload extends
OverwriteWithLatestAvroPayload {
* NOTE: Deletes sent via EmptyHoodieRecordPayload and/or Delete operation
type do not hit this code path
* and need to be dealt with separately.
*/
- String orderField =
properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY);
+ String orderField = ConfigUtils.getOrderingField(properties);
if (orderField == null) {
return true;
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java
index 7c09a38d56f..1c13a11ee59 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java
@@ -24,6 +24,7 @@ import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
@@ -229,7 +230,7 @@ public class PartialUpdateAvroPayload extends
OverwriteNonDefaultsWithLatestAvro
* @return true if the given record is newer
*/
private static boolean isRecordNewer(Comparable orderingVal, IndexedRecord
record, Properties prop) {
- String orderingField =
prop.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY);
+ String orderingField = ConfigUtils.getOrderingField(prop);
if (!StringUtils.isNullOrEmpty(orderingField)) {
boolean consistentLogicalTimestampEnabled =
Boolean.parseBoolean(prop.getProperty(
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java
index 6bc0783e652..1cb146ec97e 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java
@@ -18,6 +18,7 @@
package org.apache.hudi.common.model;
+import org.apache.hudi.common.testutils.PreCombineTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.avro.Schema;
@@ -27,6 +28,7 @@ import org.apache.avro.generic.GenericRecord;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
@@ -61,8 +63,10 @@ public class TestDefaultHoodieRecordPayload {
props.setProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY,
"ts");
}
- @Test
- public void testActiveRecords() throws IOException {
+ @ParameterizedTest
+
@MethodSource("org.apache.hudi.common.testutils.PreCombineTestUtils#configurePreCombine")
+ public void testActiveRecords(String key) throws IOException {
+ PreCombineTestUtils.setPreCombineConfig(props, key, "ts");
GenericRecord record1 = new GenericData.Record(schema);
record1.put("id", "1");
record1.put("partition", "partition0");
@@ -87,8 +91,10 @@ public class TestDefaultHoodieRecordPayload {
assertEquals(payload2.combineAndGetUpdateValue(record1, schema,
props).get(), record2);
}
- @Test
- public void testDeletedRecord() throws IOException {
+ @ParameterizedTest
+
@MethodSource("org.apache.hudi.common.testutils.PreCombineTestUtils#configurePreCombine")
+ public void testDeletedRecord(String key) throws IOException {
+ PreCombineTestUtils.setPreCombineConfig(props, key, "ts");
GenericRecord record1 = new GenericData.Record(schema);
record1.put("id", "1");
record1.put("partition", "partition0");
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/PreCombineTestUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/PreCombineTestUtils.java
new file mode 100644
index 00000000000..84488f6ac99
--- /dev/null
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/PreCombineTestUtils.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.testutils;
+
+import org.apache.hudi.common.model.HoodiePayloadProps;
+import org.apache.hudi.common.table.HoodieTableConfig;
+
+import org.junit.jupiter.params.provider.Arguments;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.stream.Stream;
+
+public class PreCombineTestUtils {
+ private static String[] preCombineConfigs = new String[] {
+ HoodieTableConfig.PRECOMBINE_FIELD.key(),
+ "hoodie.datasource.write.precombine.field",
+ HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY
+ };
+
+ public static Stream<Arguments> configurePreCombine() {
+ return Stream.of(
+
Arrays.stream(preCombineConfigs).map(Arguments::of).toArray(Arguments[]::new)
+ );
+ }
+
+ /**
+ * Sets specified key to the value provided. The other preCombine related
configs are
+ * removed from properties.
+ */
+ public static void setPreCombineConfig(Properties props, String key, String
value) {
+ for (String config : preCombineConfigs) {
+ if (key.equals(config)) {
+ props.setProperty(key, value);
+ } else {
+ props.remove(config);
+ }
+ }
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
index 1bb81f7f92e..c9b58fdff01 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
@@ -22,7 +22,7 @@ import org.apache.avro.generic.GenericRecord
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.model._
-import org.apache.hudi.common.testutils.SchemaTestUtil
+import org.apache.hudi.common.testutils.{SchemaTestUtil, PreCombineTestUtils}
import org.apache.hudi.common.util.Option
import
org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH
import org.apache.hudi.config.HoodiePayloadConfig
@@ -35,6 +35,8 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.MethodSource
/**
* Tests on the default key generator, payload classes.
@@ -552,11 +554,13 @@ class TestDataSourceDefaults extends
ScalaAssertionSupport {
assertEquals("field2", combinedGR21.get("field1").toString)
}
- @Test def testOverwriteWithLatestAvroPayloadCombineAndGetUpdateValue(): Unit
= {
+ @ParameterizedTest
+
@MethodSource(Array("org.apache.hudi.common.testutils.PreCombineTestUtils#configurePreCombine"))
+ def testOverwriteWithLatestAvroPayloadCombineAndGetUpdateValue(key: String):
Unit = {
+ val props = new TypedProperties()
+ PreCombineTestUtils.setPreCombineConfig(props, key, "favoriteIntNumber")
val baseOrderingVal: Object = baseRecord.get("favoriteIntNumber")
val fieldSchema: Schema =
baseRecord.getSchema().getField("favoriteIntNumber").schema()
- val props = new TypedProperties()
- props.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY,
"favoriteIntNumber");
val basePayload = new OverwriteWithLatestAvroPayload(baseRecord,
HoodieAvroUtils.convertValueForSpecificDataTypes(fieldSchema, baseOrderingVal,
false).asInstanceOf[Comparable[_]])