nsivabalan commented on code in PR #13519:
URL: https://github.com/apache/hudi/pull/13519#discussion_r2279985317


##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala:
##########
@@ -110,46 +127,104 @@ class TestPayloadDeprecationFlow extends 
SparkClientFunctionalTestHarness {
           option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
           
option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1").
           option(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(),
-            classOf[MySqlDebeziumAvroPayload].getName). // Position is 
important.
+            classOf[MySqlDebeziumAvroPayload].getName).
           mode(SaveMode.Append).
           save(basePath)
       }
     }
+
     // 5. Validate.
+    // Validate table configs.
+    tableConfig = metaClient.getTableConfig
+    expectedConfigs.foreach { case (key, expectedValue) =>
+      if (expectedValue != null) {
+        assertEquals(expectedValue, tableConfig.getString(key), s"Config $key 
should be $expectedValue")
+      } else {
+        assertFalse(tableConfig.contains(key), s"Config $key should not be 
present")
+      }
+    }
+    // Validate snapshot query.
     val df = spark.read.format("hudi").load(basePath)
-    val finalDf = df.select("ts", "key", "rider", "driver", "fare", 
"Op").sort("key")
+    val finalDf = df.select("ts", "_event_lsn", "rider", "driver", "fare", 
"Op", "_event_seq").sort("_event_lsn")
+    val expectedData = getExpectedResultForSnapshotQuery(payloadClazz)
+    val expectedDf = 
spark.createDataFrame(spark.sparkContext.parallelize(expectedData)).toDF(columns:
 _*).sort("_event_lsn")
+    expectedDf.show(false)
+    finalDf.show(false)
+    assertTrue(expectedDf.except(finalDf).isEmpty && 
finalDf.except(expectedDf).isEmpty)
+    // Validate time travel query.
+    val timeTravelDf = spark.read.format("hudi")
+      .option("as.of.instant", firstUpdateInstantTime).load(basePath)
+      .select("ts", "_event_lsn", "rider", "driver", "fare", "Op", 
"_event_seq").sort("_event_lsn")
+    timeTravelDf.show(false)
+    val expectedTimeTravelData = 
getExpectedResultForTimeTravelQuery(payloadClazz)
+    val expectedTimeTravelDf = spark.createDataFrame(
+      spark.sparkContext.parallelize(expectedTimeTravelData)).toDF(columns: 
_*).sort("_event_lsn")
+    expectedTimeTravelDf.show(false)
+    timeTravelDf.show(false)
+    assertTrue(
+      expectedTimeTravelDf.except(timeTravelDf).isEmpty
+      && timeTravelDf.except(expectedTimeTravelDf).isEmpty)
+  }
+
+  def getWriteConfig(hudiOpts: Map[String, String]): HoodieWriteConfig = {
+    val props = TypedProperties.fromMap(hudiOpts.asJava)
+    HoodieWriteConfig.newBuilder()
+      .withProps(props)
+      .withPath(basePath())
+      .build()
+  }
 
-    val expectedData = if 
(!payloadClazz.equals(classOf[AWSDmsAvroPayload].getName)) {
-      if 
(HoodieTableConfig.EVENT_TIME_ORDERING_PAYLOADS.contains(payloadClazz)) {
+  def getExpectedResultForSnapshotQuery(payloadClazz: String): Seq[(Int, Long, 
String, String, Double, String, String)] = {
+    if (!payloadClazz.equals(classOf[AWSDmsAvroPayload].getName)) {
+      if (payloadClazz.equals(classOf[PartialUpdateAvroPayload].getName)
+        || payloadClazz.equals(classOf[EventTimeAvroPayload].getName)
+        || payloadClazz.equals(classOf[DefaultHoodieRecordPayload].getName)
+        || payloadClazz.equals(classOf[PostgresDebeziumAvroPayload].getName)
+        || payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)) {
         Seq(
-          (11, "1", "rider-X", "driver-X", 19.10, "D"),
-          (11, "2", "rider-Y", "driver-Y", 27.70, "u"),
-          (12, "3", "rider-CC", "driver-CC", 33.90, "i"),
-          (10, "4", "rider-D", "driver-D", 34.15, "i"),
-          (12, "5", "rider-EE", "driver-EE", 17.85, "i"))
+          (11, 1, "rider-X", "driver-X", 19.10, "D", "11.1"),
+          (11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1"),
+          (12, 3, "rider-CC", "driver-CC", 33.90, "i", "12.1"),
+          (10, 4, "rider-D", "driver-D", 34.15, "i", "10.1"),
+          (12, 5, "rider-EE", "driver-EE", 17.85, "i", "12.1"))
       } else {
         Seq(
-          (11, "1", "rider-X", "driver-X", 19.10, "D"),
-          (11, "2", "rider-Y", "driver-Y", 27.70, "u"),
-          (12, "3", "rider-CC", "driver-CC", 33.90, "i"),
-          (9, "4", "rider-DD", "driver-DD", 34.15, "i"),
-          (12, "5", "rider-EE", "driver-EE", 17.85, "i"))
+          (11, 1, "rider-X", "driver-X", 19.10, "D", "11.1"),
+          (11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1"),
+          (12, 3, "rider-CC", "driver-CC", 33.90, "i", "12.1"),
+          (9, 4, "rider-DD", "driver-DD", 34.15, "i", "9.1"),
+          (12, 5, "rider-EE", "driver-EE", 17.85, "i", "12.1"))
       }
     } else {
       Seq(
-        (11, "2", "rider-Y", "driver-Y", 27.70, "u"),
-        (12, "3", "rider-CC", "driver-CC", 33.90, "i"),
-        (9, "4", "rider-DD", "driver-DD", 34.15, "i"),
-        (12, "5", "rider-EE", "driver-EE", 17.85, "i"))
+        (11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1"),
+        (12, 3, "rider-CC", "driver-CC", 33.90, "i", "12.1"),
+        (9, 4, "rider-DD", "driver-DD", 34.15, "i", "9.1"),
+        (12, 5, "rider-EE", "driver-EE", 17.85, "i", "12.1"))
+    }
+  }
+
+  def getExpectedResultForTimeTravelQuery(payloadClazz: String):
+  Seq[(Int, Long, String, String, Double, String, String)] = {
+    if (!payloadClazz.equals(classOf[AWSDmsAvroPayload].getName)) {
+      Seq(
+        (11, 1, "rider-X", "driver-X", 19.10, "D", "11.1"),
+        (11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1"),
+        (10, 3, "rider-C", "driver-C", 33.90, "i", "10.1"),
+        (10, 4, "rider-D", "driver-D", 34.15, "i", "10.1"),
+        (10, 5, "rider-E", "driver-E", 17.85, "i", "10.1"))
+    } else {
+      Seq(
+        (11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1"),
+        (10, 3, "rider-C", "driver-C", 33.90, "i", "10.1"),
+        (10, 4, "rider-D", "driver-D", 34.15, "i", "10.1"),
+        (10, 5, "rider-E", "driver-E", 17.85, "i", "10.1"))
     }
-    val expectedDf = spark.createDataFrame(
-      spark.sparkContext.parallelize(expectedData)).toDF(columns: 
_*).sort("key")
-    assertTrue(
-      expectedDf.except(finalDf).isEmpty && finalDf.except(expectedDf).isEmpty)
   }
 }
 
 // TODO: Add COPY_ON_WRITE table type tests when write path is updated 
accordingly.
+// TODO: Add Test for MySqlDebeziumAvroPayload.

Review Comment:
   I see we have a separate PR: https://github.com/apache/hudi/pull/13685
   I am OK with adding tests for MySQL payloads in a follow-up patch.
   Let's plan to land this patch, so that Tim can test his patch on
fixing record creation.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to