nsivabalan commented on code in PR #13519:
URL: https://github.com/apache/hudi/pull/13519#discussion_r2218109328


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java:
##########
@@ -228,7 +233,9 @@ public void run(HoodieTableVersion toVersion, String 
instantTime) {
     }
   }
 
-  protected Map<ConfigProperty, String> upgrade(HoodieTableVersion 
fromVersion, HoodieTableVersion toVersion, String instantTime) {
+  protected Pair<Map<ConfigProperty, String>, List<ConfigProperty>> 
upgrade(HoodieTableVersion fromVersion,
+                                                                            
HoodieTableVersion toVersion,
+                                                                            
String instantTime) {

Review Comment:
   Why does the format change for untouched files? Can you check it out?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java:
##########
@@ -214,4 +237,35 @@ static void rollbackFailedWritesAndCompact(HoodieTable 
table, HoodieEngineContex
       throw new HoodieException(e);
     }
   }
+
+  // If the metadata table is enabled for the data table, and
+  // existing metadata table is behind the data table, then delete it.
+  public static void checkAndHandleMetadataTable(HoodieEngineContext context,
+                                                 HoodieTable table,
+                                                 HoodieWriteConfig config,
+                                                 HoodieTableMetaClient 
metaClient) {
+    if (!table.isMetadataTable()
+        && config.isMetadataTableEnabled()
+        && isMetadataTableBehindDataTable(config, metaClient)) {
+      HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), 
context);
+    }
+  }
+
+  public static boolean isMetadataTableBehindDataTable(HoodieWriteConfig 
config,

Review Comment:
   Why public? Can we make it private?



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java:
##########
@@ -239,9 +239,9 @@ public void 
testHoodieFlinkCompactorWithUpgradeAndDowngrade(boolean upgrade) thr
       if (upgrade) {
         metaClient.getTableConfig().setTableVersion(HoodieTableVersion.SIX);
         HoodieTableConfig.update(metaClient.getStorage(), 
metaClient.getMetaPath(), metaClient.getTableConfig().getProps());
-        new UpgradeDowngrade(metaClient, writeClient.getConfig(), 
writeClient.getEngineContext(), 
FlinkUpgradeDowngradeHelper.getInstance()).run(HoodieTableVersion.EIGHT, 
"none");
+        new UpgradeDowngrade(metaClient, writeClient.getConfig(), 
writeClient.getEngineContext(), 
FlinkUpgradeDowngradeHelper.getInstance()).run(HoodieTableVersion.NINE, "none");

Review Comment:
   Why not `HoodieTableVersion.current()`?



##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.upgrade;
+
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.PartialUpdateMode;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.hudi.common.config.RecordMergeMode.COMMIT_TIME_ORDERING;
+import static 
org.apache.hudi.common.config.RecordMergeMode.EVENT_TIME_ORDERING;
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.DEBEZIUM_UNAVAILABLE_VALUE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_CUSTOM_MARKER;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static org.apache.hudi.common.table.PartialUpdateMode.IGNORE_MARKERS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class TestEightToNineUpgradeHandler {
+  private final EightToNineUpgradeHandler handler = new 
EightToNineUpgradeHandler();
+  private final HoodieEngineContext context = mock(HoodieEngineContext.class);
+  private final HoodieTable table = mock(HoodieTable.class);
+  private final HoodieTableMetaClient metaClient = 
mock(HoodieTableMetaClient.class);
+  private final HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+  private final SupportsUpgradeDowngrade upgradeDowngradeHelper =
+      mock(SupportsUpgradeDowngrade.class);
+  private final HoodieWriteConfig config = mock(HoodieWriteConfig.class);
+
+  @BeforeEach
+  public void setUp() {
+    when(upgradeDowngradeHelper.getTable(any(), any())).thenReturn(table);
+    when(table.getMetaClient()).thenReturn(metaClient);
+    when(metaClient.getTableConfig()).thenReturn(tableConfig);
+    when(config.autoUpgrade()).thenReturn(true);
+  }
+
+  @Test
+  void testDefaultHoodieRecordPayload() {
+    try (org.mockito.MockedStatic<UpgradeDowngradeUtils> utilities =
+             org.mockito.Mockito.mockStatic(UpgradeDowngradeUtils.class)) {
+      utilities.when(() -> 
UpgradeDowngradeUtils.rollbackFailedWritesAndCompact(
+              any(), any(), any(), any(), anyBoolean(), any()))
+          .thenAnswer(invocation -> null);
+      
when(tableConfig.getPayloadClass()).thenReturn(DefaultHoodieRecordPayload.class.getName());
+      Pair<Map<ConfigProperty, String>, List<ConfigProperty>> 
propertiesToHandle =
+          handler.upgrade(config, context, "anyInstant", 
upgradeDowngradeHelper);
+      Map<ConfigProperty, String> propertiesToAdd = 
propertiesToHandle.getLeft();
+      List<ConfigProperty> propertiesToRemove = propertiesToHandle.getRight();
+      assertTrue(propertiesToAdd.containsKey(MERGE_PROPERTIES));
+      assertTrue(StringUtils.isNullOrEmpty(
+          propertiesToAdd.get(MERGE_PROPERTIES)));
+      assertFalse(propertiesToAdd.containsKey(RECORD_MERGE_MODE));
+      assertTrue(propertiesToAdd.containsKey(PARTIAL_UPDATE_MODE));
+      assertEquals(
+          PartialUpdateMode.NONE.name(),
+          propertiesToAdd.get(PARTIAL_UPDATE_MODE));
+      assertPayloadClassChange(
+          propertiesToAdd, propertiesToRemove, 
DefaultHoodieRecordPayload.class.getName());
+    }
+  }
+
+  @Test
+  void testOverwriteWithLatestAvroPayload() {

Review Comment:
   We could make all of these tests parameterized and reduce the code
duplication.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java:
##########
@@ -214,4 +237,35 @@ static void rollbackFailedWritesAndCompact(HoodieTable 
table, HoodieEngineContex
       throw new HoodieException(e);
     }
   }
+
+  // If the metadata table is enabled for the data table, and
+  // existing metadata table is behind the data table, then delete it.
+  public static void checkAndHandleMetadataTable(HoodieEngineContext context,

Review Comment:
   Why public? Can we make it package-private?
   



##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java:
##########
@@ -147,6 +148,11 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
 
   public static final String EXTRA_COL_SCHEMA1 = "{\"name\": 
\"extra_column1\", \"type\": [\"null\", \"string\"], \"default\": null },";
   public static final String EXTRA_COL_SCHEMA2 = "{\"name\": 
\"extra_column2\", \"type\": [\"null\", \"string\"], \"default\": null},";
+  public static final String EXTRA_COL_SCHEMA_FOR_AWS_DMS_PAYLOAD = 
"{\"name\": \"Op\", \"type\": [\"null\", \"string\"], \"default\": null},";
+  public static final String EXTRA_COL_SCHEMA_FOR_POSTGRES_PAYLOAD = 
"{\"name\": \"_event_lsn\", \"type\": [\"null\", \"long\"], \"default\": 
null},";
+  public static final String TRIP_EXAMPLE_SCHEMA_WITH_SPECIFIC_COLUMNS =

Review Comment:
   `TRIP_EXAMPLE_SCHEMA_WITH_PAYLOAD_SPECIFIC_COLS`



##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestNineToEightDowngradeHandler.java:
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.upgrade;
+
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.LEGACY_PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.MERGE_PROPERTIES;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PARTIAL_UPDATE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.PAYLOAD_CLASS_NAME;
+import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
+import static 
org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_STRATEGY_ID;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import org.mockito.MockedStatic;
+
+class TestNineToEightDowngradeHandler {
+  private final NineToEightDowngradeHandler handler = new 
NineToEightDowngradeHandler();
+  private final HoodieWriteConfig config = mock(HoodieWriteConfig.class);
+  private final HoodieEngineContext context = mock(HoodieEngineContext.class);
+  private final HoodieTable table = mock(HoodieTable.class);
+  private HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
+  private HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+  private SupportsUpgradeDowngrade upgradeDowngradeHelper = 
mock(SupportsUpgradeDowngrade.class);
+
+  @BeforeEach
+  public void setUp() {
+    when(upgradeDowngradeHelper.getTable(any(), any())).thenReturn(table);
+    when(table.getMetaClient()).thenReturn(metaClient);
+    when(metaClient.getTableConfig()).thenReturn(tableConfig);
+  }
+
+  @Test
+  void testDowngradeForAWSDmsAvroPayload() {
+    try (MockedStatic<UpgradeDowngradeUtils> utilities =
+             org.mockito.Mockito.mockStatic(UpgradeDowngradeUtils.class)) {
+      utilities.when(() -> 
UpgradeDowngradeUtils.rollbackFailedWritesAndCompact(
+          any(), any(), any(), any(), anyBoolean(), any()))
+          .thenAnswer(invocation -> null);
+      
when(tableConfig.getLegacyPayloadClass()).thenReturn(AWSDmsAvroPayload.class.getName());
+      Pair<Map<ConfigProperty, String>, List<ConfigProperty>> 
propertiesToChange =
+          handler.downgrade(config, context, "anyInstant", 
upgradeDowngradeHelper);
+      assertEquals(3, propertiesToChange.getRight().size());
+      assertEquals(MERGE_PROPERTIES, propertiesToChange.getRight().get(0));
+      assertEquals(PARTIAL_UPDATE_MODE, propertiesToChange.getRight().get(1));
+      assertEquals(LEGACY_PAYLOAD_CLASS_NAME, 
propertiesToChange.getRight().get(2));
+      assertEquals(3, propertiesToChange.getLeft().size());
+      assertEquals(
+          RecordMergeMode.CUSTOM.name(),
+          propertiesToChange.getLeft().get(RECORD_MERGE_MODE));
+      assertEquals(
+          PAYLOAD_BASED_MERGE_STRATEGY_UUID,
+          propertiesToChange.getLeft().get(RECORD_MERGE_STRATEGY_ID));
+      assertEquals(
+          AWSDmsAvroPayload.class.getName(),
+          propertiesToChange.getLeft().get(PAYLOAD_CLASS_NAME));
+    }
+  }
+
+  @Test
+  void testDowngradeForOverwriteNonDefaultsWithLatestAvroPayload() {

Review Comment:
   Same comment as above: let's parameterize all of these and reduce the code
duplication.



##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java:
##########
@@ -477,6 +486,22 @@ private void generateFareNestedValues(GenericRecord rec) {
     rec.put("fare", fareRecord);
   }
 
+  /**
+   * Populate "Op" column.
+   */
+  private void generateOpColumnValue(GenericRecord rec) {
+    // No delete records; otherwise, it is hard to data validation.
+    int index = rand.nextInt(2);
+    rec.put("Op", OPERATIONS[index]);
+  }
+
+  /**
+   * Populate "_event_lsn" column.
+   */
+  private void generateEventLSNValue(GenericRecord rec) {
+    rec.put("_event_lsn", rand.nextLong());

Review Comment:
   This is not guaranteed to always generate records with a higher ordering
value, right?
   i.e.,
   in commit 1, we ingest 10 records,
   and in commit 2, we generate updates for the same 10 records. We expect the
updates to be part of the final snapshot, but with this random generation,
the updated records could end up having a lower ordering value, right?
   
   How did you manage to validate your tests?



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala:
##########
@@ -116,11 +142,137 @@ class TestPayloadDeprecationFlow extends 
SparkClientFunctionalTestHarness {
     assertTrue(
       expectedDf.except(finalDf).isEmpty && finalDf.except(expectedDf).isEmpty)
   }
+
+  /**
+   * Test if the payload based read produce the same result when upgrade 
happens in between.
+   */
+  @ParameterizedTest
+  @MethodSource(Array("provideParamsForUpgradeBehavior"))
+  def testUpgradeUntilTableVersionNine(tableType: String,
+                                       payloadClazz: String): Unit = {
+    val opts: Map[String, String] = Map(
+      HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> payloadClazz,
+      HoodieTableConfig.MERGE_PROPERTIES.key() ->
+        "hoodie.payload.delete.field=Op,hoodie.payload.delete.marker=d")
+    val columns = Seq("ts", "key", "rider", "driver", "fare", "Op")
+
+    // 1. Add an insert.
+    val data = Seq(
+      (10, "1", "rider-A", "driver-A", 19.10, "i"),
+      (10, "2", "rider-B", "driver-B", 27.70, "i"),
+      (10, "3", "rider-C", "driver-C", 33.90, "i"),
+      (10, "4", "rider-D", "driver-D", 34.15, "i"),
+      (10, "5", "rider-E", "driver-E", 17.85, "i"))
+    val inserts = spark.createDataFrame(data).toDF(columns: _*)
+    inserts.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "key").
+      option(PRECOMBINE_FIELD.key(), "ts").
+      option(TABLE_TYPE.key(), tableType).
+      option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
+      option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+      option(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "6").
+      options(opts).
+      mode(SaveMode.Overwrite).
+      save(basePath)
+
+    // Validate table version.
+    var metaClient = getHoodieMetaClient(storageConf(), basePath())
+    assertEquals(HoodieTableVersion.SIX, 
metaClient.getTableConfig.getTableVersion)
+
+    // 2. Add an update.
+    val firstUpdateData = Seq(
+      (11, "1", "rider-X", "driver-X", 19.10, "d"),
+      (11, "2", "rider-Y", "driver-Y", 27.70, "u"))
+    val firstUpdate = spark.createDataFrame(firstUpdateData).toDF(columns: _*)
+    firstUpdate.write.format("hudi").
+      option(OPERATION.key(), "upsert").
+      option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+      option(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "8").
+      options(opts).
+      mode(SaveMode.Append).
+      save(basePath)
+
+    // Validate table version.
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    assertEquals(HoodieTableVersion.EIGHT, 
metaClient.getTableConfig.getTableVersion)
+
+    // 3. Add an update.
+    val secondUpdateData = Seq(
+      (12, "3", "rider-CC", "driver-CC", 33.90, "i"),
+      (9, "4", "rider-DD", "driver-DD", 34.15, "i"),
+      (12, "5", "rider-EE", "driver-EE", 17.85, "i"))
+    val secondUpdate = spark.createDataFrame(secondUpdateData).toDF(columns: 
_*)
+    secondUpdate.write.format("hudi").
+      option(OPERATION.key(), "upsert").
+      option(HoodieCompactionConfig.INLINE_COMPACT.key(), "true").
+      option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), 
"1").
+      option(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "9").
+      options(opts).
+      mode(SaveMode.Append).
+      save(basePath)
+
+    // Validate table version.
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    assertEquals(HoodieTableVersion.EIGHT, 
metaClient.getTableConfig.getTableVersion)
+
+    // 4. Validate.
+    val df = spark.read.format("hudi").options(opts).load(basePath)
+    val finalDf = df.select("ts", "key", "rider", "driver", "fare", 
"Op").sort("key")
+
+    val expectedData = if 
(!payloadClazz.equals(classOf[AWSDmsAvroPayload].getName)) {
+      if (payloadClazz.equals(classOf[PartialUpdateAvroPayload].getName)
+        || payloadClazz.equals(classOf[EventTimeAvroPayload].getName)) {
+        Seq(
+          (11, "1", "rider-X", "driver-X", 19.10, "d"),
+          (11, "2", "rider-Y", "driver-Y", 27.70, "u"),
+          (12, "3", "rider-CC", "driver-CC", 33.90, "i"),
+          (10, "4", "rider-D", "driver-D", 34.15, "i"),
+          (12, "5", "rider-EE", "driver-EE", 17.85, "i"))
+      } else {
+        Seq(
+          (11, "1", "rider-X", "driver-X", 19.10, "d"),
+          (11, "2", "rider-Y", "driver-Y", 27.70, "u"),
+          (12, "3", "rider-CC", "driver-CC", 33.90, "i"),
+          (9, "4", "rider-DD", "driver-DD", 34.15, "i"),
+          (12, "5", "rider-EE", "driver-EE", 17.85, "i"))
+      }
+    } else {
+      Seq(
+      (11, "2", "rider-Y", "driver-Y", 27.70, "u"),
+      (12, "3", "rider-CC", "driver-CC", 33.90, "i"),
+      (9, "4", "rider-DD", "driver-DD", 34.15, "i"),
+      (12, "5", "rider-EE", "driver-EE", 17.85, "i"))
+    }
+    val expectedDf = spark.createDataFrame(
+      spark.sparkContext.parallelize(expectedData)).toDF(columns: 
_*).sort("key")
+    assertTrue(
+      expectedDf.except(finalDf).isEmpty && finalDf.except(expectedDf).isEmpty)
+  }
 }
 
-// TODO: Add COPY_ON_WRITE table type tests when write path is updated 
accordingly.
+// TODO: Add COPY_ON_WRITE table type tests when write path is updated 
accordingly.s
 object TestPayloadDeprecationFlow {
-  def provideParams(): java.util.List[Arguments] = {
+  def provideParamsForPayloadBehavior(): java.util.List[Arguments] = {
+    java.util.Arrays.asList(
+      Arguments.of("MERGE_ON_READ", 
classOf[OverwriteWithLatestAvroPayload].getName, "6"),

Review Comment:
   Can we add the COW table type as well?



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala:
##########
@@ -116,11 +142,137 @@ class TestPayloadDeprecationFlow extends 
SparkClientFunctionalTestHarness {
     assertTrue(
       expectedDf.except(finalDf).isEmpty && finalDf.except(expectedDf).isEmpty)
   }
+
+  /**
+   * Test if the payload based read produce the same result when upgrade 
happens in between.
+   */
+  @ParameterizedTest
+  @MethodSource(Array("provideParamsForUpgradeBehavior"))
+  def testUpgradeUntilTableVersionNine(tableType: String,
+                                       payloadClazz: String): Unit = {
+    val opts: Map[String, String] = Map(
+      HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> payloadClazz,
+      HoodieTableConfig.MERGE_PROPERTIES.key() ->
+        "hoodie.payload.delete.field=Op,hoodie.payload.delete.marker=d")
+    val columns = Seq("ts", "key", "rider", "driver", "fare", "Op")
+
+    // 1. Add an insert.
+    val data = Seq(
+      (10, "1", "rider-A", "driver-A", 19.10, "i"),
+      (10, "2", "rider-B", "driver-B", 27.70, "i"),
+      (10, "3", "rider-C", "driver-C", 33.90, "i"),
+      (10, "4", "rider-D", "driver-D", 34.15, "i"),
+      (10, "5", "rider-E", "driver-E", 17.85, "i"))
+    val inserts = spark.createDataFrame(data).toDF(columns: _*)
+    inserts.write.format("hudi").
+      option(RECORDKEY_FIELD.key(), "key").
+      option(PRECOMBINE_FIELD.key(), "ts").
+      option(TABLE_TYPE.key(), tableType).
+      option(DataSourceWriteOptions.TABLE_NAME.key(), "test_table").
+      option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+      option(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "6").
+      options(opts).
+      mode(SaveMode.Overwrite).
+      save(basePath)
+
+    // Validate table version.
+    var metaClient = getHoodieMetaClient(storageConf(), basePath())
+    assertEquals(HoodieTableVersion.SIX, 
metaClient.getTableConfig.getTableVersion)
+
+    // 2. Add an update.
+    val firstUpdateData = Seq(
+      (11, "1", "rider-X", "driver-X", 19.10, "d"),
+      (11, "2", "rider-Y", "driver-Y", 27.70, "u"))
+    val firstUpdate = spark.createDataFrame(firstUpdateData).toDF(columns: _*)
+    firstUpdate.write.format("hudi").
+      option(OPERATION.key(), "upsert").
+      option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
+      option(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "8").
+      options(opts).
+      mode(SaveMode.Append).
+      save(basePath)
+
+    // Validate table version.
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    assertEquals(HoodieTableVersion.EIGHT, 
metaClient.getTableConfig.getTableVersion)
+
+    // 3. Add an update.
+    val secondUpdateData = Seq(
+      (12, "3", "rider-CC", "driver-CC", 33.90, "i"),
+      (9, "4", "rider-DD", "driver-DD", 34.15, "i"),
+      (12, "5", "rider-EE", "driver-EE", 17.85, "i"))
+    val secondUpdate = spark.createDataFrame(secondUpdateData).toDF(columns: 
_*)
+    secondUpdate.write.format("hudi").
+      option(OPERATION.key(), "upsert").
+      option(HoodieCompactionConfig.INLINE_COMPACT.key(), "true").
+      option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), 
"1").
+      option(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), "9").
+      options(opts).
+      mode(SaveMode.Append).
+      save(basePath)
+
+    // Validate table version.
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    assertEquals(HoodieTableVersion.EIGHT, 
metaClient.getTableConfig.getTableVersion)
+
+    // 4. Validate.
+    val df = spark.read.format("hudi").options(opts).load(basePath)

Review Comment:
   We might also need to do time-travel queries to older commit times,
   because the latest snapshot you are reading is written using version 9. We
are also interested to see that, if a file slice was written using version 8
before upgrading the table to version 9, reading that older file slice using the
new merge mode and partial update mode works as expected.
   
   So, let's do the following.
   
   Let's start with version 6.
   Commit 1 -> inserts.
   Commit 2 -> updates.
   Commit 3 -> updates; upgrade the table to version 8 in this commit. // This
will automatically trigger compaction as well.
   Commit 4 -> updates.
   Commit 5 -> updates; again, let this commit upgrade the table to v9. // This
will automatically trigger compaction.
   Commit 6 -> updates.
   
   Let's do a snapshot query for commit 6,
   and do time-travel queries for commits 5, 4, 3, 2, and 1.
   All of them should succeed without any issues.
   
   
   



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestEightToNineUpgrade.scala:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.functional
+
+import 
org.apache.hudi.DataSourceWriteOptions.{INSERT_OVERWRITE_OPERATION_OPT_VAL, 
PARTITIONPATH_FIELD, PAYLOAD_CLASS_NAME, RECORD_MERGE_IMPL_CLASSES, TABLE_TYPE, 
UPSERT_OPERATION_OPT_VAL}
+import org.apache.hudi.common.config.{HoodieStorageConfig, RecordMergeMode}
+import org.apache.hudi.common.model.{AWSDmsAvroPayload, HoodieRecordMerger, 
HoodieTableType, OverwriteNonDefaultsWithLatestAvroPayload, 
PartialUpdateAvroPayload}
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload.{DELETE_KEY, 
DELETE_MARKER}
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
HoodieTableVersion, PartialUpdateMode}
+import 
org.apache.hudi.common.table.HoodieTableConfig.{DEBEZIUM_UNAVAILABLE_VALUE, 
PARTIAL_UPDATE_CUSTOM_MARKER}
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator
+import org.apache.hudi.common.util.StringUtils
+import org.apache.hudi.table.upgrade.{SparkUpgradeDowngradeHelper, 
UpgradeDowngrade}
+
+import org.apache.spark.sql.SaveMode
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+
+class TestEightToNineUpgrade extends RecordLevelIndexTestBase {
+  @ParameterizedTest
+  @MethodSource(Array("payloadConfigs"))
+  def testPartitionFieldsWithUpgrade(tableType: HoodieTableType, payloadClass: 
String): Unit = {
+    val partitionFields = "partition:simple"
+    val mergerClasses = "org.apache.hudi.DefaultSparkRecordMerger," +
+      "org.apache.hudi.OverwriteWithLatestSparkRecordMerger," +
+      "org.apache.hudi.common.model.HoodieAvroRecordMerger"
+    val hudiOpts= commonOpts ++ Map(

Review Comment:
   I assume the test setup is as follows:
   start a table with v8,
   do 1 or 2 commits,
   and then it upgrades to v9;
   we do some validation,
   and then downgrade,
   and do validations.
   
   But where exactly are we enforcing a version 8 table for the 1st commit? I
don't see it.
   



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestEightToNineUpgrade.scala:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.functional
+
+import 
org.apache.hudi.DataSourceWriteOptions.{INSERT_OVERWRITE_OPERATION_OPT_VAL, 
PARTITIONPATH_FIELD, PAYLOAD_CLASS_NAME, RECORD_MERGE_IMPL_CLASSES, TABLE_TYPE, 
UPSERT_OPERATION_OPT_VAL}
+import org.apache.hudi.common.config.{HoodieStorageConfig, RecordMergeMode}
+import org.apache.hudi.common.model.{AWSDmsAvroPayload, HoodieRecordMerger, 
HoodieTableType, OverwriteNonDefaultsWithLatestAvroPayload, 
PartialUpdateAvroPayload}
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload.{DELETE_KEY, 
DELETE_MARKER}
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
HoodieTableVersion, PartialUpdateMode}
+import 
org.apache.hudi.common.table.HoodieTableConfig.{DEBEZIUM_UNAVAILABLE_VALUE, 
PARTIAL_UPDATE_CUSTOM_MARKER}
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator
+import org.apache.hudi.common.util.StringUtils
+import org.apache.hudi.table.upgrade.{SparkUpgradeDowngradeHelper, 
UpgradeDowngrade}
+
+import org.apache.spark.sql.SaveMode
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+
+class TestEightToNineUpgrade extends RecordLevelIndexTestBase {
+  @ParameterizedTest
+  @MethodSource(Array("payloadConfigs"))
+  def testPartitionFieldsWithUpgrade(tableType: HoodieTableType, payloadClass: 
String): Unit = {
+    val partitionFields = "partition:simple"
+    val mergerClasses = "org.apache.hudi.DefaultSparkRecordMerger," +
+      "org.apache.hudi.OverwriteWithLatestSparkRecordMerger," +
+      "org.apache.hudi.common.model.HoodieAvroRecordMerger"
+    val hudiOpts= commonOpts ++ Map(
+      TABLE_TYPE.key -> tableType.name(),
+      PARTITIONPATH_FIELD.key -> partitionFields,
+      PAYLOAD_CLASS_NAME.key -> payloadClass,
+      RECORD_MERGE_IMPL_CLASSES.key -> mergerClasses,
+      HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> "parquet"
+    )
+
+    doWriteAndValidateDataAndRecordIndex(hudiOpts,
+      operation = INSERT_OVERWRITE_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite,
+      validate = true,
+      schemaStr = 
HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA_WITH_SPECIFIC_COLUMNS)
+    metaClient = getLatestMetaClient(true)
+
+    // Assert table version is 9 and the partition fields in table config has 
partition type.
+    assertEquals(HoodieTableVersion.NINE, 
metaClient.getTableConfig.getTableVersion)
+    assertEquals(
+      partitionFields,
+      
HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).get())
+    assertEquals(payloadClass, metaClient.getTableConfig.getPayloadClass)
+
+    // Downgrade table to version 8
+    // Assert table version is 8 and the partition fields in table config does 
not have partition type.
+    new UpgradeDowngrade(metaClient, getWriteConfig(hudiOpts), context, 
SparkUpgradeDowngradeHelper.getInstance)
+      .run(HoodieTableVersion.EIGHT, null)

Review Comment:
   Slightly orthogonal question: 
   do you know where exactly we disallow creating a table in v9 by setting a 
   payload class? 



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestEightToNineUpgrade.scala:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.functional
+
+import 
org.apache.hudi.DataSourceWriteOptions.{INSERT_OVERWRITE_OPERATION_OPT_VAL, 
PARTITIONPATH_FIELD, PAYLOAD_CLASS_NAME, RECORD_MERGE_IMPL_CLASSES, TABLE_TYPE, 
UPSERT_OPERATION_OPT_VAL}
+import org.apache.hudi.common.config.{HoodieStorageConfig, RecordMergeMode}
+import org.apache.hudi.common.model.{AWSDmsAvroPayload, HoodieRecordMerger, 
HoodieTableType, OverwriteNonDefaultsWithLatestAvroPayload, 
PartialUpdateAvroPayload}
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload.{DELETE_KEY, 
DELETE_MARKER}
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
HoodieTableVersion, PartialUpdateMode}
+import 
org.apache.hudi.common.table.HoodieTableConfig.{DEBEZIUM_UNAVAILABLE_VALUE, 
PARTIAL_UPDATE_CUSTOM_MARKER}
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator
+import org.apache.hudi.common.util.StringUtils
+import org.apache.hudi.table.upgrade.{SparkUpgradeDowngradeHelper, 
UpgradeDowngrade}
+
+import org.apache.spark.sql.SaveMode
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+
+class TestEightToNineUpgrade extends RecordLevelIndexTestBase {
+  @ParameterizedTest
+  @MethodSource(Array("payloadConfigs"))
+  def testPartitionFieldsWithUpgrade(tableType: HoodieTableType, payloadClass: 
String): Unit = {
+    val partitionFields = "partition:simple"
+    val mergerClasses = "org.apache.hudi.DefaultSparkRecordMerger," +
+      "org.apache.hudi.OverwriteWithLatestSparkRecordMerger," +
+      "org.apache.hudi.common.model.HoodieAvroRecordMerger"
+    val hudiOpts= commonOpts ++ Map(
+      TABLE_TYPE.key -> tableType.name(),
+      PARTITIONPATH_FIELD.key -> partitionFields,
+      PAYLOAD_CLASS_NAME.key -> payloadClass,
+      RECORD_MERGE_IMPL_CLASSES.key -> mergerClasses,
+      HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> "parquet"
+    )
+
+    doWriteAndValidateDataAndRecordIndex(hudiOpts,
+      operation = INSERT_OVERWRITE_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite,
+      validate = true,
+      schemaStr = 
HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA_WITH_SPECIFIC_COLUMNS)
+    metaClient = getLatestMetaClient(true)
+
+    // Assert table version is 9 and the partition fields in table config has 
partition type.
+    assertEquals(HoodieTableVersion.NINE, 
metaClient.getTableConfig.getTableVersion)
+    assertEquals(
+      partitionFields,
+      
HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).get())
+    assertEquals(payloadClass, metaClient.getTableConfig.getPayloadClass)
+
+    // Downgrade table to version 8
+    // Assert table version is 8 and the partition fields in table config does 
not have partition type.

Review Comment:
   Is this comment right?
   Why are we talking about the partition type here? 



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java:
##########
@@ -239,9 +239,9 @@ public void 
testHoodieFlinkCompactorWithUpgradeAndDowngrade(boolean upgrade) thr
       if (upgrade) {
         metaClient.getTableConfig().setTableVersion(HoodieTableVersion.SIX);
         HoodieTableConfig.update(metaClient.getStorage(), 
metaClient.getMetaPath(), metaClient.getTableConfig().getProps());
-        new UpgradeDowngrade(metaClient, writeClient.getConfig(), 
writeClient.getEngineContext(), 
FlinkUpgradeDowngradeHelper.getInstance()).run(HoodieTableVersion.EIGHT, 
"none");
+        new UpgradeDowngrade(metaClient, writeClient.getConfig(), 
writeClient.getEngineContext(), 
FlinkUpgradeDowngradeHelper.getInstance()).run(HoodieTableVersion.NINE, "none");
       } else {
-        metaClient.getTableConfig().setTableVersion(HoodieTableVersion.EIGHT);
+        metaClient.getTableConfig().setTableVersion(HoodieTableVersion.NINE);

Review Comment:
   same here



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -509,7 +509,9 @@ class HoodieSparkSqlWriterInternal {
             // scalastyle:on
 
             val writeConfig = client.getConfig
-            if (writeConfig.getRecordMerger.getRecordType == 
HoodieRecordType.SPARK && tableType == MERGE_ON_READ && 
writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK) != 
HoodieLogBlockType.PARQUET_DATA_BLOCK) {
+            if (writeConfig.getRecordMerger.getRecordType == 
HoodieRecordType.SPARK
+              && tableType == MERGE_ON_READ
+              && 
writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK) != 
HoodieLogBlockType.PARQUET_DATA_BLOCK) {

Review Comment:
   Again, let's try to avoid making format changes for untouched lines.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to