linliu-code commented on code in PR #13947:
URL: https://github.com/apache/hudi/pull/13947#discussion_r2368556645


##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala:
##########
@@ -160,55 +185,64 @@ class TestPayloadDeprecationFlow extends 
SparkClientFunctionalTestHarness {
     }
     // Validate snapshot query.
     val df = spark.read.format("hudi").load(basePath)
-    val finalDf = df.select("ts", "_event_lsn", "rider", "driver", "fare", 
"Op", "_event_seq", DebeziumConstants.FLATTENED_FILE_COL_NAME, 
DebeziumConstants.FLATTENED_POS_COL_NAME)
+    val finalDf = df.select("ts", "_event_lsn", "rider", "driver", "fare", 
"Op", "_event_seq", DebeziumConstants.FLATTENED_FILE_COL_NAME, 
DebeziumConstants.FLATTENED_POS_COL_NAME, 
DebeziumConstants.FLATTENED_OP_COL_NAME)
       .sort("_event_lsn")
-    val expectedData = getExpectedResultForSnapshotQuery(payloadClazz)
+    val expectedData = getExpectedResultForSnapshotQuery(payloadClazz, 
useOpAsDelete)
     val expectedDf = 
spark.createDataFrame(spark.sparkContext.parallelize(expectedData)).toDF(columns:
 _*).sort("_event_lsn")
-    expectedDf.show(false)
-    finalDf.show(false)
     assertTrue(expectedDf.except(finalDf).isEmpty && 
finalDf.except(expectedDf).isEmpty)
     // Validate time travel query.
     val timeTravelDf = spark.read.format("hudi")
       .option("as.of.instant", firstUpdateInstantTime).load(basePath)
-      .select("ts", "_event_lsn", "rider", "driver", "fare", "Op", 
"_event_seq", DebeziumConstants.FLATTENED_FILE_COL_NAME, 
DebeziumConstants.FLATTENED_POS_COL_NAME)
+      .select("ts", "_event_lsn", "rider", "driver", "fare", "Op", 
"_event_seq", DebeziumConstants.FLATTENED_FILE_COL_NAME, 
DebeziumConstants.FLATTENED_POS_COL_NAME, 
DebeziumConstants.FLATTENED_OP_COL_NAME)
       .sort("_event_lsn")
-    timeTravelDf.show(false)
-    val expectedTimeTravelData = 
getExpectedResultForTimeTravelQuery(payloadClazz)
+    val expectedTimeTravelData = 
getExpectedResultForTimeTravelQuery(payloadClazz, useOpAsDelete)
     val expectedTimeTravelDf = spark.createDataFrame(
       spark.sparkContext.parallelize(expectedTimeTravelData)).toDF(columns: 
_*).sort("_event_lsn")
-    expectedTimeTravelDf.show(false)
-    timeTravelDf.show(false)
     assertTrue(
       expectedTimeTravelDf.except(timeTravelDf).isEmpty
-      && timeTravelDf.except(expectedTimeTravelDf).isEmpty)
+        && timeTravelDf.except(expectedTimeTravelDf).isEmpty)
   }
 
   @ParameterizedTest
   @MethodSource(Array("providePayloadClassTestCases"))
   def testMergerBuiltinPayloadFromTableCreationPath(tableType: String,
                                                     payloadClazz: String,
+                                                    useOpAsDeleteStr: String,
                                                     expectedConfigs: 
Map[String, String]): Unit = {
+    val useOpAsDelete = useOpAsDeleteStr.equals("true")
+    val deleteOpts: Map[String, String] = if (useOpAsDelete) {
+      Map(DefaultHoodieRecordPayload.DELETE_KEY -> "Op", 
DefaultHoodieRecordPayload.DELETE_MARKER -> "D")
+    } else {
+      Map.empty
+    }
     val opts: Map[String, String] = Map(
       HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> payloadClazz,
-      HoodieMetadataConfig.ENABLE.key() -> "false")
+      HoodieMetadataConfig.ENABLE.key() -> "false") ++ deleteOpts
     val columns = Seq("ts", "_event_lsn", "rider", "driver", "fare", "Op", 
"_event_seq",
-      DebeziumConstants.FLATTENED_FILE_COL_NAME, 
DebeziumConstants.FLATTENED_POS_COL_NAME)
+      DebeziumConstants.FLATTENED_FILE_COL_NAME, 
DebeziumConstants.FLATTENED_POS_COL_NAME, 
DebeziumConstants.FLATTENED_OP_COL_NAME)
     // 1. Add an insert.
     val data = Seq(
-      (10, 1L, "rider-A", "driver-A", 19.10, "i", "10.1", 10, 1),
-      (10, 2L, "rider-B", "driver-B", 27.70, "i", "10.1", 10, 1),
-      (10, 3L, "rider-C", "driver-C", 33.90, "i", "10.1", 10, 1),
-      (10, 4L, "rider-D", "driver-D", 34.15, "i", "10.1", 10, 1),
-      (10, 5L, "rider-E", "driver-E", 17.85, "i", "10.1", 10, 1))
+      (10, 1L, "rider-A", "driver-A", 19.10, "i", "10.1", 10, 1, "i"),
+      (10, 2L, "rider-B", "driver-B", 27.70, "i", "10.1", 10, 1, "i"),
+      (10, 3L, "rider-C", "driver-C", 33.90, "i", "10.1", 10, 1, "i"),
+      (10, 4L, "rider-D", "driver-D", 34.15, "i", "10.1", 10, 1, "i"),
+      (10, 5L, "rider-E", "driver-E", 17.85, "i", "10.1", 10, 1, "i"))
     val inserts = spark.createDataFrame(data).toDF(columns: _*)
-    val orderingFields = if 
(payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)) {
-      "_event_bin_file,_event_pos"
+    val originalOrderingFields = if 
(payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)) {
+      "_event_seq"
     } else {
       "ts"
     }
+    val expectedOrderingFields = if 
(payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)) {
+      "_event_bin_file,_event_pos"
+    } else if 
(payloadClazz.equals(classOf[PostgresDebeziumAvroPayload].getName)) {
+      "_event_lsn"
+    } else {
+      originalOrderingFields
+    }
     inserts.write.format("hudi").
       option(RECORDKEY_FIELD.key(), "_event_lsn").

Review Comment:
   For other payloads, the ordering fields can be any columns; but for 
`PostgresDebeziumAvroPayload` they have to be `_event_lsn`. Therefore, I felt 
there is no need to set different ordering columns for the other payloads.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to