yihua commented on code in PR #13947:
URL: https://github.com/apache/hudi/pull/13947#discussion_r2366387566


##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala:
##########
@@ -138,8 +163,8 @@ class TestPayloadDeprecationFlow extends 
SparkClientFunctionalTestHarness {
 
     // 5. Add a delete.
     val fourthUpdateData = Seq(
-      (12, 3L, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1),
-      (12, 5L, "rider-EE", "driver-EE", 17.85, "i", "12.1", 12, 1))
+      (12, 3L, "rider-CC", "driver-CC", 33.90, "i", "12.1", 12, 1, "i"),
+      (12, 5L, "rider-EE", "driver-EE", 17.85, "i", "12.1", 12, 1, "i"))

Review Comment:
   Should these be `d`, with the operation set to `upsert` instead of `delete`? 
Or are the deletes here intended to always be commit-time ordered (which is the 
ordering the `delete` operation follows)?



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala:
##########
@@ -160,55 +185,64 @@ class TestPayloadDeprecationFlow extends 
SparkClientFunctionalTestHarness {
     }
     // Validate snapshot query.
     val df = spark.read.format("hudi").load(basePath)
-    val finalDf = df.select("ts", "_event_lsn", "rider", "driver", "fare", 
"Op", "_event_seq", DebeziumConstants.FLATTENED_FILE_COL_NAME, 
DebeziumConstants.FLATTENED_POS_COL_NAME)
+    val finalDf = df.select("ts", "_event_lsn", "rider", "driver", "fare", 
"Op", "_event_seq", DebeziumConstants.FLATTENED_FILE_COL_NAME, 
DebeziumConstants.FLATTENED_POS_COL_NAME, 
DebeziumConstants.FLATTENED_OP_COL_NAME)
       .sort("_event_lsn")
-    val expectedData = getExpectedResultForSnapshotQuery(payloadClazz)
+    val expectedData = getExpectedResultForSnapshotQuery(payloadClazz, 
useOpAsDelete)
     val expectedDf = 
spark.createDataFrame(spark.sparkContext.parallelize(expectedData)).toDF(columns:
 _*).sort("_event_lsn")
-    expectedDf.show(false)
-    finalDf.show(false)
     assertTrue(expectedDf.except(finalDf).isEmpty && 
finalDf.except(expectedDf).isEmpty)
     // Validate time travel query.
     val timeTravelDf = spark.read.format("hudi")
       .option("as.of.instant", firstUpdateInstantTime).load(basePath)
-      .select("ts", "_event_lsn", "rider", "driver", "fare", "Op", 
"_event_seq", DebeziumConstants.FLATTENED_FILE_COL_NAME, 
DebeziumConstants.FLATTENED_POS_COL_NAME)
+      .select("ts", "_event_lsn", "rider", "driver", "fare", "Op", 
"_event_seq", DebeziumConstants.FLATTENED_FILE_COL_NAME, 
DebeziumConstants.FLATTENED_POS_COL_NAME, 
DebeziumConstants.FLATTENED_OP_COL_NAME)
       .sort("_event_lsn")
-    timeTravelDf.show(false)
-    val expectedTimeTravelData = 
getExpectedResultForTimeTravelQuery(payloadClazz)
+    val expectedTimeTravelData = 
getExpectedResultForTimeTravelQuery(payloadClazz, useOpAsDelete)
     val expectedTimeTravelDf = spark.createDataFrame(
       spark.sparkContext.parallelize(expectedTimeTravelData)).toDF(columns: 
_*).sort("_event_lsn")
-    expectedTimeTravelDf.show(false)
-    timeTravelDf.show(false)
     assertTrue(
       expectedTimeTravelDf.except(timeTravelDf).isEmpty
-      && timeTravelDf.except(expectedTimeTravelDf).isEmpty)
+        && timeTravelDf.except(expectedTimeTravelDf).isEmpty)
   }
 
   @ParameterizedTest
   @MethodSource(Array("providePayloadClassTestCases"))
   def testMergerBuiltinPayloadFromTableCreationPath(tableType: String,
                                                     payloadClazz: String,
+                                                    useOpAsDeleteStr: String,
                                                     expectedConfigs: 
Map[String, String]): Unit = {
+    val useOpAsDelete = useOpAsDeleteStr.equals("true")
+    val deleteOpts: Map[String, String] = if (useOpAsDelete) {
+      Map(DefaultHoodieRecordPayload.DELETE_KEY -> "Op", 
DefaultHoodieRecordPayload.DELETE_MARKER -> "D")
+    } else {
+      Map.empty
+    }
     val opts: Map[String, String] = Map(
       HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> payloadClazz,
-      HoodieMetadataConfig.ENABLE.key() -> "false")
+      HoodieMetadataConfig.ENABLE.key() -> "false") ++ deleteOpts
     val columns = Seq("ts", "_event_lsn", "rider", "driver", "fare", "Op", 
"_event_seq",
-      DebeziumConstants.FLATTENED_FILE_COL_NAME, 
DebeziumConstants.FLATTENED_POS_COL_NAME)
+      DebeziumConstants.FLATTENED_FILE_COL_NAME, 
DebeziumConstants.FLATTENED_POS_COL_NAME, 
DebeziumConstants.FLATTENED_OP_COL_NAME)
     // 1. Add an insert.
     val data = Seq(
-      (10, 1L, "rider-A", "driver-A", 19.10, "i", "10.1", 10, 1),
-      (10, 2L, "rider-B", "driver-B", 27.70, "i", "10.1", 10, 1),
-      (10, 3L, "rider-C", "driver-C", 33.90, "i", "10.1", 10, 1),
-      (10, 4L, "rider-D", "driver-D", 34.15, "i", "10.1", 10, 1),
-      (10, 5L, "rider-E", "driver-E", 17.85, "i", "10.1", 10, 1))
+      (10, 1L, "rider-A", "driver-A", 19.10, "i", "10.1", 10, 1, "i"),
+      (10, 2L, "rider-B", "driver-B", 27.70, "i", "10.1", 10, 1, "i"),
+      (10, 3L, "rider-C", "driver-C", 33.90, "i", "10.1", 10, 1, "i"),
+      (10, 4L, "rider-D", "driver-D", 34.15, "i", "10.1", 10, 1, "i"),
+      (10, 5L, "rider-E", "driver-E", 17.85, "i", "10.1", 10, 1, "i"))
     val inserts = spark.createDataFrame(data).toDF(columns: _*)
-    val orderingFields = if 
(payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)) {
-      "_event_bin_file,_event_pos"
+    val originalOrderingFields = if 
(payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)) {
+      "_event_seq"
     } else {
       "ts"
     }
+    val expectedOrderingFields = if 
(payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)) {
+      "_event_bin_file,_event_pos"
+    } else if 
(payloadClazz.equals(classOf[PostgresDebeziumAvroPayload].getName)) {
+      "_event_lsn"
+    } else {
+      originalOrderingFields
+    }
     inserts.write.format("hudi").
       option(RECORDKEY_FIELD.key(), "_event_lsn").

Review Comment:
   It's a bit odd that the record key field is `_event_lsn`, which is the same 
as the ordering field in the `PostgresDebeziumAvroPayload` test case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to