This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1876998d22c [HUDI-8401] Remove payload class FirstValueAvroPayload (#13420)
1876998d22c is described below
commit 1876998d22c323222099a0be9105410a51af4ffc
Author: Lin Liu <[email protected]>
AuthorDate: Thu Jun 12 17:25:18 2025 -0700
[HUDI-8401] Remove payload class FirstValueAvroPayload (#13420)
This reverts commit f98a40bd369ac4085a5da0e4864e39fe10a607a9.
---
.../hudi/common/model/FirstValueAvroPayload.java | 125 ---------------------
.../common/model/TestFirstValueAvroPayload.java | 81 -------------
.../spark/sql/hudi/dml/TestInsertTable.scala | 58 ----------
.../spark/sql/hudi/dml/TestMergeIntoTable2.scala | 80 -------------
4 files changed, 344 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/FirstValueAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/FirstValueAvroPayload.java
deleted file mode 100644
index d3f9c430c39..00000000000
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/FirstValueAvroPayload.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.common.model;
-
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.common.util.ConfigUtils;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-
-import java.util.Properties;
-
-/**
- * Payload clazz that is used for Hudi Table.
- *
- * <p>Simplified FirstValueAvroPayload Logic:
- * <pre>
- *
- * Illustration with simple data.
- * the order field is 'ts', recordkey is 'id' and schema is :
- * {
- * [
- * {"name":"id","type":"string"},
- * {"name":"ts","type":"long"},
- * {"name":"name","type":"string"},
- * {"name":"price","type":"string"}
- * ]
- * }
- *
- * case 1
- * Current data:
- * id ts name price
- * 1 1 name_1 price_1
- * Insert data:
- * id ts name price
- * 1 1 name_2 price_2
- *
- * Result data after #preCombine or #combineAndGetUpdateValue:
- * id ts name price
- * 1 1 name_1 price_1
- *
- * If precombine is the same, would keep the first one record
- *
- * case 2
- * Current data:
- * id ts name price
- * 1 1 name_1 price_1
- * Insert data:
- * id ts name price
- * 1 2 name_2 price_2
- *
- * Result data after preCombine or combineAndGetUpdateValue:
- * id ts name price
- * 1 2 name_2 price_2
- *
- * The other functionalities are inherited from DefaultHoodieRecordPayload.
- * </pre>
- */
-public class FirstValueAvroPayload extends DefaultHoodieRecordPayload {
-
- public FirstValueAvroPayload(GenericRecord record, Comparable orderingVal) {
- super(record, orderingVal);
- }
-
- public FirstValueAvroPayload(Option<GenericRecord> record) {
- super(record);
- }
-
- @Override
- public OverwriteWithLatestAvroPayload
preCombine(OverwriteWithLatestAvroPayload oldValue) {
- if (oldValue.recordBytes.length == 0) {
- // use natural order for delete record
- return this;
- }
- if (oldValue.orderingVal.compareTo(orderingVal) >= 0) {
- // pick the payload with greatest ordering value
- return oldValue;
- } else {
- return this;
- }
- }
-
- @Override
- protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue,
- IndexedRecord incomingRecord,
Properties properties) {
- /*
- * Combining strategy here returns currentValue on disk if incoming record
is older absolutely.
- * The incoming record can be either a delete (sent as an upsert with
_hoodie_is_deleted set to true)
- * or an insert/update record. In any case, if it is older absolutely than
the record in disk, the currentValue
- * in disk is returned (to be rewritten with new commit time).
- */
- String orderField = ConfigUtils.getOrderingField(properties);
- if (orderField == null) {
- return true;
- }
- boolean consistentLogicalTimestampEnabled =
Boolean.parseBoolean(properties.getProperty(
-
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
-
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()));
- Object persistedOrderingVal =
HoodieAvroUtils.getNestedFieldVal((GenericRecord) currentValue,
- orderField,
- true, consistentLogicalTimestampEnabled);
- Comparable incomingOrderingVal = (Comparable)
HoodieAvroUtils.getNestedFieldVal((GenericRecord) incomingRecord,
- orderField,
- true, consistentLogicalTimestampEnabled);
- return persistedOrderingVal == null || ((Comparable)
persistedOrderingVal).compareTo(incomingOrderingVal) < 0;
- }
-}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestFirstValueAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestFirstValueAvroPayload.java
deleted file mode 100644
index de1bcc65d65..00000000000
--- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestFirstValueAvroPayload.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.common.model;
-
-import org.apache.hudi.common.testutils.PreCombineTestUtils;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.MethodSource;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Properties;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-public class TestFirstValueAvroPayload {
-
- private Schema schema;
- private Properties props;
-
- @BeforeEach
- public void setUp() throws Exception {
- schema = Schema.createRecord(Arrays.asList(
- new Schema.Field("id", Schema.create(Schema.Type.STRING), "",
null),
- new Schema.Field("partition", Schema.create(Schema.Type.STRING),
"", null),
- new Schema.Field("ts", Schema.create(Schema.Type.LONG), "", null),
- new Schema.Field("_hoodie_is_deleted",
Schema.create(Schema.Type.BOOLEAN), "", false)
- ));
- props = new Properties();
- props.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY,
"ts");
- props.setProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY,
"ts");
- }
-
- @ParameterizedTest
-
@MethodSource("org.apache.hudi.common.testutils.PreCombineTestUtils#configurePreCombine")
- public void testActiveRecordsForFirstValueAvroPayload(String key) throws
IOException {
- PreCombineTestUtils.setPreCombineConfig(props, key, "ts");
- GenericRecord record1 = new GenericData.Record(schema);
- record1.put("id", "0");
- record1.put("partition", "partition0");
- record1.put("ts", 0L);
- record1.put("_hoodie_is_deleted", false);
-
- GenericRecord record2 = new GenericData.Record(schema);
- record2.put("id", "0");
- record2.put("partition", "partition0");
- record2.put("ts", 0L);
- record2.put("_hoodie_is_deleted", false);
-
- DefaultHoodieRecordPayload payload1 = new FirstValueAvroPayload(record1,
1);
- DefaultHoodieRecordPayload payload2 = new FirstValueAvroPayload(record2,
1);
- assertEquals(payload1.preCombine(payload2, props), payload2);
- assertEquals(payload2.preCombine(payload1, props), payload1);
-
- assertEquals(record1, payload1.getInsertValue(schema, props).get());
- assertEquals(record2, payload2.getInsertValue(schema, props).get());
-
- assertEquals(payload1.combineAndGetUpdateValue(record2, schema,
props).get(), record2);
- assertEquals(payload2.combineAndGetUpdateValue(record1, schema,
props).get(), record1);
- }
-}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
index 43c080464ce..bb7b4533418 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
@@ -231,64 +231,6 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
}
}
- test("Test FirstValueAvroPayload test") {
- withTempDir { tmp =>
- val targetTable = generateTableName
- val tablePath = s"${tmp.getCanonicalPath}/$targetTable"
-
- spark.sql(
- s"""
- |create table ${targetTable} (
- | `id` string,
- | `name` string,
- | `dt` bigint,
- | `day` STRING,
- | `hour` INT
- |) using hudi
- |tblproperties (
- | 'primaryKey' = 'id',
- | 'type' = 'mor',
- | 'preCombineField'='dt',
- | 'hoodie.index.type' = 'BUCKET',
- | 'hoodie.bucket.index.hash.field' = 'id',
- | 'hoodie.bucket.index.num.buckets'=12,
- |
'hoodie.datasource.write.payload.class'='org.apache.hudi.common.model.FirstValueAvroPayload'
- | )
- partitioned by (`day`,`hour`)
- location '${tablePath}'
- """.stripMargin)
-
- spark.sql("set hoodie.file.group.reader.enabled=false")
-
- spark.sql(
- s"""
- |insert into ${targetTable}
- |select '1' as id, 'aa' as name, 123 as dt, '2024-02-19' as `day`,
10 as `hour`
- |""".stripMargin)
-
- spark.sql(
- s"""
- |insert into ${targetTable}
- |select '1' as id, 'bb' as name, 123 as dt, '2024-02-19' as `day`,
10 as `hour`
- |""".stripMargin)
-
- checkAnswer(s"select id, name, dt, day, hour from $targetTable limit
10")(
- Seq("1", "aa", 123, "2024-02-19", 10)
- )
-
- spark.sql(
- s"""
- |insert into ${targetTable}
- |select '1' as id, 'cc' as name, 124 as dt, '2024-02-19' as `day`,
10 as `hour`
- |""".stripMargin)
-
- checkAnswer(s"select id, name, dt, day, hour from $targetTable limit
10")(
- Seq("1", "cc", 124, "2024-02-19", 10)
- )
-
- }
- }
-
test("Test Insert Into with values") {
withRecordType()(withTempDir { tmp =>
val tableName = generateTableName
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala
index ea19f9812b7..22244c8f4d5 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala
@@ -1289,84 +1289,4 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
}
}
}
-
- test("Test MergeInto with CUSTOM merge mode using FirstValueAvroPayload") {
- withRecordType()(withTempDir { tmp =>
- Seq("cow", "mor").foreach { tableType =>
- val targetTable = generateTableName
- spark.sql(
- s"""
- |create table $targetTable (
- | id INT,
- | name STRING,
- | price INT,
- | ts BIGINT
- |) using hudi
- |TBLPROPERTIES (
- | type = '$tableType',
- | primaryKey = 'id',
- | preCombineField = 'ts',
- | recordMergeMode = '${RecordMergeMode.CUSTOM.name()}',
- |
'hoodie.datasource.write.payload.class'='org.apache.hudi.common.model.FirstValueAvroPayload',
- | hoodie.datasource.write.recordkey.field = 'id'
- | )
- |LOCATION '${tmp.getCanonicalPath}/$targetTable'
- |""".stripMargin)
-
- spark.sql(
- s"""
- |INSERT INTO $targetTable
- |SELECT id, name, price, ts
- |FROM (
- | SELECT 1 as id, 'John Doe Initial' as name, 19 as price,
1598886001 as ts
- | UNION ALL
- | SELECT 2, 'Jane Doe', 24, 1598972400
- | UNION ALL
- | SELECT 3, 'Bob Smith', 14, 1599058800
- |)
- |""".stripMargin)
-
- spark.sql(
- s"""
- |MERGE INTO $targetTable t
- |USING (
- | SELECT
- | CAST(1 AS INT) as id,
- | CAST('John Doe Updated' AS STRING) as name,
- | CAST(19 AS INT) as price,
- | CAST(1598886001 AS BIGINT) as ts
- | UNION ALL
- | SELECT
- | CAST(2 AS INT),
- | CAST('Jane Doe Updated' AS STRING),
- | CAST(24 AS INT),
- | CAST(1598972401 AS BIGINT)
- | UNION ALL
- | SELECT
- | CAST(4 AS INT),
- | CAST('Alice Johnson' AS STRING),
- | CAST(49 AS INT),
- | CAST(2 AS BIGINT)
- |) s
- |ON t.price = s.price
- |WHEN MATCHED THEN UPDATE SET
- | t.id = s.id,
- | t.name = s.name,
- | t.price = s.price,
- | t.ts = s.ts
- |WHEN NOT MATCHED THEN INSERT
- | (id, name, price, ts)
- |VALUES
- | (s.id, s.name, s.price, s.ts)
- |""".stripMargin)
-
- checkAnswer(s"select id, name, price, ts from $targetTable ORDER BY
id")(
- Seq(1, "John Doe Initial", 19, 1598886001L),
- Seq(2, "Jane Doe Updated", 24, 1598972401L),
- Seq(3, "Bob Smith", 14, 1599058800L),
- Seq(4, "Alice Johnson", 49, 2L))
- }
- }
- )
- }
}