This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d79e1bcbd17 [HUDI-8664] Fix TestSparkSqlCoreFlow to be based on completion time (#12602)
d79e1bcbd17 is described below
commit d79e1bcbd171d4eafa3a2ef69ad4730b3ae46aa4
Author: Davis-Zhang-Onehouse <[email protected]>
AuthorDate: Tue Jan 14 09:57:44 2025 -0800
[HUDI-8664] Fix TestSparkSqlCoreFlow to be based on completion time (#12602)
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../org/apache/hudi/HoodieDataSourceHelpers.java | 33 +++-
.../hudi/functional/TestSparkSqlCoreFlow.scala | 196 +++++++++++++--------
2 files changed, 148 insertions(+), 81 deletions(-)
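
For context on what "based on completion time" means here: a completed Hudi instant carries both the time the write was requested and the time it finished, and the updated test uses the latter for incremental bounds while still using the former for time travel. A minimal Scala sketch of that distinction, assuming a FileSystem handle fs and a tableBasePath like the ones in the test (the helper and accessor names come from the diff below; the rest is illustrative):

    import org.apache.hudi.HoodieDataSourceHelpers.latestCompletedCommit

    // The last successful write as a HoodieInstant, which exposes both timestamps.
    val instant = latestCompletedCommit(fs, tableBasePath)
    val requestedTime = instant.requestedTime()     // still used for "timestamp as of" time travel
    val completionTime = instant.getCompletionTime  // now used as the incremental query bound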
diff --git a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java
index 43a08aeb310..8db7d81c9cb 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java
+++ b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/HoodieDataSourceHelpers.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FileSystem;
import java.util.List;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* List of helpers to aid, construction of instanttime for read and write operations using datasource.
@@ -75,13 +76,17 @@ public class HoodieDataSourceHelpers {
}
// this is used in the integration test script: docker/demo/sparksql-incremental.commands
- public static List<String> listCompletionTimeSince(FileSystem fs, String basePath,
- String instantTimestamp) {
+ public static Stream<String> streamCompletionTimeSince(FileSystem fs, String basePath,
+ String instantTimestamp) {
+ return streamCompletedInstantSince(fs, basePath, instantTimestamp)
+ .map(HoodieInstant::getCompletionTime);
+ }
+
+ public static Stream<HoodieInstant> streamCompletedInstantSince(FileSystem fs, String basePath,
+ String instantTimestamp) {
HoodieTimeline timeline = allCompletedCommitsCompactions(fs, basePath);
return timeline.findInstantsAfter(instantTimestamp, Integer.MAX_VALUE)
- .getInstantsOrderedByCompletionTime()
- .map(HoodieInstant::getCompletionTime)
- .collect(Collectors.toList());
+ .getInstantsOrderedByCompletionTime();
}
/**
@@ -89,13 +94,25 @@ public class HoodieDataSourceHelpers {
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.STABLE)
public static String latestCommit(FileSystem fs, String basePath) {
- HoodieTimeline timeline = allCompletedCommitsCompactions(fs, basePath);
- return timeline.lastInstant().get().requestedTime();
+ return latestCompletedCommit(fs, basePath).requestedTime();
}
public static String latestCommit(HoodieStorage storage, String basePath) {
+ return latestCompletedCommit(storage, basePath).requestedTime();
+ }
+
+ /**
+ * Returns the last successful write operation's completed instant.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public static HoodieInstant latestCompletedCommit(FileSystem fs, String basePath) {
+ HoodieTimeline timeline = allCompletedCommitsCompactions(fs, basePath);
+ return timeline.lastInstant().get();
+ }
+
+ public static HoodieInstant latestCompletedCommit(HoodieStorage storage, String basePath) {
HoodieTimeline timeline = allCompletedCommitsCompactions(storage, basePath);
- return timeline.lastInstant().get().requestedTime();
+ return timeline.lastInstant().get();
}
/**
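
Since the stream-returning replacements for listCompletionTimeSince hand back a java.util.stream.Stream rather than a List, callers that want a Scala collection need one extra materialization step. A rough sketch, assuming fs and basePath placeholders (only the two helper names are taken from the change above):

    import java.util.stream.Collectors
    import scala.collection.JavaConverters._
    import org.apache.hudi.HoodieDataSourceHelpers.{streamCompletedInstantSince, streamCompletionTimeSince}

    // Completion times of all commits/compactions completed after instant "000".
    val completionTimes: Seq[String] =
      streamCompletionTimeSince(fs, basePath, "000")
        .collect(Collectors.toList[String]())
        .asScala

    // Or keep the HoodieInstant objects, e.g. to read both timestamps of the earliest one.
    val firstInstant = streamCompletedInstantSince(fs, basePath, "000").findFirst().get()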
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala
index d2be806e5a0..e1db2adbbe4 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala
@@ -19,8 +19,8 @@
package org.apache.hudi.functional
-import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
-import org.apache.hudi.HoodieDataSourceHelpers.{hasNewCommits, latestCommit, listCommitsSince}
+import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
+import org.apache.hudi.HoodieDataSourceHelpers.{hasNewCommits, latestCompletedCommit, listCommitsSince, streamCompletedInstantSince}
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model.WriteOperationType.{BULK_INSERT, INSERT, UPSERT}
import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
@@ -30,15 +30,15 @@ import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.hadoop.fs.HadoopFSUtils
import org.apache.hudi.keygen.NonpartitionedKeyGenerator
import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
-import org.apache.hudi.{DataSourceReadOptions, HoodieSparkUtils}
-
import org.apache.spark.sql
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
-import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.{DataFrame, Row}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.scalatest.Inspectors.forAll
import java.io.File
+import java.util.stream.Collectors
import scala.collection.JavaConverters._
@@ -93,85 +93,99 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
//Bulk insert first set of records
val inputDf0 = generateInserts(dataGen, "000", 100).cache()
insertInto(tableName, tableBasePath, inputDf0, BULK_INSERT, isMetadataEnabled, 1)
+ val inputDf0Rows = canonicalizeDF(inputDf0).collect()
+ inputDf0.unpersist(true)
assertTrue(hasNewCommits(fs, tableBasePath, "000"))
//Verify bulk insert works correctly
- val snapshotDf1 = doSnapshotRead(tableName, isMetadataEnabled).cache()
+ val snapshotDf1 = doSnapshotRead(tableName, isMetadataEnabled)
+ val snapshotDf1Rows = canonicalizeDF(dropMetaColumns(snapshotDf1)).collect()
assertEquals(100, snapshotDf1.count())
- compareEntireInputDfWithHudiDf(inputDf0, snapshotDf1)
- snapshotDf1.unpersist(true)
+ compareEntireInputRowsWithHudiRows(inputDf0Rows, snapshotDf1Rows)
//Test updated records
val updateDf = generateUniqueUpdates(dataGen, "001", 50).cache()
insertInto(tableName, tableBasePath, updateDf, UPSERT, isMetadataEnabled, 2)
- val commitInstantTime2 = latestCommit(fs, tableBasePath)
- val snapshotDf2 = doSnapshotRead(tableName, isMetadataEnabled).cache()
- assertEquals(100, snapshotDf2.count())
- compareUpdateDfWithHudiDf(updateDf, snapshotDf2, snapshotDf1)
- snapshotDf2.unpersist(true)
+ val commitInstant2 = latestCompletedCommit(fs, tableBasePath)
+ val commitCompletionTime2 = commitInstant2.getCompletionTime
+ val snapshotDf2 = doSnapshotRead(tableName, isMetadataEnabled)
+ val snapshotDf2Rows = canonicalizeDF(dropMetaColumns(snapshotDf2)).collect()
+ assertEquals(100, snapshotDf2Rows.length)
+ compareUpdateRowsWithHudiRows(
+ canonicalizeDF(updateDf).collect(),
+ snapshotDf2Rows,
+ snapshotDf1Rows)
+ updateDf.unpersist(true)
val inputDf2 = generateUniqueUpdates(dataGen, "002", 60).cache()
val uniqueKeyCnt2 = inputDf2.select("_row_key").distinct().count()
insertInto(tableName, tableBasePath, inputDf2, UPSERT, isMetadataEnabled, 3)
- val commitInstantTime3 = latestCommit(fs, tableBasePath)
+ val commitInstant3 = latestCompletedCommit(fs, tableBasePath)
+ val commitCompletionTime3 = commitInstant3.getCompletionTime
assertEquals(3, listCommitsSince(fs, tableBasePath, "000").size())
- val snapshotDf3 = doSnapshotRead(tableName, isMetadataEnabled).cache()
- assertEquals(100, snapshotDf3.count())
- compareUpdateDfWithHudiDf(inputDf2, snapshotDf3, snapshotDf3)
- snapshotDf3.unpersist(true)
+ val snapshotDf3Rows = canonicalizeDF(doSnapshotRead(tableName, isMetadataEnabled)).collect()
+ assertEquals(100, snapshotDf3Rows.length)
+ compareUpdateRowsWithHudiRows(canonicalizeDF(inputDf2).collect(), snapshotDf3Rows, snapshotDf3Rows)
+ inputDf2.unpersist(true)
// Read Incremental Query, uses hudi_table_changes() table valued function for spark sql
// we have 2 commits, try pulling the first commit (which is not the latest)
//HUDI-5266
- val firstCommit = listCommitsSince(fs, tableBasePath, "000").get(0)
- val hoodieIncViewDf1 = spark.sql(s"select * from hudi_table_changes('$tableName', 'latest_state', 'earliest', '$firstCommit')")
+ val firstCommitInstant = streamCompletedInstantSince(fs, tableBasePath, "000").findFirst().get()
+ val firstCommitCompletionTime = firstCommitInstant.getCompletionTime
+ val hoodieIncViewDf1 = spark.sql(s"select * from hudi_table_changes('$tableName', 'latest_state', 'earliest', '$firstCommitCompletionTime')")
assertEquals(100, hoodieIncViewDf1.count()) // 100 initial inserts must be pulled
var countsPerCommit = hoodieIncViewDf1.groupBy("_hoodie_commit_time").count().collect()
assertEquals(1, countsPerCommit.length)
- assertEquals(firstCommit, countsPerCommit(0).get(0).toString)
+ assertEquals(firstCommitInstant.requestedTime(), countsPerCommit(0).get(0).toString)
- val inputDf3 = generateUniqueUpdates(dataGen, "003", 80).cache()
+ val inputDf3 = generateUniqueUpdates(dataGen, "003", 80)
insertInto(tableName, tableBasePath, inputDf3, UPSERT, isMetadataEnabled, 4)
//another incremental query with commit2 and commit3
//HUDI-5266
- val hoodieIncViewDf2 = spark.sql(s"select * from hudi_table_changes('$tableName', 'latest_state', '$commitInstantTime2', '$commitInstantTime3')")
+ val commitCompletionTime2_1 = addMinimumTimeUnit(commitCompletionTime2)
+ val hoodieIncViewDf2 = spark.sql(s"select * from hudi_table_changes('$tableName', 'latest_state', '$commitCompletionTime2_1', '$commitCompletionTime3')")
assertEquals(uniqueKeyCnt2, hoodieIncViewDf2.count()) // 60 records must be pulled
countsPerCommit = hoodieIncViewDf2.groupBy("_hoodie_commit_time").count().collect()
assertEquals(1, countsPerCommit.length)
- assertEquals(commitInstantTime3, countsPerCommit(0).get(0).toString)
+ assertEquals(commitInstant3.requestedTime(), countsPerCommit(0).get(0).toString)
- val timeTravelDf = spark.sql(s"select * from $tableName timestamp as of '$commitInstantTime2'").cache()
- assertEquals(100, timeTravelDf.count())
- compareEntireInputDfWithHudiDf(snapshotDf2, timeTravelDf)
+ val commit2RequestTime = commitInstant2.requestedTime()
+ val timeTravelDf = spark.sql(s"select * from $tableName timestamp as of '$commit2RequestTime'")
+ val timeTravelDfRows = dropMetaColumns(canonicalizeDF(timeTravelDf)).collect()
+ assertEquals(100, timeTravelDfRows.length)
+ compareEntireInputRowsWithHudiRows(snapshotDf2Rows, timeTravelDfRows)
timeTravelDf.unpersist(true)
if (tableType.equals("MERGE_ON_READ")) {
- val readOptDf = doMORReadOptimizedQuery(isMetadataEnabled, tableBasePath)
- compareEntireInputDfWithHudiDf(inputDf0, readOptDf)
+ val readOptRows = canonicalizeDF(doMORReadOptimizedQuery(isMetadataEnabled, tableBasePath)).collect()
+ compareEntireInputRowsWithHudiRows(inputDf0Rows, readOptRows)
- val snapshotDf4 = doSnapshotRead(tableName, isMetadataEnabled)
+ val snapshotDf4Rows = canonicalizeDF(doSnapshotRead(tableName, isMetadataEnabled)).collect()
// trigger compaction and try out Read optimized query.
- val inputDf4 = generateUniqueUpdates(dataGen, "004", 40).cache
+ val inputDf4 = generateUniqueUpdates(dataGen, "004", 40).cache()
//count is increased by 2 because inline compaction will add extra commit to the timeline
doInlineCompact(tableName, tableBasePath, inputDf4, UPSERT, isMetadataEnabled, "3", 6)
- val snapshotDf5 = doSnapshotRead(tableName, isMetadataEnabled)
- snapshotDf5.cache()
- compareUpdateDfWithHudiDf(inputDf4, snapshotDf5, snapshotDf4)
+ val snapshotDf5Rows = canonicalizeDF(doSnapshotRead(tableName, isMetadataEnabled)).collect()
+ compareUpdateRowsWithHudiRows(canonicalizeDF(inputDf4).collect(), snapshotDf5Rows, snapshotDf4Rows)
inputDf4.unpersist(true)
- snapshotDf5.unpersist(true)
// compaction is expected to have completed. both RO and RT are expected to return same results.
compareROAndRT(isMetadataEnabled, tableName, tableBasePath)
}
+ }
- inputDf0.unpersist(true)
- updateDf.unpersist(true)
- inputDf2.unpersist(true)
- inputDf3.unpersist(true)
+ private def canonicalizeDF(inputDf0: DataFrame) = {
+ inputDf0.selectExpr(colsToCompare.split(","): _*)
+ }
+
+ private def addMinimumTimeUnit(commitCompletionTime2: String) : String = {
+ String.valueOf(commitCompletionTime2.toLong + 1)
}
def doSnapshotRead(tableName: String, isMetadataEnabledOnRead: Boolean): sql.DataFrame = {
@@ -232,6 +246,7 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
val metadata = TimelineUtils.getCommitMetadata(latestCommit.get(), timeline)
metadata.getOperationType.equals(operationType)
}
+
def insertInto(tableName: String, tableBasePath: String, inputDf: sql.DataFrame, writeOp: WriteOperationType,
isMetadataEnabledOnWrite: Boolean, count: Int): Unit = {
inputDf.select("timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon", "fare",
@@ -267,6 +282,7 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
spark.conf.unset("hoodie.sql.insert.mode")
}
}
+
def createTable(tableName: String, keyGenClass: String, writeOptions: String, tableBasePath: String): Unit = {
//If you have partitioned by (partition_path) with nonpartitioned keygen, the partition_path will be empty in the table
val partitionedBy = if (!keyGenClass.equals(classOf[NonpartitionedKeyGenerator].getName)) {
@@ -309,31 +325,67 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
spark.read.json(spark.sparkContext.parallelize(recordsToStrings(recs).asScala.toSeq, 2))
}
- def compareUpdateDfWithHudiDf(inputDf: Dataset[Row], hudiDf: Dataset[Row], beforeDf: Dataset[Row]): Unit = {
- dropMetaColumns(hudiDf).createOrReplaceTempView("hudiTbl")
- inputDf.createOrReplaceTempView("inputTbl")
- beforeDf.createOrReplaceTempView("beforeTbl")
- val hudiDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " from hudiTbl")
- val inputDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " from inputTbl")
- val beforeDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " from beforeTbl")
+ // Helper function to check if two rows are equal (comparing only the columns we care about)
+ def rowsEqual(row1: Row, row2: Row): Boolean = {
+ // Get schemas from rows
+ val schema1 = row1.asInstanceOf[GenericRowWithSchema].schema
+ val schema2 = row2.asInstanceOf[GenericRowWithSchema].schema
+
+ // Verify schemas are identical
+ if (schema1 != schema2) {
+ throw new AssertionError(
+ s"""Schemas are different:
+ |Schema 1: ${schema1.treeString}
+ |Schema 2: ${schema2.treeString}""".stripMargin)
+ }
- assertEquals(hudiDfToCompare.intersect(inputDfToCompare).count, inputDfToCompare.count)
- assertEquals(hudiDfToCompare.except(inputDfToCompare).except(beforeDfToCompare).count, 0)
+ // Compare all fields using schema
+ schema1.fields.forall { field =>
+ val idx1 = row1.fieldIndex(field.name)
+ val idx2 = row2.fieldIndex(field.name)
+ row1.get(idx1) == row2.get(idx2)
+ }
}
- def compareEntireInputDfWithHudiDf(inputDf: Dataset[Row], hudiDf: Dataset[Row]): Unit = {
- dropMetaColumns(hudiDf).createOrReplaceTempView("hudiTbl")
- inputDf.createOrReplaceTempView("inputTbl")
- val hudiDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " from hudiTbl")
- val inputDfToCompare = spark.sqlContext.sql("select " + colsToCompare + " from inputTbl")
+ // Verify beforeRows + deltaRows = afterRows
+ // Make sure rows in [[afterRows]] are presented in either [[deltaRows]] or [[beforeRows]]
+ def compareUpdateRowsWithHudiRows(deltaRows: Array[Row], afterRows: Array[Row], beforeRows: Array[Row]): Unit = {
+ // Helper function to get _row_key from a Row
+ def getRowKey(row: Row): String = row.getAs[String]("_row_key")
+
+ // Create hashmaps for O(1) lookups
+ val deltaRowsMap = deltaRows.map(row => getRowKey(row) -> row).toMap
+ val beforeRowsMap = beforeRows.map(row => getRowKey(row) -> row).toMap
+ // Ensure no duplicated record keys.
+ assertEquals(deltaRowsMap.size, deltaRows.length)
+ assertEquals(beforeRowsMap.size, beforeRows.length)
+
+ // Check that all input rows exist in afterRows
+ deltaRows.foreach { inputRow =>
+ val key = getRowKey(inputRow)
+ val hudiRow = afterRows.find(row => getRowKey(row) == key)
+ assertTrue(hudiRow.isDefined && rowsEqual(inputRow, hudiRow.get),
+ s"Input row with _row_key: $key not found in Hudi rows or content mismatch")
+ }
- assertEquals(hudiDfToCompare.intersect(inputDfToCompare).count, inputDfToCompare.count)
- assertEquals(hudiDfToCompare.except(inputDfToCompare).count, 0)
+ // Check that each hudi row either exists in input or before
+ afterRows.foreach { hudiRow =>
+ val key = getRowKey(hudiRow)
+ val foundInInput = deltaRowsMap.get(key).exists(row => rowsEqual(hudiRow, row))
+ val foundInBefore = !foundInInput && beforeRowsMap.get(key).exists(row => rowsEqual(hudiRow, row))
+
+ assertTrue(foundInInput || foundInBefore,
+ s"Hudi row with _row_key: $key not found in either input or before rows")
+ }
+ }
+
+ def compareEntireInputRowsWithHudiRows(expectedRows: Array[Row], actualRows: Array[Row]): Unit = {
+ compareUpdateRowsWithHudiRows(Array.empty, expectedRows, actualRows)
}
def doMORReadOptimizedQuery(isMetadataEnabledOnRead: Boolean, basePath: String): sql.DataFrame = {
spark.read.format("org.apache.hudi")
- .option(DataSourceReadOptions.QUERY_TYPE.key, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+ .option(QUERY_TYPE.key, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
.option(HoodieMetadataConfig.ENABLE.key(), isMetadataEnabledOnRead)
.load(basePath)
}
@@ -428,37 +480,35 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
val dataGen = new HoodieTestDataGenerator(HoodieTestDataGenerator.TRIP_NESTED_EXAMPLE_SCHEMA, 0xDEED)
val inputDf0 = generateInserts(dataGen, "000", 100).cache
insertInto(tableName, tableBasePath, inputDf0, BULK_INSERT, isMetadataEnabled, 1)
+ val inputDf0Rows = canonicalizeDF(inputDf0).collect()
+ inputDf0.unpersist(true)
assertTrue(hasNewCommits(fs, tableBasePath, "000"))
//Snapshot query
- val snapshotDf1 = doSnapshotRead(tableName, isMetadataEnabled)
- assertEquals(100, snapshotDf1.count())
- compareEntireInputDfWithHudiDf(inputDf0, snapshotDf1)
+ val snapshotDf1Rows = canonicalizeDF(doSnapshotRead(tableName, isMetadataEnabled)).collect()
+ assertEquals(100, snapshotDf1Rows.length)
+ compareEntireInputRowsWithHudiRows(inputDf0Rows, snapshotDf1Rows)
val inputDf1 = generateInserts(dataGen, "001", 50).cache
insertInto(tableName, tableBasePath, inputDf1, writeOp, isMetadataEnabled, 2)
+ val inputDf1rows = canonicalizeDF(inputDf1).collect()
+ inputDf1.unpersist(true)
- val snapshotDf2 = doSnapshotRead(tableName, isMetadataEnabled).cache
- assertEquals(150, snapshotDf2.count())
- compareEntireInputDfWithHudiDf(inputDf1.union(inputDf0), snapshotDf2)
- snapshotDf2.unpersist(true)
-
+ val snapshotDf2 = canonicalizeDF(doSnapshotRead(tableName, isMetadataEnabled)).collect()
+ assertEquals(150, snapshotDf2.length)
+ compareEntireInputRowsWithHudiRows(inputDf1rows ++ inputDf0Rows, snapshotDf2)
val inputDf2 = generateInserts(dataGen, "002", 60).cache()
insertInto(tableName, tableBasePath, inputDf2, writeOp, isMetadataEnabled, 3)
+ val inputDf2rows = canonicalizeDF(inputDf2).collect()
+ inputDf2.unpersist(true)
assertEquals(3, listCommitsSince(fs, tableBasePath, "000").size())
// Snapshot Query
- val snapshotDf3 = doSnapshotRead(tableName, isMetadataEnabled).cache
- assertEquals(210, snapshotDf3.count())
- compareEntireInputDfWithHudiDf(inputDf1.union(inputDf0).union(inputDf2), snapshotDf3)
- snapshotDf3.unpersist(true)
-
- inputDf0.unpersist(true)
- inputDf1.unpersist(true)
- inputDf2.unpersist(true)
+ val snapshotDf3 = canonicalizeDF(doSnapshotRead(tableName, isMetadataEnabled)).collect()
+ assertEquals(210, snapshotDf3.length)
+ compareEntireInputRowsWithHudiRows(inputDf1rows ++ inputDf0Rows ++ inputDf2rows, snapshotDf3)
}
-
}
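
A final note on the incremental bounds used in the rewritten test: the range passed to hudi_table_changes is built from completion times, and the start is bumped by one time unit via addMinimumTimeUnit so that commit 2 itself is excluded, which suggests the begin bound is treated as inclusive. A hedged Scala sketch of that pattern, assuming spark, tableName and the two completion-time strings are in scope:

    // Completion times are numeric strings, so "strictly after commit 2" is expressed
    // by adding the smallest representable unit to its completion time.
    def addMinimumTimeUnit(completionTime: String): String =
      String.valueOf(completionTime.toLong + 1)

    val begin = addMinimumTimeUnit(commitCompletionTime2) // just past commit 2
    val end = commitCompletionTime3                       // through commit 3
    val incrementalDf = spark.sql(
      s"select * from hudi_table_changes('$tableName', 'latest_state', '$begin', '$end')")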