This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d79e1bcbd17 [HUDI-8664] Fix TestSparkSqlCoreFlow to be based on completion time (#12602)
d79e1bcbd17 is described below

commit d79e1bcbd171d4eafa3a2ef69ad4730b3ae46aa4
Author: Davis-Zhang-Onehouse <[email protected]>
AuthorDate: Tue Jan 14 09:57:44 2025 -0800

    [HUDI-8664] Fix TestSparkSqlCoreFlow to be based on completion time (#12602)
    
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../org/apache/hudi/HoodieDataSourceHelpers.java   |  33 +++-
 .../hudi/functional/TestSparkSqlCoreFlow.scala     | 196 +++++++++++++--------
 2 files changed, 148 insertions(+), 81 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java
index 43a08aeb310..8db7d81c9cb 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java
+++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FileSystem;
 
 import java.util.List;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * List of helpers to aid, construction of instanttime for read and write 
operations using datasource.
@@ -75,13 +76,17 @@ public class HoodieDataSourceHelpers {
   }
 
   // this is used in the integration test script: 
docker/demo/sparksql-incremental.commands
-  public static List<String> listCompletionTimeSince(FileSystem fs, String 
basePath,
-      String instantTimestamp) {
+  public static Stream<String> streamCompletionTimeSince(FileSystem fs, String 
basePath,
+                                                         String 
instantTimestamp) {
+    return streamCompletedInstantSince(fs, basePath, instantTimestamp)
+        .map(HoodieInstant::getCompletionTime);
+  }
+
+  public static Stream<HoodieInstant> streamCompletedInstantSince(FileSystem 
fs, String basePath,
+                                                                  String 
instantTimestamp) {
     HoodieTimeline timeline = allCompletedCommitsCompactions(fs, basePath);
     return timeline.findInstantsAfter(instantTimestamp, Integer.MAX_VALUE)
-        .getInstantsOrderedByCompletionTime()
-        .map(HoodieInstant::getCompletionTime)
-        .collect(Collectors.toList());
+        .getInstantsOrderedByCompletionTime();
   }
 
   /**
@@ -89,13 +94,25 @@ public class HoodieDataSourceHelpers {
    */
   @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
   public static String latestCommit(FileSystem fs, String basePath) {
-    HoodieTimeline timeline = allCompletedCommitsCompactions(fs, basePath);
-    return timeline.lastInstant().get().requestedTime();
+    return latestCompletedCommit(fs, basePath).requestedTime();
   }
 
   public static String latestCommit(HoodieStorage storage, String basePath) {
+    return latestCompletedCommit(storage, basePath).requestedTime();
+  }
+
+  /**
+   * Returns the last successful write operation's completed instant.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public static HoodieInstant latestCompletedCommit(FileSystem fs, String 
basePath) {
+    HoodieTimeline timeline = allCompletedCommitsCompactions(fs, basePath);
+    return timeline.lastInstant().get();
+  }
+
+  public static HoodieInstant latestCompletedCommit(HoodieStorage storage, 
String basePath) {
     HoodieTimeline timeline = allCompletedCommitsCompactions(storage, 
basePath);
-    return timeline.lastInstant().get().requestedTime();
+    return timeline.lastInstant().get();
   }
 
   /**
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala
index d2be806e5a0..e1db2adbbe4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala
@@ -19,8 +19,8 @@
 
 package org.apache.hudi.functional
 
-import 
org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, 
QUERY_TYPE_SNAPSHOT_OPT_VAL}
-import org.apache.hudi.HoodieDataSourceHelpers.{hasNewCommits, latestCommit, 
listCommitsSince}
+import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, 
QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
+import org.apache.hudi.HoodieDataSourceHelpers.{hasNewCommits, 
latestCompletedCommit, listCommitsSince, streamCompletedInstantSince}
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.model.WriteOperationType.{BULK_INSERT, INSERT, 
UPSERT}
 import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
@@ -30,15 +30,15 @@ import 
org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator
 import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
-import org.apache.hudi.{DataSourceReadOptions, HoodieSparkUtils}
-
 import org.apache.spark.sql
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
-import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.{DataFrame, Row}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.scalatest.Inspectors.forAll
 
 import java.io.File
+import java.util.stream.Collectors
 
 import scala.collection.JavaConverters._
 
@@ -93,85 +93,99 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
     //Bulk insert first set of records
     val inputDf0 = generateInserts(dataGen, "000", 100).cache()
     insertInto(tableName, tableBasePath, inputDf0, BULK_INSERT, 
isMetadataEnabled, 1)
+    val inputDf0Rows = canonicalizeDF(inputDf0).collect()
+    inputDf0.unpersist(true)
     assertTrue(hasNewCommits(fs, tableBasePath, "000"))
     //Verify bulk insert works correctly
-    val snapshotDf1 = doSnapshotRead(tableName, isMetadataEnabled).cache()
+    val snapshotDf1 = doSnapshotRead(tableName, isMetadataEnabled)
+    val snapshotDf1Rows = 
canonicalizeDF(dropMetaColumns(snapshotDf1)).collect()
     assertEquals(100, snapshotDf1.count())
-    compareEntireInputDfWithHudiDf(inputDf0, snapshotDf1)
-    snapshotDf1.unpersist(true)
+    compareEntireInputRowsWithHudiRows(inputDf0Rows, snapshotDf1Rows)
 
     //Test updated records
     val updateDf = generateUniqueUpdates(dataGen, "001", 50).cache()
     insertInto(tableName, tableBasePath, updateDf, UPSERT, isMetadataEnabled, 
2)
-    val commitInstantTime2 = latestCommit(fs, tableBasePath)
-    val snapshotDf2 = doSnapshotRead(tableName, isMetadataEnabled).cache()
-    assertEquals(100, snapshotDf2.count())
-    compareUpdateDfWithHudiDf(updateDf, snapshotDf2, snapshotDf1)
-    snapshotDf2.unpersist(true)
+    val commitInstant2 = latestCompletedCommit(fs, tableBasePath)
+    val commitCompletionTime2 = commitInstant2.getCompletionTime
+    val snapshotDf2 = doSnapshotRead(tableName, isMetadataEnabled)
+    val snapshotDf2Rows = 
canonicalizeDF(dropMetaColumns(snapshotDf2)).collect()
+    assertEquals(100, snapshotDf2Rows.length)
+    compareUpdateRowsWithHudiRows(
+      canonicalizeDF(updateDf).collect(),
+      snapshotDf2Rows,
+      snapshotDf1Rows)
+    updateDf.unpersist(true)
 
     val inputDf2 = generateUniqueUpdates(dataGen, "002", 60).cache()
     val uniqueKeyCnt2 = inputDf2.select("_row_key").distinct().count()
     insertInto(tableName, tableBasePath, inputDf2, UPSERT, isMetadataEnabled, 
3)
-    val commitInstantTime3 = latestCommit(fs, tableBasePath)
+    val commitInstant3 = latestCompletedCommit(fs, tableBasePath)
+    val commitCompletionTime3 = commitInstant3.getCompletionTime
     assertEquals(3, listCommitsSince(fs, tableBasePath, "000").size())
 
-    val snapshotDf3 = doSnapshotRead(tableName, isMetadataEnabled).cache()
-    assertEquals(100, snapshotDf3.count())
-    compareUpdateDfWithHudiDf(inputDf2, snapshotDf3, snapshotDf3)
-    snapshotDf3.unpersist(true)
+    val snapshotDf3Rows = canonicalizeDF(doSnapshotRead(tableName, 
isMetadataEnabled)).collect()
+    assertEquals(100, snapshotDf3Rows.length)
+    compareUpdateRowsWithHudiRows(canonicalizeDF(inputDf2).collect(),
+      snapshotDf3Rows, snapshotDf3Rows)
+    inputDf2.unpersist(true)
 
     // Read Incremental Query, uses hudi_table_changes() table valued function 
for spark sql
     // we have 2 commits, try pulling the first commit (which is not the 
latest)
     //HUDI-5266
-    val firstCommit = listCommitsSince(fs, tableBasePath, "000").get(0)
-    val hoodieIncViewDf1 = spark.sql(s"select * from 
hudi_table_changes('$tableName', 'latest_state', 'earliest', '$firstCommit')")
+    val firstCommitInstant = streamCompletedInstantSince(fs, tableBasePath, 
"000").findFirst().get()
+    val firstCommitCompletionTime = firstCommitInstant.getCompletionTime
+    val hoodieIncViewDf1 = spark.sql(s"select * from 
hudi_table_changes('$tableName', 'latest_state', 'earliest', 
'$firstCommitCompletionTime')")
 
     assertEquals(100, hoodieIncViewDf1.count()) // 100 initial inserts must be 
pulled
     var countsPerCommit = 
hoodieIncViewDf1.groupBy("_hoodie_commit_time").count().collect()
     assertEquals(1, countsPerCommit.length)
-    assertEquals(firstCommit, countsPerCommit(0).get(0).toString)
+    assertEquals(firstCommitInstant.requestedTime(), 
countsPerCommit(0).get(0).toString)
 
-    val inputDf3 = generateUniqueUpdates(dataGen, "003", 80).cache()
+    val inputDf3 = generateUniqueUpdates(dataGen, "003", 80)
     insertInto(tableName, tableBasePath, inputDf3, UPSERT, isMetadataEnabled, 
4)
 
     //another incremental query with commit2 and commit3
     //HUDI-5266
-    val hoodieIncViewDf2 = spark.sql(s"select * from 
hudi_table_changes('$tableName', 'latest_state', '$commitInstantTime2', 
'$commitInstantTime3')")
+    val commitCompletionTime2_1 = addMinimumTimeUnit(commitCompletionTime2)
+    val hoodieIncViewDf2 = spark.sql(s"select * from 
hudi_table_changes('$tableName', 'latest_state', '$commitCompletionTime2_1', 
'$commitCompletionTime3')")
 
     assertEquals(uniqueKeyCnt2, hoodieIncViewDf2.count()) // 60 records must 
be pulled
     countsPerCommit = 
hoodieIncViewDf2.groupBy("_hoodie_commit_time").count().collect()
     assertEquals(1, countsPerCommit.length)
-    assertEquals(commitInstantTime3, countsPerCommit(0).get(0).toString)
+    assertEquals(commitInstant3.requestedTime(), 
countsPerCommit(0).get(0).toString)
 
-    val timeTravelDf = spark.sql(s"select * from $tableName timestamp as of 
'$commitInstantTime2'").cache()
-    assertEquals(100, timeTravelDf.count())
-    compareEntireInputDfWithHudiDf(snapshotDf2, timeTravelDf)
+    val commit2RequestTime = commitInstant2.requestedTime()
+    val timeTravelDf = spark.sql(s"select * from $tableName timestamp as of 
'$commit2RequestTime'")
+    val timeTravelDfRows = 
dropMetaColumns(canonicalizeDF(timeTravelDf)).collect()
+    assertEquals(100, timeTravelDfRows.length)
+    compareEntireInputRowsWithHudiRows(snapshotDf2Rows, timeTravelDfRows)
     timeTravelDf.unpersist(true)
 
     if (tableType.equals("MERGE_ON_READ")) {
-      val readOptDf = doMORReadOptimizedQuery(isMetadataEnabled, tableBasePath)
-      compareEntireInputDfWithHudiDf(inputDf0, readOptDf)
+      val readOptRows = 
canonicalizeDF(doMORReadOptimizedQuery(isMetadataEnabled, 
tableBasePath)).collect()
+      compareEntireInputRowsWithHudiRows(inputDf0Rows, readOptRows)
 
-      val snapshotDf4 = doSnapshotRead(tableName, isMetadataEnabled)
+      val snapshotDf4Rows = canonicalizeDF(doSnapshotRead(tableName, 
isMetadataEnabled)).collect()
 
       // trigger compaction and try out Read optimized query.
-      val inputDf4 = generateUniqueUpdates(dataGen, "004", 40).cache
+      val inputDf4 = generateUniqueUpdates(dataGen, "004", 40).cache()
       //count is increased by 2 because inline compaction will add extra 
commit to the timeline
       doInlineCompact(tableName, tableBasePath, inputDf4, UPSERT, 
isMetadataEnabled, "3", 6)
-      val snapshotDf5 = doSnapshotRead(tableName, isMetadataEnabled)
-      snapshotDf5.cache()
-      compareUpdateDfWithHudiDf(inputDf4, snapshotDf5, snapshotDf4)
+      val snapshotDf5Rows = canonicalizeDF(doSnapshotRead(tableName, 
isMetadataEnabled)).collect()
+      compareUpdateRowsWithHudiRows(canonicalizeDF(inputDf4).collect(), 
snapshotDf5Rows, snapshotDf4Rows)
       inputDf4.unpersist(true)
-      snapshotDf5.unpersist(true)
 
       // compaction is expected to have completed. both RO and RT are expected 
to return same results.
       compareROAndRT(isMetadataEnabled, tableName, tableBasePath)
     }
+  }
 
-    inputDf0.unpersist(true)
-    updateDf.unpersist(true)
-    inputDf2.unpersist(true)
-    inputDf3.unpersist(true)
+  private def canonicalizeDF(inputDf0: DataFrame) = {
+    inputDf0.selectExpr(colsToCompare.split(","): _*)
+  }
+
+  private def addMinimumTimeUnit(commitCompletionTime2: String) : String = {
+    String.valueOf(commitCompletionTime2.toLong + 1)
   }
 
   def doSnapshotRead(tableName: String, isMetadataEnabledOnRead: Boolean): 
sql.DataFrame = {
@@ -232,6 +246,7 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
     val metadata = TimelineUtils.getCommitMetadata(latestCommit.get(), 
timeline)
     metadata.getOperationType.equals(operationType)
   }
+
   def insertInto(tableName: String, tableBasePath: String, inputDf: 
sql.DataFrame, writeOp: WriteOperationType,
                  isMetadataEnabledOnWrite: Boolean, count: Int): Unit = {
     inputDf.select("timestamp", "_row_key", "rider", "driver", "begin_lat", 
"begin_lon", "end_lat", "end_lon", "fare",
@@ -267,6 +282,7 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
       spark.conf.unset("hoodie.sql.insert.mode")
     }
   }
+
   def createTable(tableName: String, keyGenClass: String, writeOptions: 
String, tableBasePath: String): Unit = {
     //If you have partitioned by (partition_path) with nonpartitioned keygen, 
the partition_path will be empty in the table
     val partitionedBy = if 
(!keyGenClass.equals(classOf[NonpartitionedKeyGenerator].getName)) {
@@ -309,31 +325,67 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase 
{
     
spark.read.json(spark.sparkContext.parallelize(recordsToStrings(recs).asScala.toSeq,
 2))
   }
 
-  def compareUpdateDfWithHudiDf(inputDf: Dataset[Row], hudiDf: Dataset[Row], 
beforeDf: Dataset[Row]): Unit = {
-    dropMetaColumns(hudiDf).createOrReplaceTempView("hudiTbl")
-    inputDf.createOrReplaceTempView("inputTbl")
-    beforeDf.createOrReplaceTempView("beforeTbl")
-    val hudiDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " 
from hudiTbl")
-    val inputDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " 
from inputTbl")
-    val beforeDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " 
from beforeTbl")
+  // Helper function to check if two rows are equal (comparing only the 
columns we care about)
+  def rowsEqual(row1: Row, row2: Row): Boolean = {
+    // Get schemas from rows
+    val schema1 = row1.asInstanceOf[GenericRowWithSchema].schema
+    val schema2 = row2.asInstanceOf[GenericRowWithSchema].schema
+
+    // Verify schemas are identical
+    if (schema1 != schema2) {
+      throw new AssertionError(
+        s"""Schemas are different:
+            |Schema 1: ${schema1.treeString}
+            |Schema 2: ${schema2.treeString}""".stripMargin)
+    }
 
-    assertEquals(hudiDfToCompare.intersect(inputDfToCompare).count, 
inputDfToCompare.count)
-    
assertEquals(hudiDfToCompare.except(inputDfToCompare).except(beforeDfToCompare).count,
 0)
+    // Compare all fields using schema
+    schema1.fields.forall { field =>
+      val idx1 = row1.fieldIndex(field.name)
+      val idx2 = row2.fieldIndex(field.name)
+      row1.get(idx1) == row2.get(idx2)
+    }
   }
 
-  def compareEntireInputDfWithHudiDf(inputDf: Dataset[Row], hudiDf: 
Dataset[Row]): Unit = {
-    dropMetaColumns(hudiDf).createOrReplaceTempView("hudiTbl")
-    inputDf.createOrReplaceTempView("inputTbl")
-    val hudiDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " 
from hudiTbl")
-    val inputDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " 
from inputTbl")
+  // Verify beforeRows + deltaRows = afterRows
+  // Make sure rows in [[afterRows]] are presented in either [[deltaRows]] or 
[[beforeRows]]
+  def compareUpdateRowsWithHudiRows(deltaRows: Array[Row], afterRows: 
Array[Row], beforeRows: Array[Row]): Unit = {
+    // Helper function to get _row_key from a Row
+    def getRowKey(row: Row): String = row.getAs[String]("_row_key")
+
+    // Create hashmaps for O(1) lookups
+    val deltaRowsMap = deltaRows.map(row => getRowKey(row) -> row).toMap
+    val beforeRowsMap = beforeRows.map(row => getRowKey(row) -> row).toMap
+    // Ensure no duplicated record keys.
+    assertEquals(deltaRowsMap.size, deltaRows.length)
+    assertEquals(beforeRowsMap.size, beforeRows.length)
+
+    // Check that all input rows exist in afterRows
+    deltaRows.foreach { inputRow =>
+      val key = getRowKey(inputRow)
+      val hudiRow = afterRows.find(row => getRowKey(row) == key)
+      assertTrue(hudiRow.isDefined && rowsEqual(inputRow, hudiRow.get),
+        s"Input row with _row_key: $key not found in Hudi rows or content 
mismatch")
+    }
 
-    assertEquals(hudiDfToCompare.intersect(inputDfToCompare).count, 
inputDfToCompare.count)
-    assertEquals(hudiDfToCompare.except(inputDfToCompare).count, 0)
+    // Check that each hudi row either exists in input or before
+    afterRows.foreach { hudiRow =>
+      val key = getRowKey(hudiRow)
+      val foundInInput = deltaRowsMap.get(key).exists(row => 
rowsEqual(hudiRow, row))
+      val foundInBefore = !foundInInput && beforeRowsMap.get(key).exists(row 
=> rowsEqual(hudiRow, row))
+
+      assertTrue(foundInInput || foundInBefore,
+        s"Hudi row with _row_key: $key not found in either input or before 
rows")
+    }
+  }
+
+  def compareEntireInputRowsWithHudiRows(expectedRows: Array[Row], actualRows: 
Array[Row]): Unit = {
+    compareUpdateRowsWithHudiRows(Array.empty, expectedRows, actualRows)
   }
 
   def doMORReadOptimizedQuery(isMetadataEnabledOnRead: Boolean, basePath: 
String): sql.DataFrame = {
     spark.read.format("org.apache.hudi")
-      .option(DataSourceReadOptions.QUERY_TYPE.key, 
QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+      .option(QUERY_TYPE.key, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
       .option(HoodieMetadataConfig.ENABLE.key(), isMetadataEnabledOnRead)
       .load(basePath)
   }
@@ -428,37 +480,35 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase 
{
     val dataGen = new 
HoodieTestDataGenerator(HoodieTestDataGenerator.TRIP_NESTED_EXAMPLE_SCHEMA, 
0xDEED)
     val inputDf0 = generateInserts(dataGen, "000", 100).cache
     insertInto(tableName, tableBasePath, inputDf0, BULK_INSERT, 
isMetadataEnabled, 1)
+    val inputDf0Rows = canonicalizeDF(inputDf0).collect()
+    inputDf0.unpersist(true)
 
     assertTrue(hasNewCommits(fs, tableBasePath, "000"))
 
     //Snapshot query
-    val snapshotDf1 = doSnapshotRead(tableName, isMetadataEnabled)
-    assertEquals(100, snapshotDf1.count())
-    compareEntireInputDfWithHudiDf(inputDf0, snapshotDf1)
+    val snapshotDf1Rows = canonicalizeDF(doSnapshotRead(tableName, 
isMetadataEnabled)).collect()
+    assertEquals(100, snapshotDf1Rows.length)
+    compareEntireInputRowsWithHudiRows(inputDf0Rows, snapshotDf1Rows)
 
     val inputDf1 = generateInserts(dataGen, "001", 50).cache
     insertInto(tableName, tableBasePath, inputDf1, writeOp, isMetadataEnabled, 
2)
+    val inputDf1rows = canonicalizeDF(inputDf0).collect()
+    inputDf1.unpersist(true)
 
-    val snapshotDf2 = doSnapshotRead(tableName, isMetadataEnabled).cache
-    assertEquals(150, snapshotDf2.count())
-    compareEntireInputDfWithHudiDf(inputDf1.union(inputDf0), snapshotDf2)
-    snapshotDf2.unpersist(true)
-
+    val snapshotDf2 = canonicalizeDF(doSnapshotRead(tableName, 
isMetadataEnabled)).collect()
+    assertEquals(150, snapshotDf2.length)
+    compareEntireInputRowsWithHudiRows(inputDf1rows ++ inputDf0Rows, 
snapshotDf2)
 
     val inputDf2 = generateInserts(dataGen, "002", 60).cache()
     insertInto(tableName, tableBasePath, inputDf2, writeOp, isMetadataEnabled, 
3)
+    val inputDf2rows = canonicalizeDF(inputDf0).collect()
+    inputDf2.unpersist(true)
 
     assertEquals(3, listCommitsSince(fs, tableBasePath, "000").size())
 
     // Snapshot Query
-    val snapshotDf3 = doSnapshotRead(tableName, isMetadataEnabled).cache
-    assertEquals(210, snapshotDf3.count())
-    compareEntireInputDfWithHudiDf(inputDf1.union(inputDf0).union(inputDf2), 
snapshotDf3)
-    snapshotDf3.unpersist(true)
-
-    inputDf0.unpersist(true)
-    inputDf1.unpersist(true)
-    inputDf2.unpersist(true)
+    val snapshotDf3 = canonicalizeDF(doSnapshotRead(tableName, 
isMetadataEnabled)).collect()
+    assertEquals(210, snapshotDf3.length)
+    compareEntireInputRowsWithHudiRows(inputDf1rows ++ inputDf0Rows ++ 
inputDf2rows, snapshotDf3)
   }
-
 }

Reply via email to