yihua commented on code in PR #6317:
URL: https://github.com/apache/hudi/pull/6317#discussion_r939242081
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala:
##########
@@ -193,61 +235,30 @@ class TestStructuredStreaming extends
HoodieClientTestBase {
numInstants
}
- def getClusteringOpts(isInlineClustering: String, isAsyncClustering: String,
isAsyncCompaction: String,
- clusteringNumCommit: String, fileMaxRecordNum:
Int):Map[String, String] = {
- commonOpts + (HoodieClusteringConfig.INLINE_CLUSTERING.key ->
isInlineClustering,
- HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key ->
clusteringNumCommit,
- DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE.key -> isAsyncClustering,
- DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key -> isAsyncCompaction,
- HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key ->
clusteringNumCommit,
- HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key ->
dataGen.getEstimatedFileSizeInBytes(fileMaxRecordNum).toString
- )
- }
-
- @Test
- def testStructuredStreamingWithInlineClustering(): Unit = {
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testStructuredStreamingWithClustering(isAsyncClustering: Boolean): Unit
= {
val (sourcePath, destPath) = initStreamingSourceAndDestPath("source",
"dest")
- def checkClusteringResult(destPath: String):Unit = {
+ def checkClusteringResult(destPath: String): Unit = {
// check have schedule clustering and clustering file group to one
waitTillHasCompletedReplaceInstant(destPath, 120, 1)
metaClient.reloadActiveTimeline()
assertEquals(1,
getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
}
- structuredStreamingForTestClusteringRunner(sourcePath, destPath, true,
false, false,
- HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
checkClusteringResult)
- }
-
- @Test
- def testStructuredStreamingWithAsyncClustering(): Unit = {
- val (sourcePath, destPath) = initStreamingSourceAndDestPath("source",
"dest")
- def checkClusteringResult(destPath: String):Unit = {
- // check have schedule clustering and clustering file group to one
- waitTillHasCompletedReplaceInstant(destPath, 120, 1)
- metaClient.reloadActiveTimeline()
- assertEquals(1,
getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
- }
- structuredStreamingForTestClusteringRunner(sourcePath, destPath, false,
true, false,
- HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
checkClusteringResult)
+ structuredStreamingForTestClusteringRunner(sourcePath, destPath,
HoodieTableType.COPY_ON_WRITE,
+ !isAsyncClustering, isAsyncClustering,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult)
}
- @Test
- def testStructuredStreamingWithAsyncClusteringAndCompaction(): Unit = {
- val (sourcePath, destPath) = initStreamingSourceAndDestPath("source",
"dest")
-
- def checkClusteringResult(destPath: String):Unit = {
- // check have schedule clustering and clustering file group to one
- waitTillHasCompletedReplaceInstant(destPath, 120, 1)
- metaClient.reloadActiveTimeline()
- assertEquals(1,
getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
- }
- structuredStreamingForTestClusteringRunner(sourcePath, destPath, false,
true, true,
- HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
checkClusteringResult)
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testStructuredStreamingWithCompaction(isAsyncCompaction: Boolean): Unit
= {
Review Comment:
After this change, compaction is tested on MOR tables, and clustering is
tested on COW tables.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala:
##########
@@ -193,61 +235,30 @@ class TestStructuredStreaming extends
HoodieClientTestBase {
numInstants
}
- def getClusteringOpts(isInlineClustering: String, isAsyncClustering: String,
isAsyncCompaction: String,
- clusteringNumCommit: String, fileMaxRecordNum:
Int):Map[String, String] = {
- commonOpts + (HoodieClusteringConfig.INLINE_CLUSTERING.key ->
isInlineClustering,
- HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key ->
clusteringNumCommit,
- DataSourceWriteOptions.ASYNC_CLUSTERING_ENABLE.key -> isAsyncClustering,
- DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key -> isAsyncCompaction,
- HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMITS.key ->
clusteringNumCommit,
- HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key ->
dataGen.getEstimatedFileSizeInBytes(fileMaxRecordNum).toString
- )
- }
-
- @Test
- def testStructuredStreamingWithInlineClustering(): Unit = {
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testStructuredStreamingWithClustering(isAsyncClustering: Boolean): Unit
= {
val (sourcePath, destPath) = initStreamingSourceAndDestPath("source",
"dest")
- def checkClusteringResult(destPath: String):Unit = {
+ def checkClusteringResult(destPath: String): Unit = {
// check have schedule clustering and clustering file group to one
waitTillHasCompletedReplaceInstant(destPath, 120, 1)
metaClient.reloadActiveTimeline()
assertEquals(1,
getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
}
- structuredStreamingForTestClusteringRunner(sourcePath, destPath, true,
false, false,
- HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
checkClusteringResult)
- }
-
- @Test
- def testStructuredStreamingWithAsyncClustering(): Unit = {
- val (sourcePath, destPath) = initStreamingSourceAndDestPath("source",
"dest")
- def checkClusteringResult(destPath: String):Unit = {
- // check have schedule clustering and clustering file group to one
- waitTillHasCompletedReplaceInstant(destPath, 120, 1)
- metaClient.reloadActiveTimeline()
- assertEquals(1,
getLatestFileGroupsFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).size)
- }
- structuredStreamingForTestClusteringRunner(sourcePath, destPath, false,
true, false,
- HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
checkClusteringResult)
+ structuredStreamingForTestClusteringRunner(sourcePath, destPath,
HoodieTableType.COPY_ON_WRITE,
+ !isAsyncClustering, isAsyncClustering,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, checkClusteringResult)
}
- @Test
- def testStructuredStreamingWithAsyncClusteringAndCompaction(): Unit = {
Review Comment:
This does not trigger compaction, because a COW table is written.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]