This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1876998d22c [HUDI-8401] Remove payload class FirstValueAvroPayload (#13420)
1876998d22c is described below

commit 1876998d22c323222099a0be9105410a51af4ffc
Author: Lin Liu <[email protected]>
AuthorDate: Thu Jun 12 17:25:18 2025 -0700

    [HUDI-8401] Remove payload class FirstValueAvroPayload (#13420)
    
    This reverts commit f98a40bd369ac4085a5da0e4864e39fe10a607a9.
---
 .../hudi/common/model/FirstValueAvroPayload.java   | 125 ---------------------
 .../common/model/TestFirstValueAvroPayload.java    |  81 -------------
 .../spark/sql/hudi/dml/TestInsertTable.scala       |  58 ----------
 .../spark/sql/hudi/dml/TestMergeIntoTable2.scala   |  80 -------------
 4 files changed, 344 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/FirstValueAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/FirstValueAvroPayload.java
deleted file mode 100644
index d3f9c430c39..00000000000
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/FirstValueAvroPayload.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.common.model;
-
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.common.util.ConfigUtils;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
-
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-
-import java.util.Properties;
-
-/**
- * Payload clazz that is used for Hudi Table.
- *
- * <p>Simplified FirstValueAvroPayload Logic:
- * <pre>
- *
- *  Illustration with simple data.
- *  the order field is 'ts', recordkey is 'id' and schema is :
- *  {
- *    [
- *      {"name":"id","type":"string"},
- *      {"name":"ts","type":"long"},
- *      {"name":"name","type":"string"},
- *      {"name":"price","type":"string"}
- *    ]
- *  }
- *
- *  case 1
- *  Current data:
- *      id      ts      name    price
- *      1       1       name_1  price_1
- *  Insert data:
- *      id      ts      name    price
- *      1       1       name_2  price_2
- *
- *  Result data after #preCombine or #combineAndGetUpdateValue:
- *      id      ts      name    price
- *      1       1       name_1  price_1
- *
- *  If precombine is the same, would keep the first one record
- *
- *  case 2
- *  Current data:
- *      id      ts      name    price
- *      1       1       name_1  price_1
- *  Insert data:
- *      id      ts      name    price
- *      1       2       name_2  price_2
- *
- *  Result data after preCombine or combineAndGetUpdateValue:
- *      id      ts      name    price
- *      1       2       name_2  price_2
- *
- *  The other functionalities are inherited from DefaultHoodieRecordPayload.
- * </pre>
- */
-public class FirstValueAvroPayload extends DefaultHoodieRecordPayload {
-
-  public FirstValueAvroPayload(GenericRecord record, Comparable orderingVal) {
-    super(record, orderingVal);
-  }
-
-  public FirstValueAvroPayload(Option<GenericRecord> record) {
-    super(record);
-  }
-
-  @Override
-  public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue) {
-    if (oldValue.recordBytes.length == 0) {
-      // use natural order for delete record
-      return this;
-    }
-    if (oldValue.orderingVal.compareTo(orderingVal) >= 0) {
-      // pick the payload with greatest ordering value
-      return oldValue;
-    } else {
-      return this;
-    }
-  }
-
-  @Override
-  protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue,
-                                                IndexedRecord incomingRecord, Properties properties) {
-    /*
-     * Combining strategy here returns currentValue on disk if incoming record is older absolutely.
-     * The incoming record can be either a delete (sent as an upsert with _hoodie_is_deleted set to true)
-     * or an insert/update record. In any case, if it is older absolutely than the record in disk, the currentValue
-     * in disk is returned (to be rewritten with new commit time).
-     */
-    String orderField = ConfigUtils.getOrderingField(properties);
-    if (orderField == null) {
-      return true;
-    }
-    boolean consistentLogicalTimestampEnabled = Boolean.parseBoolean(properties.getProperty(
-            KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
-            KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()));
-    Object persistedOrderingVal = HoodieAvroUtils.getNestedFieldVal((GenericRecord) currentValue,
-            orderField,
-            true, consistentLogicalTimestampEnabled);
-    Comparable incomingOrderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal((GenericRecord) incomingRecord,
-            orderField,
-            true, consistentLogicalTimestampEnabled);
-    return persistedOrderingVal == null || ((Comparable) persistedOrderingVal).compareTo(incomingOrderingVal) < 0;
-  }
-}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestFirstValueAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestFirstValueAvroPayload.java
deleted file mode 100644
index de1bcc65d65..00000000000
--- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestFirstValueAvroPayload.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.common.model;
-
-import org.apache.hudi.common.testutils.PreCombineTestUtils;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.MethodSource;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Properties;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-public class TestFirstValueAvroPayload {
-
-  private Schema schema;
-  private Properties props;
-
-  @BeforeEach
-  public void setUp() throws Exception {
-    schema = Schema.createRecord(Arrays.asList(
-            new Schema.Field("id", Schema.create(Schema.Type.STRING), "", 
null),
-            new Schema.Field("partition", Schema.create(Schema.Type.STRING), 
"", null),
-            new Schema.Field("ts", Schema.create(Schema.Type.LONG), "", null),
-            new Schema.Field("_hoodie_is_deleted", 
Schema.create(Schema.Type.BOOLEAN), "", false)
-    ));
-    props = new Properties();
-    props.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, 
"ts");
-    props.setProperty(HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP_KEY, 
"ts");
-  }
-
-  @ParameterizedTest
-  
@MethodSource("org.apache.hudi.common.testutils.PreCombineTestUtils#configurePreCombine")
-  public void testActiveRecordsForFirstValueAvroPayload(String key) throws 
IOException {
-    PreCombineTestUtils.setPreCombineConfig(props, key, "ts");
-    GenericRecord record1 = new GenericData.Record(schema);
-    record1.put("id", "0");
-    record1.put("partition", "partition0");
-    record1.put("ts", 0L);
-    record1.put("_hoodie_is_deleted", false);
-
-    GenericRecord record2 = new GenericData.Record(schema);
-    record2.put("id", "0");
-    record2.put("partition", "partition0");
-    record2.put("ts", 0L);
-    record2.put("_hoodie_is_deleted", false);
-
-    DefaultHoodieRecordPayload payload1 = new FirstValueAvroPayload(record1, 
1);
-    DefaultHoodieRecordPayload payload2 = new FirstValueAvroPayload(record2, 
1);
-    assertEquals(payload1.preCombine(payload2, props), payload2);
-    assertEquals(payload2.preCombine(payload1, props), payload1);
-
-    assertEquals(record1, payload1.getInsertValue(schema, props).get());
-    assertEquals(record2, payload2.getInsertValue(schema, props).get());
-
-    assertEquals(payload1.combineAndGetUpdateValue(record2, schema, 
props).get(), record2);
-    assertEquals(payload2.combineAndGetUpdateValue(record1, schema, 
props).get(), record1);
-  }
-}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
index 43c080464ce..bb7b4533418 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
@@ -231,64 +231,6 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
     }
   }
 
