This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 59527c3590 [HUDI-4540] Cover different table types in functional tests
of Spark structured streaming (#6317)
59527c3590 is described below
commit 59527c35903ee76e89ecc3c2bdc5806ac40691d1
Author: Y Ethan Guo <[email protected]>
AuthorDate: Fri Aug 5 17:11:35 2022 -0700
[HUDI-4540] Cover different table types in functional tests of Spark
structured streaming (#6317)
---
.../hudi/functional/TestStructuredStreaming.scala | 164 +++++++++++----------
1 file changed, 90 insertions(+), 74 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
index 85c64f8265..8981d1447b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
@@ -18,11 +18,13 @@
package org.apache.hudi.functional
import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hudi.common.model.FileSlice
+import org.apache.hudi.common.model.{FileSlice, HoodieTableType}
import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.common.testutils.{HoodieTestDataGenerator,
HoodieTestTable}
-import org.apache.hudi.config.{HoodieClusteringConfig, HoodieStorageConfig,
HoodieWriteConfig}
+import org.apache.hudi.common.util.CollectionUtils
+import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig,
HoodieStorageConfig, HoodieWriteConfig}
import org.apache.hudi.exception.TableNotFoundException
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions,
HoodieDataSourceHelpers}
@@ -31,7 +33,9 @@ import org.apache.spark.sql._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.types.StructType
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{EnumSource, ValueSource}
import scala.collection.JavaConversions._
import scala.concurrent.ExecutionContext.Implicits.global
@@ -99,8 +103,30 @@ class TestStructuredStreaming extends HoodieClientTestBase {
(sourcePath, destPath)
}
- @Test
- def testStructuredStreaming(): Unit = {
+ def getOptsWithTableType(tableType: HoodieTableType): Map[String, String] = {
+ commonOpts + (DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name())
+ }
+
+ def getClusteringOpts(tableType: HoodieTableType, isInlineClustering: String,
+ isAsyncClustering: String, clusteringNumCommit: String,
+ fileMaxRecordNum: Int): Map[String, String] = {
+ getOptsWithTableType(tableType) + (
+ HoodieClusteringConfig.INLINE_CLUSTERING.key -> isInlineClustering,
+ HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key ->
clusteringNumCommit,
+ DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE.key -> isAsyncClustering,
+ HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key ->
clusteringNumCommit,
+ HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key ->
dataGen.getEstimatedFileSizeInBytes(fileMaxRecordNum).toString
+ )
+ }
+
+ def getCompactionOpts(tableType: HoodieTableType, isAsyncCompaction:
Boolean): Map[String, String] = {
+ getOptsWithTableType(tableType) + (
+ DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key ->
isAsyncCompaction.toString,
+ HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key -> "1"
+ )
+ }
+
+ def structuredStreamingTestRunner(tableType: HoodieTableType,
addCompactionConfigs: Boolean, isAsyncCompaction: Boolean): Unit = {
val (sourcePath, destPath) = initStreamingSourceAndDestPath("source",
"dest")
// First chunk of data
val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
@@ -111,7 +137,12 @@ class TestStructuredStreaming extends HoodieClientTestBase {
val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()
- val f1 = initStreamingWriteFuture(inputDF1.schema, sourcePath, destPath,
commonOpts)
+ val hudiOptions = if (addCompactionConfigs) {
+ getCompactionOpts(tableType, isAsyncCompaction)
+ } else {
+ getOptsWithTableType(tableType)
+ }
+ val f1 = initStreamingWriteFuture(inputDF1.schema, sourcePath, destPath,
hudiOptions)
val f2 = Future {
inputDF1.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
@@ -125,16 +156,23 @@ class TestStructuredStreaming extends HoodieClientTestBase {
assert(hoodieROViewDF1.count() == 100)
inputDF2.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
- // wait for spark streaming to process second microbatch
- waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5)
- val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs,
destPath)
- assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, destPath,
"000").size())
+ // When the compaction configs are added, one more commit of the compaction is expected
+ val numExpectedCommits = if (addCompactionConfigs) currNumCommits + 2
else currNumCommits + 1
+ waitTillAtleastNCommits(fs, destPath, numExpectedCommits, 120, 5)
+
+ val commitInstantTime2 = if (tableType == HoodieTableType.MERGE_ON_READ)
{
+ // For the records that are processed by the compaction in MOR table
+ // the "_hoodie_commit_time" still reflects the latest delta commit
+ latestInstant(fs, destPath, HoodieTimeline.DELTA_COMMIT_ACTION)
+ } else {
+ HoodieDataSourceHelpers.latestCommit(fs, destPath)
+ }
+ assertEquals(numExpectedCommits,
HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
// Read RO View
val hoodieROViewDF2 = spark.read.format("org.apache.hudi")
.load(destPath + "/*/*/*/*")
assertEquals(100, hoodieROViewDF2.count()) // still 100, since we only updated
-
// Read Incremental View
// we have 2 commits, try pulling the first commit (which is not the latest)
val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, destPath,
"000").get(0)
@@ -163,6 +201,12 @@ class TestStructuredStreaming extends HoodieClientTestBase {
Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf)
}
+ @ParameterizedTest
+ @EnumSource(value = classOf[HoodieTableType])
+ def testStructuredStreaming(tableType: HoodieTableType): Unit = {
+ structuredStreamingTestRunner(tableType, false, false)
+ }
+
@throws[InterruptedException]
private def waitTillAtleastNCommits(fs: FileSystem, tablePath: String,
numCommits: Int, timeoutSecs: Int,
sleepSecsAfterEachRun: Int) = {
@@ -178,8 +222,6 @@ class TestStructuredStreaming extends HoodieClientTestBase {
numInstants = timeline.countInstants
success = true
}
- val metaClient =
HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath)
- .setLoadActiveTimelineOnLoad(true).build()
} catch {
case te: TableNotFoundException =>
log.info("Got table not found exception. Retrying")
@@ -193,61 +235,30 @@ class TestStructuredStreaming extends HoodieClientTestBase {
numInstants
}
- def getClusteringOpts(isInlineClustering: String, isAsyncClustering: String,
isAsyncCompaction: String,
- clusteringNumCommit: String, fileMaxRecordNum:
Int):Map[String, String] = {
- commonOpts + (HoodieClusteringConfig.INLINE_CLUSTERING.key ->
isInlineClustering,
- HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key ->
clusteringNumCommit,
- DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE.key -> isAsyncClustering,
- DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key -> isAsyncCompaction,
- HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key ->
clusteringNumCommit,
- HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key ->
dataGen.getEstimatedFileSizeInBytes(fileMaxRecordNum).toString
- )
- }
-
- @Test
- def testStructuredStreamingWithInlineClustering(): Unit = {
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testStructuredStreamingWithClustering(isAsyncClustering: Boolean): Unit
= {
val (sourcePath, destPath) = initStreamingSourceAndDestPath("source",
"dest")
- def checkClusteringResult(destPath: String):Unit = {
+ def checkClusteringResult(destPath: String): Unit = {
// check have schedule clustering and clustering file group to one
waitTillHasCompletedReplaceInstant(destPath, 120, 1)
metaClient.reloadActiveTimeline()
assertEquals(1,
getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
}
- structuredStreamingForTestClusteringRunner(sourcePath, destPath, true,
false, false,
- HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
checkClusteringResult)
- }
-
- @Test
- def testStructuredStreamingWithAsyncClustering(): Unit = {
- val (sourcePath, destPath) = initStreamingSourceAndDestPath("source",
"dest")
- def checkClusteringResult(destPath: String):Unit = {
- // check have schedule clustering and clustering file group to one
- waitTillHasCompletedReplaceInstant(destPath, 120, 1)
- metaClient.reloadActiveTimeline()
- assertEquals(1,
getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
- }
- structuredStreamingForTestClusteringRunner(sourcePath, destPath, false,
true, false,
- HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
checkClusteringResult)
+ structuredStreamingForTestClusteringRunner(sourcePath, destPath,
HoodieTableType.COPY_ON_WRITE,
+ !isAsyncClustering, isAsyncClustering,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult)
}
- @Test
- def testStructuredStreamingWithAsyncClusteringAndCompaction(): Unit = {
- val (sourcePath, destPath) = initStreamingSourceAndDestPath("source",
"dest")
-
- def checkClusteringResult(destPath: String):Unit = {
- // check have schedule clustering and clustering file group to one
- waitTillHasCompletedReplaceInstant(destPath, 120, 1)
- metaClient.reloadActiveTimeline()
- assertEquals(1,
getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
- }
- structuredStreamingForTestClusteringRunner(sourcePath, destPath, false,
true, true,
- HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
checkClusteringResult)
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testStructuredStreamingWithCompaction(isAsyncCompaction: Boolean): Unit
= {
+ structuredStreamingTestRunner(HoodieTableType.MERGE_ON_READ, true,
isAsyncCompaction)
}
- def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath:
String, isInlineClustering: Boolean,
- isAsyncClustering: Boolean,
isAsyncCompaction: Boolean,
+ def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath:
String, tableType: HoodieTableType,
+ isInlineClustering: Boolean,
isAsyncClustering: Boolean,
partitionOfRecords: String,
checkClusteringResult: String => Unit): Unit = {
// First insert of data
val records1 = recordsToStrings(dataGen.generateInsertsForPartition("000",
100, partitionOfRecords)).toList
@@ -257,8 +268,8 @@ class TestStructuredStreaming extends HoodieClientTestBase {
val records2 = recordsToStrings(dataGen.generateInsertsForPartition("001",
100, partitionOfRecords)).toList
val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
- val hudiOptions = getClusteringOpts(isInlineClustering.toString,
isAsyncClustering.toString,
- isAsyncCompaction.toString, "2", 100)
+ val hudiOptions = getClusteringOpts(
+ tableType, isInlineClustering.toString, isAsyncClustering.toString, "2",
100)
val f1 = initStreamingWriteFuture(inputDF1.schema, sourcePath, destPath,
hudiOptions)
val f2 = Future {
@@ -270,28 +281,24 @@ class TestStructuredStreaming extends HoodieClientTestBase {
inputDF2.coalesce(1).write.mode(SaveMode.Append).json(sourcePath)
// wait for spark streaming to process second microbatch
currNumCommits = waitTillAtleastNCommits(fs, destPath, currNumCommits +
1, 120, 5)
- // for inline clustering, clustering may be complete along with 2nd commit
- if (HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs,
destPath).getCompletedReplaceTimeline.countInstants() > 0) {
- assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, destPath,
"000").size())
- // check have at least one file group
- this.metaClient =
HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(destPath)
- .setLoadActiveTimelineOnLoad(true).build()
- assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 0)
- } else {
- assertEquals(currNumCommits,
HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
- // check have more than one file group
- this.metaClient =
HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(destPath)
- .setLoadActiveTimelineOnLoad(true).build()
- assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 1)
- }
- // check clustering result
+ // Wait for the clustering to finish
+ this.metaClient =
HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(destPath)
+ .setLoadActiveTimelineOnLoad(true).build()
checkClusteringResult(destPath)
- // check data correct after clustering
+ assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, destPath,
"000").size())
+ // Check have at least one file group
+ assertTrue(getLatestFileGroupsFileId(partitionOfRecords).size > 0)
+
+ // Validate data after clustering
val hoodieROViewDF2 = spark.read.format("org.apache.hudi")
.load(destPath + "/*/*/*/*")
assertEquals(200, hoodieROViewDF2.count())
+ val countsPerCommit =
hoodieROViewDF2.groupBy("_hoodie_commit_time").count().collect()
+ assertEquals(2, countsPerCommit.length)
+ val commitInstantTime2 = latestInstant(fs, destPath,
HoodieTimeline.COMMIT_ACTION)
+ assertEquals(commitInstantTime2, countsPerCommit.maxBy(row =>
row.getAs[String](0)).get(0))
}
Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf)
}
@@ -327,4 +334,13 @@ class TestStructuredStreaming extends HoodieClientTestBase {
if (!success) throw new IllegalStateException("Timed-out waiting for
completing replace instant appear in " + tablePath)
}
+ private def latestInstant(fs: FileSystem, basePath: String, instantAction:
String): String = {
+ val metaClient = HoodieTableMetaClient.builder
+
.setConf(fs.getConf).setBasePath(basePath).setLoadActiveTimelineOnLoad(true).build
+ metaClient.getActiveTimeline
+ .getTimelineOfActions(CollectionUtils.createSet(instantAction))
+ .filterCompletedInstants
+ .lastInstant
+ .get.getTimestamp
+ }
}