nsivabalan commented on code in PR #13519:
URL: https://github.com/apache/hudi/pull/13519#discussion_r2279985317
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPayloadDeprecationFlow.scala:
##########
@@ -110,46 +127,104 @@ class TestPayloadDeprecationFlow extends
SparkClientFunctionalTestHarness {
option(HoodieCompactionConfig.INLINE_COMPACT.key(), "false").
option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1").
option(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(),
- classOf[MySqlDebeziumAvroPayload].getName). // Position is
important.
+ classOf[MySqlDebeziumAvroPayload].getName).
mode(SaveMode.Append).
save(basePath)
}
}
+
// 5. Validate.
+ // Validate table configs.
+ tableConfig = metaClient.getTableConfig
+ expectedConfigs.foreach { case (key, expectedValue) =>
+ if (expectedValue != null) {
+ assertEquals(expectedValue, tableConfig.getString(key), s"Config $key
should be $expectedValue")
+ } else {
+ assertFalse(tableConfig.contains(key), s"Config $key should not be
present")
+ }
+ }
+ // Validate snapshot query.
val df = spark.read.format("hudi").load(basePath)
- val finalDf = df.select("ts", "key", "rider", "driver", "fare",
"Op").sort("key")
+ val finalDf = df.select("ts", "_event_lsn", "rider", "driver", "fare",
"Op", "_event_seq").sort("_event_lsn")
+ val expectedData = getExpectedResultForSnapshotQuery(payloadClazz)
+ val expectedDf =
spark.createDataFrame(spark.sparkContext.parallelize(expectedData)).toDF(columns:
_*).sort("_event_lsn")
+ expectedDf.show(false)
+ finalDf.show(false)
+ assertTrue(expectedDf.except(finalDf).isEmpty &&
finalDf.except(expectedDf).isEmpty)
+ // Validate time travel query.
+ val timeTravelDf = spark.read.format("hudi")
+ .option("as.of.instant", firstUpdateInstantTime).load(basePath)
+ .select("ts", "_event_lsn", "rider", "driver", "fare", "Op",
"_event_seq").sort("_event_lsn")
+ timeTravelDf.show(false)
+ val expectedTimeTravelData =
getExpectedResultForTimeTravelQuery(payloadClazz)
+ val expectedTimeTravelDf = spark.createDataFrame(
+ spark.sparkContext.parallelize(expectedTimeTravelData)).toDF(columns:
_*).sort("_event_lsn")
+ expectedTimeTravelDf.show(false)
+ timeTravelDf.show(false)
+ assertTrue(
+ expectedTimeTravelDf.except(timeTravelDf).isEmpty
+ && timeTravelDf.except(expectedTimeTravelDf).isEmpty)
+ }
+
+ def getWriteConfig(hudiOpts: Map[String, String]): HoodieWriteConfig = {
+ val props = TypedProperties.fromMap(hudiOpts.asJava)
+ HoodieWriteConfig.newBuilder()
+ .withProps(props)
+ .withPath(basePath())
+ .build()
+ }
- val expectedData = if
(!payloadClazz.equals(classOf[AWSDmsAvroPayload].getName)) {
- if
(HoodieTableConfig.EVENT_TIME_ORDERING_PAYLOADS.contains(payloadClazz)) {
+ def getExpectedResultForSnapshotQuery(payloadClazz: String): Seq[(Int, Long,
String, String, Double, String, String)] = {
+ if (!payloadClazz.equals(classOf[AWSDmsAvroPayload].getName)) {
+ if (payloadClazz.equals(classOf[PartialUpdateAvroPayload].getName)
+ || payloadClazz.equals(classOf[EventTimeAvroPayload].getName)
+ || payloadClazz.equals(classOf[DefaultHoodieRecordPayload].getName)
+ || payloadClazz.equals(classOf[PostgresDebeziumAvroPayload].getName)
+ || payloadClazz.equals(classOf[MySqlDebeziumAvroPayload].getName)) {
Seq(
- (11, "1", "rider-X", "driver-X", 19.10, "D"),
- (11, "2", "rider-Y", "driver-Y", 27.70, "u"),
- (12, "3", "rider-CC", "driver-CC", 33.90, "i"),
- (10, "4", "rider-D", "driver-D", 34.15, "i"),
- (12, "5", "rider-EE", "driver-EE", 17.85, "i"))
+ (11, 1, "rider-X", "driver-X", 19.10, "D", "11.1"),
+ (11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1"),
+ (12, 3, "rider-CC", "driver-CC", 33.90, "i", "12.1"),
+ (10, 4, "rider-D", "driver-D", 34.15, "i", "10.1"),
+ (12, 5, "rider-EE", "driver-EE", 17.85, "i", "12.1"))
} else {
Seq(
- (11, "1", "rider-X", "driver-X", 19.10, "D"),
- (11, "2", "rider-Y", "driver-Y", 27.70, "u"),
- (12, "3", "rider-CC", "driver-CC", 33.90, "i"),
- (9, "4", "rider-DD", "driver-DD", 34.15, "i"),
- (12, "5", "rider-EE", "driver-EE", 17.85, "i"))
+ (11, 1, "rider-X", "driver-X", 19.10, "D", "11.1"),
+ (11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1"),
+ (12, 3, "rider-CC", "driver-CC", 33.90, "i", "12.1"),
+ (9, 4, "rider-DD", "driver-DD", 34.15, "i", "9.1"),
+ (12, 5, "rider-EE", "driver-EE", 17.85, "i", "12.1"))
}
} else {
Seq(
- (11, "2", "rider-Y", "driver-Y", 27.70, "u"),
- (12, "3", "rider-CC", "driver-CC", 33.90, "i"),
- (9, "4", "rider-DD", "driver-DD", 34.15, "i"),
- (12, "5", "rider-EE", "driver-EE", 17.85, "i"))
+ (11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1"),
+ (12, 3, "rider-CC", "driver-CC", 33.90, "i", "12.1"),
+ (9, 4, "rider-DD", "driver-DD", 34.15, "i", "9.1"),
+ (12, 5, "rider-EE", "driver-EE", 17.85, "i", "12.1"))
+ }
+ }
+
+ def getExpectedResultForTimeTravelQuery(payloadClazz: String):
+ Seq[(Int, Long, String, String, Double, String, String)] = {
+ if (!payloadClazz.equals(classOf[AWSDmsAvroPayload].getName)) {
+ Seq(
+ (11, 1, "rider-X", "driver-X", 19.10, "D", "11.1"),
+ (11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1"),
+ (10, 3, "rider-C", "driver-C", 33.90, "i", "10.1"),
+ (10, 4, "rider-D", "driver-D", 34.15, "i", "10.1"),
+ (10, 5, "rider-E", "driver-E", 17.85, "i", "10.1"))
+ } else {
+ Seq(
+ (11, 2, "rider-Y", "driver-Y", 27.70, "u", "11.1"),
+ (10, 3, "rider-C", "driver-C", 33.90, "i", "10.1"),
+ (10, 4, "rider-D", "driver-D", 34.15, "i", "10.1"),
+ (10, 5, "rider-E", "driver-E", 17.85, "i", "10.1"))
}
- val expectedDf = spark.createDataFrame(
- spark.sparkContext.parallelize(expectedData)).toDF(columns:
_*).sort("key")
- assertTrue(
- expectedDf.except(finalDf).isEmpty && finalDf.except(expectedDf).isEmpty)
}
}
// TODO: Add COPY_ON_WRITE table type tests when write path is updated
accordingly.
+// TODO: Add Test for MySqlDebeziumAvroPayload.
Review Comment:
I see we have a separate PR https://github.com/apache/hudi/pull/13685
I am ok w/ adding tests for mysql payloads in a follow up patch.
Let's plan to land this patch. Doing so will help Tim test his patch on
fixing record creation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]