-  test("Test FirstValueAvroPayload test") {
-    withTempDir { tmp =>
-      val targetTable = generateTableName
-      val tablePath = s"${tmp.getCanonicalPath}/$targetTable"
-
-      spark.sql(
-        s"""
-           |create table ${targetTable} (
-           |  `id` string,
-           |  `name` string,
-           |  `dt` bigint,
-           |  `day` STRING,
-           |  `hour` INT
-           |) using hudi
-           |tblproperties (
-           |  'primaryKey' = 'id',
-           |  'type' = 'mor',
-           |  'preCombineField'='dt',
-           |  'hoodie.index.type' = 'BUCKET',
-           |  'hoodie.bucket.index.hash.field' = 'id',
-           |  'hoodie.bucket.index.num.buckets'=12,
-           |  
'hoodie.datasource.write.payload.class'='org.apache.hudi.common.model.FirstValueAvroPayload'
-           | )
-           partitioned by (`day`,`hour`)
-           location '${tablePath}'
-           """.stripMargin)
-
-      spark.sql("set hoodie.file.group.reader.enabled=false")
-
-      spark.sql(
-        s"""
-           |insert into ${targetTable}
-           |select '1' as id, 'aa' as name, 123 as dt, '2024-02-19' as `day`, 
10 as `hour`
-           |""".stripMargin)
-
-      spark.sql(
-        s"""
-           |insert into ${targetTable}
-           |select '1' as id, 'bb' as name, 123 as dt, '2024-02-19' as `day`, 
10 as `hour`
-           |""".stripMargin)
-
-      checkAnswer(s"select id, name, dt, day, hour from $targetTable limit 
10")(
-        Seq("1", "aa", 123, "2024-02-19", 10)
-      )
-
-      spark.sql(
-        s"""
-           |insert into ${targetTable}
-           |select '1' as id, 'cc' as name, 124 as dt, '2024-02-19' as `day`, 
10 as `hour`
-           |""".stripMargin)
-
-      checkAnswer(s"select id, name, dt, day, hour from $targetTable limit 
10")(
-        Seq("1", "cc", 124, "2024-02-19", 10)
-      )
-
-    }
-  }
-
   test("Test Insert Into with values") {
     withRecordType()(withTempDir { tmp =>
       val tableName = generateTableName
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala
index ea19f9812b7..22244c8f4d5 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable2.scala
@@ -1289,84 +1289,4 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
       }
     }
   }
