Davis-Zhang-Onehouse commented on code in PR #12602:
URL: https://github.com/apache/hudi/pull/12602#discussion_r1908061113


##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala:
##########
@@ -30,16 +30,15 @@ import 
org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator
 import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
-import org.apache.hudi.{DataSourceReadOptions, HoodieSparkUtils}
-
+import org.apache.hudi.DataSourceReadOptions

Review Comment:
   done



##########
hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java:
##########
@@ -98,6 +106,20 @@ public static String latestCommit(HoodieStorage storage, 
String basePath) {
     return timeline.lastInstant().get().requestedTime();
   }
 
+  /**
+   * Returns the last successful write operation's completed instant.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public static HoodieInstant latestCompletedCommitCompletionTime(FileSystem 
fs, String basePath) {

Review Comment:
   done



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala:
##########
@@ -309,16 +322,73 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase 
{
     
spark.read.json(spark.sparkContext.parallelize(recordsToStrings(recs).asScala.toSeq,
 2))
   }
 
-  def compareUpdateDfWithHudiDf(inputDf: Dataset[Row], hudiDf: Dataset[Row], 
beforeDf: Dataset[Row]): Unit = {
-    dropMetaColumns(hudiDf).createOrReplaceTempView("hudiTbl")
-    inputDf.createOrReplaceTempView("inputTbl")
-    beforeDf.createOrReplaceTempView("beforeTbl")
-    val hudiDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " 
from hudiTbl")
-    val inputDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " 
from inputTbl")
-    val beforeDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " 
from beforeTbl")
+  def compareUpdateDfWithHudiRows(inputRows: Array[Row], hudiRows: Array[Row], 
beforeRows: Array[Row]): Unit = {

Review Comment:
   I can do that, but it will require refactoring the other consumers as well.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala:
##########
@@ -309,16 +322,73 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase 
{
     
spark.read.json(spark.sparkContext.parallelize(recordsToStrings(recs).asScala.toSeq,
 2))
   }
 
-  def compareUpdateDfWithHudiDf(inputDf: Dataset[Row], hudiDf: Dataset[Row], 
beforeDf: Dataset[Row]): Unit = {
-    dropMetaColumns(hudiDf).createOrReplaceTempView("hudiTbl")
-    inputDf.createOrReplaceTempView("inputTbl")
-    beforeDf.createOrReplaceTempView("beforeTbl")
-    val hudiDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " 
from hudiTbl")
-    val inputDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " 
from inputTbl")
-    val beforeDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " 
from beforeTbl")
+  def compareUpdateDfWithHudiRows(inputRows: Array[Row], hudiRows: Array[Row], 
beforeRows: Array[Row]): Unit = {
+    // Helper function to get _row_key from a Row
+    def getRowKey(row: Row): String = row.getAs[String]("_row_key")
 
-    assertEquals(hudiDfToCompare.intersect(inputDfToCompare).count, 
inputDfToCompare.count)
-    
assertEquals(hudiDfToCompare.except(inputDfToCompare).except(beforeDfToCompare).count,
 0)
+    // Create hashmaps for O(1) lookups
+    val inputRowMap = inputRows.map(row => getRowKey(row) -> row).toMap
+    val beforeRowMap = beforeRows.map(row => getRowKey(row) -> row).toMap
+
+    // Check that all input rows exist in hudiRows
+    inputRows.foreach { inputRow =>
+      val key = getRowKey(inputRow)
+      val hudiRow = hudiRows.find(row => getRowKey(row) == key)
+      assertTrue(hudiRow.isDefined && rowsEqual(inputRow, hudiRow.get),
+        s"Input row with _row_key: $key not found in Hudi rows or content 
mismatch")
+    }
+
+    // Check that each hudi row either exists in input or before
+    hudiRows.foreach { hudiRow =>
+      val key = getRowKey(hudiRow)
+      val foundInInput = inputRowMap.get(key).exists(row => rowsEqual(hudiRow, 
row))
+      val foundInBefore = !foundInInput && beforeRowMap.get(key).exists(row => 
rowsEqual(hudiRow, row))
+
+      assertTrue(foundInInput || foundInBefore,
+        s"Hudi row with _row_key: $key not found in either input or before 
rows")
+    }
+  }
+
+  // Helper function to check if two rows are equal (comparing only the 
columns we care about)
+  def rowsEqual(row1: Row, row2: Row): Boolean = {
+    // Get schemas from rows
+    val schema1 = row1.asInstanceOf[GenericRowWithSchema].schema
+    val schema2 = row2.asInstanceOf[GenericRowWithSchema].schema
+
+    // Verify schemas are identical
+    if (schema1 != schema2) {
+      throw new AssertionError(
+        s"""Schemas are different:
+            |Schema 1: ${schema1.treeString}
+            |Schema 2: ${schema2.treeString}""".stripMargin)
+    }
+
+    // Compare all fields using schema
+    schema1.fields.forall { field =>
+      val idx1 = row1.fieldIndex(field.name)
+      val idx2 = row2.fieldIndex(field.name)
+      row1.get(idx1) == row2.get(idx2)
+    }
+  }
+
+  def compareUpdateRowsWithHudiRows(inputRows: Array[Row], hudiRows: 
Array[Row], beforeRows: Array[Row]): Unit = {

Review Comment:
   done



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala:
##########
@@ -91,87 +90,101 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
     val dataGen = new 
HoodieTestDataGenerator(HoodieTestDataGenerator.TRIP_NESTED_EXAMPLE_SCHEMA, 
0xDEED)
 
     //Bulk insert first set of records
-    val inputDf0 = generateInserts(dataGen, "000", 100).cache()
+    val inputDf0 = generateInserts(dataGen, "000", 10).cache()
     insertInto(tableName, tableBasePath, inputDf0, BULK_INSERT, 
isMetadataEnabled, 1)
+    val inputDf0Rows = canonicalizeDF(inputDf0).collect()
+    inputDf0.unpersist(true)
     assertTrue(hasNewCommits(fs, tableBasePath, "000"))
     //Verify bulk insert works correctly
-    val snapshotDf1 = doSnapshotRead(tableName, isMetadataEnabled).cache()
-    assertEquals(100, snapshotDf1.count())
-    compareEntireInputDfWithHudiDf(inputDf0, snapshotDf1)
-    snapshotDf1.unpersist(true)
+    val snapshotDf1 = doSnapshotRead(tableName, isMetadataEnabled)
+    val snapshotDf1Rows = 
canonicalizeDF(dropMetaColumns(snapshotDf1)).collect()
+    assertEquals(10, snapshotDf1.count())
+    compareEntireInputRowsWithHudiRows(inputDf0Rows, snapshotDf1Rows)
 
     //Test updated records
-    val updateDf = generateUniqueUpdates(dataGen, "001", 50).cache()
+    val updateDf = generateUniqueUpdates(dataGen, "001", 5).cache()
     insertInto(tableName, tableBasePath, updateDf, UPSERT, isMetadataEnabled, 
2)
-    val commitInstantTime2 = latestCommit(fs, tableBasePath)
-    val snapshotDf2 = doSnapshotRead(tableName, isMetadataEnabled).cache()
-    assertEquals(100, snapshotDf2.count())
-    compareUpdateDfWithHudiDf(updateDf, snapshotDf2, snapshotDf1)
-    snapshotDf2.unpersist(true)
+    val commitCompletedInstant2 = latestCompletedCommitCompletionTime(fs, 
tableBasePath)
+    val commitCompletionTime2 = commitCompletedInstant2.getCompletionTime
+    val snapshotDf2 = doSnapshotRead(tableName, isMetadataEnabled)
+    val snapshotDf2Rows = 
canonicalizeDF(dropMetaColumns(snapshotDf2)).collect()
+    assertEquals(10, snapshotDf2Rows.length)
+    compareUpdateRowsWithHudiRows(
+      canonicalizeDF(updateDf).collect(),
+      snapshotDf2Rows,
+      snapshotDf1Rows)
+    updateDf.unpersist(true)
 
-    val inputDf2 = generateUniqueUpdates(dataGen, "002", 60).cache()
+    val inputDf2 = generateUniqueUpdates(dataGen, "002", 6).cache()
     val uniqueKeyCnt2 = inputDf2.select("_row_key").distinct().count()
     insertInto(tableName, tableBasePath, inputDf2, UPSERT, isMetadataEnabled, 
3)
-    val commitInstantTime3 = latestCommit(fs, tableBasePath)
+    val commitCompletedInstant3 = latestCompletedCommitCompletionTime(fs, 
tableBasePath)

Review Comment:
   done



##########
hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java:
##########
@@ -98,6 +106,20 @@ public static String latestCommit(HoodieStorage storage, 
String basePath) {
     return timeline.lastInstant().get().requestedTime();
   }
 
+  /**
+   * Returns the last successful write operation's completed instant.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public static HoodieInstant latestCompletedCommitCompletionTime(FileSystem 
fs, String basePath) {

Review Comment:
   done



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala:
##########
@@ -309,16 +322,73 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase 
{
     
spark.read.json(spark.sparkContext.parallelize(recordsToStrings(recs).asScala.toSeq,
 2))
   }
 
-  def compareUpdateDfWithHudiDf(inputDf: Dataset[Row], hudiDf: Dataset[Row], 
beforeDf: Dataset[Row]): Unit = {
-    dropMetaColumns(hudiDf).createOrReplaceTempView("hudiTbl")
-    inputDf.createOrReplaceTempView("inputTbl")
-    beforeDf.createOrReplaceTempView("beforeTbl")
-    val hudiDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " 
from hudiTbl")
-    val inputDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " 
from inputTbl")
-    val beforeDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " 
from beforeTbl")
+  def compareUpdateDfWithHudiRows(inputRows: Array[Row], hudiRows: Array[Row], 
beforeRows: Array[Row]): Unit = {
+    // Helper function to get _row_key from a Row
+    def getRowKey(row: Row): String = row.getAs[String]("_row_key")
 
-    assertEquals(hudiDfToCompare.intersect(inputDfToCompare).count, 
inputDfToCompare.count)
-    
assertEquals(hudiDfToCompare.except(inputDfToCompare).except(beforeDfToCompare).count,
 0)
+    // Create hashmaps for O(1) lookups
+    val inputRowMap = inputRows.map(row => getRowKey(row) -> row).toMap
+    val beforeRowMap = beforeRows.map(row => getRowKey(row) -> row).toMap
+
+    // Check that all input rows exist in hudiRows
+    inputRows.foreach { inputRow =>

Review Comment:
   I was thinking the same, but it is not the case:
   we would need to keep three indexes for the three arrays and search both inputRows and 
beforeRows for each row in hudiRows. We would also need to handle the various cases where 
the key cannot be found; in total this leads to ~100 lines of code.
   
   I can do that if required, but the current version is the most concise one (though not 
the most efficient, since we are only handling a couple of hundred rows).



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala:
##########
@@ -91,87 +90,101 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
     val dataGen = new 
HoodieTestDataGenerator(HoodieTestDataGenerator.TRIP_NESTED_EXAMPLE_SCHEMA, 
0xDEED)
 
     //Bulk insert first set of records
-    val inputDf0 = generateInserts(dataGen, "000", 100).cache()
+    val inputDf0 = generateInserts(dataGen, "000", 10).cache()
     insertInto(tableName, tableBasePath, inputDf0, BULK_INSERT, 
isMetadataEnabled, 1)
+    val inputDf0Rows = canonicalizeDF(inputDf0).collect()
+    inputDf0.unpersist(true)
     assertTrue(hasNewCommits(fs, tableBasePath, "000"))
     //Verify bulk insert works correctly
-    val snapshotDf1 = doSnapshotRead(tableName, isMetadataEnabled).cache()
-    assertEquals(100, snapshotDf1.count())
-    compareEntireInputDfWithHudiDf(inputDf0, snapshotDf1)
-    snapshotDf1.unpersist(true)
+    val snapshotDf1 = doSnapshotRead(tableName, isMetadataEnabled)
+    val snapshotDf1Rows = 
canonicalizeDF(dropMetaColumns(snapshotDf1)).collect()
+    assertEquals(10, snapshotDf1.count())
+    compareEntireInputRowsWithHudiRows(inputDf0Rows, snapshotDf1Rows)
 
     //Test updated records
-    val updateDf = generateUniqueUpdates(dataGen, "001", 50).cache()
+    val updateDf = generateUniqueUpdates(dataGen, "001", 5).cache()
     insertInto(tableName, tableBasePath, updateDf, UPSERT, isMetadataEnabled, 
2)
-    val commitInstantTime2 = latestCommit(fs, tableBasePath)
-    val snapshotDf2 = doSnapshotRead(tableName, isMetadataEnabled).cache()
-    assertEquals(100, snapshotDf2.count())
-    compareUpdateDfWithHudiDf(updateDf, snapshotDf2, snapshotDf1)
-    snapshotDf2.unpersist(true)
+    val commitCompletedInstant2 = latestCompletedCommitCompletionTime(fs, 
tableBasePath)

Review Comment:
   done for all



##########
hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java:
##########
@@ -98,6 +106,20 @@ public static String latestCommit(HoodieStorage storage, 
String basePath) {
     return timeline.lastInstant().get().requestedTime();
   }
 
+  /**
+   * Returns the last successful write operation's completed instant.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public static HoodieInstant latestCompletedCommitCompletionTime(FileSystem 
fs, String basePath) {
+    HoodieTimeline timeline = allCompletedCommitsCompactions(fs, basePath);
+    return timeline.lastInstant().get();
+  }
+
+  public static HoodieInstant 
latestCompletedCommitCompletionTime(HoodieStorage storage, String basePath) {

Review Comment:
   done



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala:
##########
@@ -91,87 +90,101 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
     val dataGen = new 
HoodieTestDataGenerator(HoodieTestDataGenerator.TRIP_NESTED_EXAMPLE_SCHEMA, 
0xDEED)
 
     //Bulk insert first set of records
-    val inputDf0 = generateInserts(dataGen, "000", 100).cache()
+    val inputDf0 = generateInserts(dataGen, "000", 10).cache()

Review Comment:
   done



##########
hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java:
##########
@@ -76,14 +76,22 @@ public static List<String> listCommitsSince(HoodieStorage 
storage, String basePa
 
   // this is used in the integration test script: 
docker/demo/sparksql-incremental.commands
   public static List<String> listCompletionTimeSince(FileSystem fs, String 
basePath,
-      String instantTimestamp) {
+                                                     String instantTimestamp) {

Review Comment:
   done



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala:
##########
@@ -91,87 +90,101 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
     val dataGen = new 
HoodieTestDataGenerator(HoodieTestDataGenerator.TRIP_NESTED_EXAMPLE_SCHEMA, 
0xDEED)
 
     //Bulk insert first set of records
-    val inputDf0 = generateInserts(dataGen, "000", 100).cache()
+    val inputDf0 = generateInserts(dataGen, "000", 10).cache()
     insertInto(tableName, tableBasePath, inputDf0, BULK_INSERT, 
isMetadataEnabled, 1)
+    val inputDf0Rows = canonicalizeDF(inputDf0).collect()
+    inputDf0.unpersist(true)
     assertTrue(hasNewCommits(fs, tableBasePath, "000"))
     //Verify bulk insert works correctly
-    val snapshotDf1 = doSnapshotRead(tableName, isMetadataEnabled).cache()
-    assertEquals(100, snapshotDf1.count())
-    compareEntireInputDfWithHudiDf(inputDf0, snapshotDf1)
-    snapshotDf1.unpersist(true)
+    val snapshotDf1 = doSnapshotRead(tableName, isMetadataEnabled)
+    val snapshotDf1Rows = 
canonicalizeDF(dropMetaColumns(snapshotDf1)).collect()
+    assertEquals(10, snapshotDf1.count())
+    compareEntireInputRowsWithHudiRows(inputDf0Rows, snapshotDf1Rows)
 
     //Test updated records
-    val updateDf = generateUniqueUpdates(dataGen, "001", 50).cache()
+    val updateDf = generateUniqueUpdates(dataGen, "001", 5).cache()
     insertInto(tableName, tableBasePath, updateDf, UPSERT, isMetadataEnabled, 
2)
-    val commitInstantTime2 = latestCommit(fs, tableBasePath)
-    val snapshotDf2 = doSnapshotRead(tableName, isMetadataEnabled).cache()
-    assertEquals(100, snapshotDf2.count())
-    compareUpdateDfWithHudiDf(updateDf, snapshotDf2, snapshotDf1)
-    snapshotDf2.unpersist(true)
+    val commitCompletedInstant2 = latestCompletedCommitCompletionTime(fs, 
tableBasePath)
+    val commitCompletionTime2 = commitCompletedInstant2.getCompletionTime
+    val snapshotDf2 = doSnapshotRead(tableName, isMetadataEnabled)
+    val snapshotDf2Rows = 
canonicalizeDF(dropMetaColumns(snapshotDf2)).collect()
+    assertEquals(10, snapshotDf2Rows.length)
+    compareUpdateRowsWithHudiRows(
+      canonicalizeDF(updateDf).collect(),
+      snapshotDf2Rows,
+      snapshotDf1Rows)
+    updateDf.unpersist(true)
 
-    val inputDf2 = generateUniqueUpdates(dataGen, "002", 60).cache()
+    val inputDf2 = generateUniqueUpdates(dataGen, "002", 6).cache()
     val uniqueKeyCnt2 = inputDf2.select("_row_key").distinct().count()
     insertInto(tableName, tableBasePath, inputDf2, UPSERT, isMetadataEnabled, 
3)
-    val commitInstantTime3 = latestCommit(fs, tableBasePath)
+    val commitCompletedInstant3 = latestCompletedCommitCompletionTime(fs, 
tableBasePath)
+    val commitCompletionTime3 = commitCompletedInstant3.getCompletionTime
     assertEquals(3, listCommitsSince(fs, tableBasePath, "000").size())
 
-    val snapshotDf3 = doSnapshotRead(tableName, isMetadataEnabled).cache()
-    assertEquals(100, snapshotDf3.count())
-    compareUpdateDfWithHudiDf(inputDf2, snapshotDf3, snapshotDf3)
-    snapshotDf3.unpersist(true)
+    val snapshotDf3Rows = canonicalizeDF(doSnapshotRead(tableName, 
isMetadataEnabled)).collect()
+    assertEquals(10, snapshotDf3Rows.length)
+    compareUpdateRowsWithHudiRows(canonicalizeDF(inputDf2).collect(),
+      snapshotDf3Rows, snapshotDf3Rows)
+    inputDf2.unpersist(true)
 
     // Read Incremental Query, uses hudi_table_changes() table valued function 
for spark sql
     // we have 2 commits, try pulling the first commit (which is not the 
latest)
     //HUDI-5266
-    val firstCommit = listCommitsSince(fs, tableBasePath, "000").get(0)
+    val firstCommitInstant = listCompletedInstantSince(fs, tableBasePath, 
"000").get(0)
+    val firstCommit = firstCommitInstant.getCompletionTime

Review Comment:
   done



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala:
##########
@@ -331,6 +401,32 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
     assertEquals(hudiDfToCompare.except(inputDfToCompare).count, 0)
   }
 
+  private def compareEntireInputRowsWithHudiRows(snapshotDf2Rows: Array[Row], 
timeTravelDfRows: Array[Row]): Unit = {

Review Comment:
   done



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala:
##########
@@ -331,6 +401,32 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
     assertEquals(hudiDfToCompare.except(inputDfToCompare).count, 0)
   }
 
+  private def compareEntireInputRowsWithHudiRows(snapshotDf2Rows: Array[Row], 
timeTravelDfRows: Array[Row]): Unit = {

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to