This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 182a2109372 [HUDI-5193] Improve test coverage for Spark DataSource
write flows (#7179)
182a2109372 is described below
commit 182a21093722514920318aa8e6bb0d9aa2e0edbd
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Mon May 29 05:17:01 2023 -0400
[HUDI-5193] Improve test coverage for Spark DataSource write flows (#7179)
---------
Co-authored-by: Raymond Xu <[email protected]>
---
.../hudi/functional/TestSparkDataSource.scala | 383 +++++++++++++++++++++
1 file changed, 383 insertions(+)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
new file mode 100644
index 00000000000..3f64e24dfc9
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig,
HoodieWriteConfig}
+import org.apache.hudi.keygen.NonpartitionedKeyGenerator
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions,
HoodieDataSourceHelpers}
+import org.apache.spark.sql._
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.CsvSource
+
+import scala.collection.JavaConversions._
+
+class TestSparkDataSource extends SparkClientFunctionalTestHarness {
+
+  /** Parallelism used for every Hudi shuffle setting below and for slicing the input RDDs in the tests. */
+  val parallelism: Integer = 4
+
+  /**
+   * Write options shared by all test cases: shuffle parallelism for insert, upsert, bulk insert and
+   * delete, the record key field, the precombine field and the table name.
+   */
+  val commonOpts: Map[String, String] = {
+    val shuffleParallelism = parallelism.toString
+    Map(
+      "hoodie.insert.shuffle.parallelism" -> shuffleParallelism,
+      "hoodie.upsert.shuffle.parallelism" -> shuffleParallelism,
+      "hoodie.bulkinsert.shuffle.parallelism" -> shuffleParallelism,
+      "hoodie.delete.shuffle.parallelism" -> shuffleParallelism,
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+      DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
+    )
+  }
+
+ @ParameterizedTest
+ @CsvSource(value = Array(
+
"COPY_ON_WRITE|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+ "COPY_ON_WRITE|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+ "COPY_ON_WRITE|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+
"COPY_ON_WRITE|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+
"COPY_ON_WRITE|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+ "COPY_ON_WRITE|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+
"COPY_ON_WRITE|false|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
+
"COPY_ON_WRITE|true|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
+
"COPY_ON_WRITE|true|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
+
"MERGE_ON_READ|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+ "MERGE_ON_READ|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+ "MERGE_ON_READ|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+
"MERGE_ON_READ|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+
"MERGE_ON_READ|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+ "MERGE_ON_READ|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+
"MERGE_ON_READ|false|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
+
"MERGE_ON_READ|true|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
+
"MERGE_ON_READ|true|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM"
+ ), delimiter = '|')
+  // Core write/read flow: a bulk insert of 10 records followed by several rounds of unique updates,
+  // each verified through a snapshot read; then incremental and time-travel reads. For MERGE_ON_READ
+  // tables it additionally checks the read-optimized view and a write with inline compaction enabled.
+  def testCoreFlow(tableType: String, isMetadataEnabledOnWrite: Boolean, isMetadataEnabledOnRead: Boolean,
+                   keyGenClass: String, indexType: String): Unit = {
+    // The non-partitioned key generator is paired with an empty partition path field.
+    val usesNonPartitionedKeyGen = classOf[NonpartitionedKeyGenerator].getName.equals(keyGenClass)
+    val partitionField = if (usesNonPartitionedKeyGen) "" else "partition"
+    val options: Map[String, String] = commonOpts ++ Map(
+      HoodieMetadataConfig.ENABLE.key -> String.valueOf(isMetadataEnabledOnWrite),
+      DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> keyGenClass,
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> partitionField,
+      DataSourceWriteOptions.TABLE_TYPE.key() -> tableType,
+      HoodieIndexConfig.INDEX_TYPE.key() -> indexType
+    )
+    // Column order differs between the input and the Hudi DataFrames, so comparisons select an explicit column list.
+    val colsToSelect = "_row_key, begin_lat, begin_lon, city_to_state.LA, current_date, current_ts, distance_in_meters, driver, end_lat, end_lon, fare.amount, fare.currency, partition, partition_path, rider, timestamp, weight, _hoodie_is_deleted"
+    val dataGen = new HoodieTestDataGenerator(0xDEED)
+    val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
+
+    // Parses generated JSON records into a cached DataFrame.
+    def toCachedDf(jsonRecords: List[String]): Dataset[Row] =
+      spark.read.json(spark.sparkContext.parallelize(jsonRecords, parallelism)).cache
+
+    // Reads the table without setting a query type, honouring the read-side metadata flag.
+    def readSnapshot(): Dataset[Row] =
+      spark.read.format("org.apache.hudi")
+        .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabledOnRead)
+        .load(basePath)
+
+    // Incremental read bounded by the given begin and end instants.
+    def readIncremental(beginInstant: String, endInstant: String): Dataset[Row] =
+      spark.read.format("org.apache.hudi")
+        .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+        .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, beginInstant)
+        .option(DataSourceReadOptions.END_INSTANTTIME.key, endInstant)
+        .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabledOnRead)
+        .load(basePath)
+
+    // Appends a batch using only the shared options (no explicit write operation is set).
+    def appendToTable(batchDf: Dataset[Row]): Unit =
+      batchDf.write.format("org.apache.hudi")
+        .options(options)
+        .mode(SaveMode.Append)
+        .save(basePath)
+
+    // Commit 1: bulk insert 10 records, overwriting whatever is at basePath.
+    val records0 = recordsToStrings(dataGen.generateInserts("000", 10)).toList
+    val inputDf0 = toCachedDf(records0)
+    inputDf0.write.format("org.apache.hudi")
+      .options(options)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+
+    // Snapshot after commit 1 must match the inserted batch exactly.
+    val snapshotDf1 = readSnapshot().cache
+    assertEquals(10, snapshotDf1.count())
+    compareEntireInputDfWithHudiDf(inputDf0, snapshotDf1, colsToSelect)
+    val snapshotRows1 = snapshotDf1.collect.toList
+    snapshotDf1.unpersist(true)
+
+    // Commit 2: 5 unique updates.
+    val records1 = recordsToStrings(dataGen.generateUniqueUpdates("001", 5)).toList
+    val updateDf = toCachedDf(records1)
+    appendToTable(updateDf)
+    val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
+
+    val snapshotDf2 = readSnapshot().cache
+    assertEquals(10, snapshotDf2.count())
+    compareUpdateDfWithHudiDf(updateDf, snapshotDf2, snapshotRows1, colsToSelect)
+    val snapshotRows2 = snapshotDf2.collect.toList
+    snapshotDf2.unpersist(true)
+
+    // Commit 3: 6 unique updates.
+    val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 6)).toList
+    val inputDf2 = toCachedDf(records2)
+    val uniqueKeyCnt2 = inputDf2.select("_row_key").distinct().count()
+    appendToTable(inputDf2)
+
+    val commitInstantTime3 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
+    assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").size())
+
+    // Snapshot after commit 3.
+    val snapshotDf3 = readSnapshot().cache
+    assertEquals(10, snapshotDf3.count(), "should still be 10, since we only updated")
+    compareUpdateDfWithHudiDf(inputDf2, snapshotDf3, snapshotRows2, colsToSelect)
+    snapshotDf3.unpersist(true)
+
+    // Incremental read: three commits exist at this point; pull only the first one (not the latest).
+    val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").get(0)
+    val hoodieIncViewDf1 = readIncremental("000", firstCommit)
+    assertEquals(10, hoodieIncViewDf1.count(), "should have pulled 10 initial inserts")
+    val firstCommitCounts = hoodieIncViewDf1.groupBy("_hoodie_commit_time").count().collect()
+    assertEquals(1, firstCommitCounts.length)
+    assertEquals(firstCommit, firstCommitCounts(0).get(0))
+
+    // Commit 4: 8 unique updates.
+    val records3 = recordsToStrings(dataGen.generateUniqueUpdates("003", 8)).toList
+    val inputDf3 = toCachedDf(records3)
+    appendToTable(inputDf3)
+
+    // Incremental read between commit 2 and commit 3: only rows stamped with commit 3 are expected.
+    val hoodieIncViewDf2 = readIncremental(commitInstantTime2, commitInstantTime3)
+
+    assertEquals(uniqueKeyCnt2, hoodieIncViewDf2.count(), "should have pulled 6 records")
+    val thirdCommitCounts = hoodieIncViewDf2.groupBy("_hoodie_commit_time").count().collect()
+    assertEquals(1, thirdCommitCounts.length)
+    assertEquals(commitInstantTime3, thirdCommitCounts(0).get(0))
+
+    // Time travel to commit 2 must reproduce the snapshot collected right after that commit.
+    val timeTravelDf = spark.read.format("org.apache.hudi")
+      .option("as.of.instant", commitInstantTime2)
+      .load(basePath).cache
+    assertEquals(10, timeTravelDf.count())
+    compareEntireInputRowsWithHudiDf(snapshotRows2, timeTravelDf, colsToSelect)
+    timeTravelDf.unpersist(true)
+
+    if (tableType.equals("MERGE_ON_READ")) {
+      // The read-optimized view is compared against the initial bulk-inserted batch.
+      doMORReadOptimizedQuery(inputDf0, colsToSelect, isMetadataEnabledOnRead)
+
+      val snapshotRows4 = readSnapshot().collect.toList
+      assertEquals(10, snapshotRows4.length)
+
+      // Commit 5: 4 unique updates written with inline compaction switched on.
+      val records4 = recordsToStrings(dataGen.generateUniqueUpdates("004", 4)).toList
+      val inputDf4 = toCachedDf(records4)
+      inputDf4.write.format("org.apache.hudi")
+        .options(options)
+        .option(HoodieCompactionConfig.INLINE_COMPACT.key(), "true")
+        .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "3")
+        .mode(SaveMode.Append)
+        .save(basePath)
+
+      val snapshotDf5 = readSnapshot().cache
+
+      compareUpdateDfWithHudiDf(inputDf4, snapshotDf5, snapshotRows4, colsToSelect)
+      inputDf4.unpersist(true)
+      snapshotDf5.unpersist(true)
+      // Compaction is expected to have completed, so the read-optimized and snapshot views should agree.
+      compareROAndRT(basePath, colsToSelect, isMetadataEnabledOnRead)
+    }
+
+    // Release the cached input batches.
+    inputDf0.unpersist(true)
+    updateDf.unpersist(true)
+    inputDf2.unpersist(true)
+    inputDf3.unpersist(true)
+  }
+
+ @ParameterizedTest
+ @CsvSource(value = Array(
+
"COPY_ON_WRITE|insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+
"COPY_ON_WRITE|insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+
"COPY_ON_WRITE|insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+
"COPY_ON_WRITE|insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+
"COPY_ON_WRITE|insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+
"COPY_ON_WRITE|insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+
"COPY_ON_WRITE|insert|false|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
+
"COPY_ON_WRITE|insert|true|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
+
"COPY_ON_WRITE|insert|true|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
+
"MERGE_ON_READ|insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+
"MERGE_ON_READ|insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+
"MERGE_ON_READ|insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+
"MERGE_ON_READ|insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+
"MERGE_ON_READ|insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+
"MERGE_ON_READ|insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+
"MERGE_ON_READ|insert|false|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
+
"MERGE_ON_READ|insert|true|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
+
"MERGE_ON_READ|insert|true|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
+
"COPY_ON_WRITE|bulk_insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+
"COPY_ON_WRITE|bulk_insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+
"COPY_ON_WRITE|bulk_insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+
"COPY_ON_WRITE|bulk_insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+
"COPY_ON_WRITE|bulk_insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+
"COPY_ON_WRITE|bulk_insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+
"COPY_ON_WRITE|bulk_insert|false|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
+
"COPY_ON_WRITE|bulk_insert|true|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
+
"COPY_ON_WRITE|bulk_insert|true|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
+
"MERGE_ON_READ|bulk_insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+
"MERGE_ON_READ|bulk_insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+
"MERGE_ON_READ|bulk_insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|BLOOM",
+
"MERGE_ON_READ|bulk_insert|false|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+
"MERGE_ON_READ|bulk_insert|true|false|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+
"MERGE_ON_READ|bulk_insert|true|true|org.apache.hudi.keygen.SimpleKeyGenerator|SIMPLE",
+
"MERGE_ON_READ|bulk_insert|false|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
+
"MERGE_ON_READ|bulk_insert|true|false|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM",
+
"MERGE_ON_READ|bulk_insert|true|true|org.apache.hudi.keygen.NonpartitionedKeyGenerator|GLOBAL_BLOOM"
+ ), delimiter = '|')
+  // Insert-only flow: three batches of fresh inserts (10, 5 and 6 records) are written with the
+  // parameterized write operation; snapshot reads must contain every row written so far.
+  def testImmutableUserFlow(tableType: String, operation: String, isMetadataEnabledOnWrite: Boolean,
+                            isMetadataEnabledOnRead: Boolean, keyGenClass: String, indexType: String): Unit = {
+    // The non-partitioned key generator is paired with an empty partition path field.
+    val usesNonPartitionedKeyGen = classOf[NonpartitionedKeyGenerator].getName.equals(keyGenClass)
+    val partitionField = if (usesNonPartitionedKeyGen) "" else "partition"
+    val options: Map[String, String] = commonOpts ++ Map(
+      HoodieMetadataConfig.ENABLE.key -> String.valueOf(isMetadataEnabledOnWrite),
+      DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> keyGenClass,
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> partitionField,
+      DataSourceWriteOptions.TABLE_TYPE.key() -> tableType,
+      HoodieIndexConfig.INDEX_TYPE.key() -> indexType
+    )
+    // Column order differs between the input and the Hudi DataFrames, so comparisons select an explicit column list.
+    val colsToSelect = "_row_key, begin_lat, begin_lon, city_to_state.LA, current_date, current_ts, distance_in_meters, driver, end_lat, end_lon, fare.amount, fare.currency, partition, partition_path, rider, timestamp, weight, _hoodie_is_deleted"
+    val dataGen = new HoodieTestDataGenerator(0xDEED)
+    val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
+
+    // Parses generated JSON records into a cached DataFrame.
+    def toCachedDf(jsonRecords: List[String]): Dataset[Row] =
+      spark.read.json(spark.sparkContext.parallelize(jsonRecords, parallelism)).cache
+
+    // Writes one batch with the parameterized operation and the given save mode.
+    def writeBatch(batchDf: Dataset[Row], saveMode: SaveMode): Unit =
+      batchDf.write.format("org.apache.hudi")
+        .options(options)
+        .option(DataSourceWriteOptions.OPERATION.key, operation)
+        .mode(saveMode)
+        .save(basePath)
+
+    // Reads the table through the given data source name, honouring the read-side metadata flag.
+    def readSnapshot(sourceName: String): Dataset[Row] =
+      spark.read.format(sourceName)
+        .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabledOnRead)
+        .load(basePath)
+
+    // Batch 1: 10 inserts, overwriting whatever is at basePath.
+    val records0 = recordsToStrings(dataGen.generateInserts("000", 10)).toList
+    val inputDf0 = toCachedDf(records0)
+    writeBatch(inputDf0, SaveMode.Overwrite)
+
+    assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+
+    // Snapshot after batch 1.
+    val snapshotDf1 = readSnapshot("org.apache.hudi")
+    assertEquals(10, snapshotDf1.count())
+
+    // Batch 2: 5 more inserts.
+    val records1 = recordsToStrings(dataGen.generateInserts("001", 5)).toList
+    val inputDf1 = toCachedDf(records1)
+    writeBatch(inputDf1, SaveMode.Append)
+
+    val snapshotDf2 = readSnapshot("hudi").cache
+    assertEquals(15, snapshotDf2.count())
+    compareEntireInputDfWithHudiDf(inputDf1.union(inputDf0), snapshotDf2, colsToSelect)
+    snapshotDf2.unpersist(true)
+
+    // Batch 3: 6 more inserts.
+    val records2 = recordsToStrings(dataGen.generateInserts("002", 6)).toList
+    val inputDf2 = toCachedDf(records2)
+    writeBatch(inputDf2, SaveMode.Append)
+
+    assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").size())
+
+    // Snapshot after batch 3 must equal the union of all three batches.
+    val snapshotDf3 = readSnapshot("org.apache.hudi").cache
+    assertEquals(21, snapshotDf3.count())
+    compareEntireInputDfWithHudiDf(inputDf1.union(inputDf0).union(inputDf2), snapshotDf3, colsToSelect)
+    snapshotDf3.unpersist(true)
+
+    // Release the cached input batches.
+    inputDf0.unpersist(true)
+    inputDf1.unpersist(true)
+    inputDf2.unpersist(true)
+  }
+
+  /**
+   * Verifies a Hudi read taken after an update batch was written.
+   *
+   * Two conditions are asserted on the projected columns:
+   *  - every row of the update batch is present in the Hudi read;
+   *  - every Hudi row that is not in the update batch is present in the rows captured before the update.
+   *
+   * @param inputDf       the update batch that was written
+   * @param hudiDf        the table as read back after the write (Hudi metadata columns are dropped here)
+   * @param beforeRows    rows collected from the table before the update; rebuilt with `hudiDf`'s schema
+   * @param colsToCompare comma-separated column list used in the SQL projection
+   */
+  def compareUpdateDfWithHudiDf(inputDf: Dataset[Row], hudiDf: Dataset[Row], beforeRows: List[Row], colsToCompare: String): Unit = {
+    val hudiWithoutMetaDf = hudiDf.drop(
+      HoodieRecord.RECORD_KEY_METADATA_FIELD,
+      HoodieRecord.PARTITION_PATH_METADATA_FIELD,
+      HoodieRecord.COMMIT_SEQNO_METADATA_FIELD,
+      HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+      HoodieRecord.FILENAME_METADATA_FIELD)
+    // createOrReplaceTempView is the non-deprecated replacement for registerTempTable.
+    hudiWithoutMetaDf.createOrReplaceTempView("hudiTbl")
+    inputDf.createOrReplaceTempView("inputTbl")
+    val beforeDf = spark.createDataFrame(beforeRows, hudiDf.schema)
+    beforeDf.createOrReplaceTempView("beforeTbl")
+    val hudiDfToCompare = spark.sqlContext.sql(s"select $colsToCompare from hudiTbl")
+    val inputDfToCompare = spark.sqlContext.sql(s"select $colsToCompare from inputTbl")
+    val beforeDfToCompare = spark.sqlContext.sql(s"select $colsToCompare from beforeTbl")
+
+    assertEquals(inputDfToCompare.count, hudiDfToCompare.intersect(inputDfToCompare).count)
+    // Exact comparison of row counts: no third argument, so this is not treated as a floating-point delta assertion.
+    assertEquals(0, hudiDfToCompare.except(inputDfToCompare).except(beforeDfToCompare).count)
+  }
+
+  /**
+   * Rebuilds a DataFrame from previously collected rows using `hudiDf`'s schema and delegates to
+   * [[compareEntireInputDfWithHudiDf]] for the full comparison.
+   */
+  def compareEntireInputRowsWithHudiDf(inputRows: List[Row], hudiDf: Dataset[Row], colsToCompare: String): Unit =
+    compareEntireInputDfWithHudiDf(spark.createDataFrame(inputRows, hudiDf.schema), hudiDf, colsToCompare)
+
+  /**
+   * Asserts that the Hudi read and the input contain the same rows on the projected columns:
+   * every input row must be present in the Hudi read, and the Hudi read must have no rows that
+   * are missing from the input.
+   *
+   * @param inputDf       the expected data
+   * @param hudiDf        the table as read back (Hudi metadata columns are dropped here)
+   * @param colsToCompare comma-separated column list used in the SQL projection
+   */
+  def compareEntireInputDfWithHudiDf(inputDf: Dataset[Row], hudiDf: Dataset[Row], colsToCompare: String): Unit = {
+    val hudiWithoutMetaDf = hudiDf.drop(
+      HoodieRecord.RECORD_KEY_METADATA_FIELD,
+      HoodieRecord.PARTITION_PATH_METADATA_FIELD,
+      HoodieRecord.COMMIT_SEQNO_METADATA_FIELD,
+      HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+      HoodieRecord.FILENAME_METADATA_FIELD)
+    // createOrReplaceTempView is the non-deprecated replacement for registerTempTable.
+    hudiWithoutMetaDf.createOrReplaceTempView("hudiTbl")
+    inputDf.createOrReplaceTempView("inputTbl")
+    val hudiDfToCompare = spark.sqlContext.sql(s"select $colsToCompare from hudiTbl")
+    val inputDfToCompare = spark.sqlContext.sql(s"select $colsToCompare from inputTbl")
+
+    assertEquals(inputDfToCompare.count, hudiDfToCompare.intersect(inputDfToCompare).count)
+    assertEquals(0, hudiDfToCompare.except(inputDfToCompare).count)
+  }
+
+  /**
+   * Runs a read-optimized query against `basePath` and checks, via
+   * [[compareEntireInputDfWithHudiDf]], that its result matches `inputDf` on `colsToSelect`.
+   */
+  def doMORReadOptimizedQuery(inputDf: Dataset[Row], colsToSelect: String, isMetadataEnabledOnRead: Boolean): Unit = {
+    val readOptimizedReader = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+      .option(HoodieMetadataConfig.ENABLE.key(), isMetadataEnabledOnRead)
+    compareEntireInputDfWithHudiDf(inputDf, readOptimizedReader.load(basePath), colsToSelect)
+  }
+
+  /**
+   * Asserts that the read-optimized query and the default (no query type set) read of the table at
+   * `basePath` return the same rows on the projected columns: every read-optimized row must appear
+   * in the default read, and no read-optimized row may be missing from it.
+   *
+   * @param basePath                table location to read from
+   * @param colsToCompare           comma-separated column list used in the SQL projection
+   * @param isMetadataEnabledOnRead value passed for the metadata-table option on both reads
+   */
+  def compareROAndRT(basePath: String, colsToCompare: String, isMetadataEnabledOnRead: Boolean): Unit = {
+    val roDf = spark.read.format("org.apache.hudi")
+      .option(HoodieMetadataConfig.ENABLE.key(), isMetadataEnabledOnRead)
+      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+      .load(basePath)
+    val rtDf = spark.read.format("org.apache.hudi")
+      .option(HoodieMetadataConfig.ENABLE.key(), isMetadataEnabledOnRead)
+      .load(basePath)
+
+    // Hudi metadata columns are removed from both sides before comparing.
+    val metaColumns = Seq(
+      HoodieRecord.RECORD_KEY_METADATA_FIELD,
+      HoodieRecord.PARTITION_PATH_METADATA_FIELD,
+      HoodieRecord.COMMIT_SEQNO_METADATA_FIELD,
+      HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+      HoodieRecord.FILENAME_METADATA_FIELD)
+    // createOrReplaceTempView is the non-deprecated replacement for registerTempTable.
+    roDf.drop(metaColumns: _*).createOrReplaceTempView("hudiTbl1")
+    rtDf.drop(metaColumns: _*).createOrReplaceTempView("hudiTbl2")
+
+    val hudiDf1ToCompare = spark.sqlContext.sql(s"select $colsToCompare from hudiTbl1")
+    val hudiDf2ToCompare = spark.sqlContext.sql(s"select $colsToCompare from hudiTbl2")
+
+    assertEquals(hudiDf1ToCompare.count, hudiDf1ToCompare.intersect(hudiDf2ToCompare).count)
+    assertEquals(0, hudiDf1ToCompare.except(hudiDf2ToCompare).count)
+  }
+}