-
-  test("Test MergeInto with CUSTOM merge mode using FirstValueAvroPayload") {
-    withRecordType()(withTempDir { tmp =>
-      Seq("cow", "mor").foreach { tableType =>
-        val targetTable = generateTableName
-        spark.sql(
-          s"""
-            |create table $targetTable (
-            |  id INT,
-            |  name STRING,
-            |  price INT,
-            |  ts BIGINT
-            |) using hudi
-            |TBLPROPERTIES (
-            |  type = '$tableType',
-            |  primaryKey = 'id',
-            |  preCombineField = 'ts',
-            |  recordMergeMode = '${RecordMergeMode.CUSTOM.name()}',
-            |  
'hoodie.datasource.write.payload.class'='org.apache.hudi.common.model.FirstValueAvroPayload',
-            |  hoodie.datasource.write.recordkey.field = 'id'
-            | )
-            |LOCATION '${tmp.getCanonicalPath}/$targetTable'
-            |""".stripMargin)
-
-          spark.sql(
-            s"""
-              |INSERT INTO $targetTable
-              |SELECT id, name, price, ts
-              |FROM (
-              |    SELECT 1 as id, 'John Doe Initial' as name, 19 as price, 
1598886001 as ts
-              |     UNION ALL
-              |     SELECT 2, 'Jane Doe', 24, 1598972400
-              |     UNION ALL
-              |     SELECT 3, 'Bob Smith', 14, 1599058800
-              |)
-              |""".stripMargin)
-
-          spark.sql(
-            s"""
-              |MERGE INTO $targetTable t
-              |USING (
-              | SELECT
-              |   CAST(1 AS INT) as id,
-              |   CAST('John Doe Updated' AS STRING) as name,
-              |   CAST(19 AS INT) as price,
-              |   CAST(1598886001 AS BIGINT) as ts
-              | UNION ALL
-              | SELECT
-              |   CAST(2 AS INT),
-              |   CAST('Jane Doe Updated' AS STRING),
-              |   CAST(24 AS INT),
-              |   CAST(1598972401 AS BIGINT)
-              | UNION ALL
-              | SELECT
-              |   CAST(4 AS INT),
-              |   CAST('Alice Johnson' AS STRING),
-              |   CAST(49 AS INT),
-              |   CAST(2 AS BIGINT)
-              |) s
-              |ON t.price = s.price
-              |WHEN MATCHED THEN UPDATE SET
-              |    t.id = s.id,
-              |    t.name = s.name,
-              |    t.price = s.price,
-              |    t.ts = s.ts
-              |WHEN NOT MATCHED THEN INSERT
-              |    (id, name, price, ts)
-              |VALUES
-              |    (s.id, s.name, s.price, s.ts)
-              |""".stripMargin)
-
-          checkAnswer(s"select id, name, price, ts from $targetTable ORDER BY 
id")(
-            Seq(1, "John Doe Initial", 19, 1598886001L),
-            Seq(2, "Jane Doe Updated", 24, 1598972401L),
-            Seq(3, "Bob Smith", 14, 1599058800L),
-            Seq(4, "Alice Johnson", 49, 2L))
-        }
-      }
-    )
-  }
 }

Reply via email to