yihua commented on code in PR #12602:
URL: https://github.com/apache/hudi/pull/12602#discussion_r1914094643
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala:
##########
@@ -309,16 +322,73 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase
{
spark.read.json(spark.sparkContext.parallelize(recordsToStrings(recs).asScala.toSeq,
2))
}
- def compareUpdateDfWithHudiDf(inputDf: Dataset[Row], hudiDf: Dataset[Row],
beforeDf: Dataset[Row]): Unit = {
- dropMetaColumns(hudiDf).createOrReplaceTempView("hudiTbl")
- inputDf.createOrReplaceTempView("inputTbl")
- beforeDf.createOrReplaceTempView("beforeTbl")
- val hudiDfToCompare = spark.sqlContext.sql("select " + colsToCompare + "
from hudiTbl")
- val inputDfToCompare = spark.sqlContext.sql("select " + colsToCompare + "
from inputTbl")
- val beforeDfToCompare = spark.sqlContext.sql("select " + colsToCompare + "
from beforeTbl")
+ def compareUpdateDfWithHudiRows(inputRows: Array[Row], hudiRows: Array[Row],
beforeRows: Array[Row]): Unit = {
+ // Helper function to get _row_key from a Row
+ def getRowKey(row: Row): String = row.getAs[String]("_row_key")
- assertEquals(hudiDfToCompare.intersect(inputDfToCompare).count,
inputDfToCompare.count)
-
assertEquals(hudiDfToCompare.except(inputDfToCompare).except(beforeDfToCompare).count,
0)
+ // Create hashmaps for O(1) lookups
+ val inputRowMap = inputRows.map(row => getRowKey(row) -> row).toMap
+ val beforeRowMap = beforeRows.map(row => getRowKey(row) -> row).toMap
+
+ // Check that all input rows exist in hudiRows
+ inputRows.foreach { inputRow =>
Review Comment:
Got it. I think it's fine now. The reason I asked is because there are two
methods doing similar things; so wondering if we can make that simpler if we do
want to duplicate the code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]