leesf commented on a change in pull request #2379:
URL: https://github.com/apache/hudi/pull/2379#discussion_r548948188
##########
File path:
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
##########
@@ -177,4 +188,112 @@ class TestStructuredStreaming extends
HoodieClientTestBase {
if (!success) throw new IllegalStateException("Timed-out waiting for " +
numCommits + " commits to appear in " + tablePath)
numInstants
}
+
+ def getInlineClusteringOpts( isInlineClustering: String,
clusteringNumCommit: String, fileMaxRecordNum: Int):Map[String, String] = {
+ commonOpts + (HoodieClusteringConfig.INLINE_CLUSTERING_PROP ->
isInlineClustering,
+ HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP ->
clusteringNumCommit,
+ HoodieStorageConfig.PARQUET_FILE_MAX_BYTES ->
dataGen.getEstimatedFileSizeInBytes(fileMaxRecordNum).toString
+ )
+ }
+
+ @Test
+ def testStructuredStreamingWithInlineClustering(): Unit = {
+ val (sourcePath, destPath) = initStreamingSourceAndDestPath("source",
"dest")
+
+ def checkClusteringResult(destPath: String):Unit = {
+ // check have schedule clustering and clustering file group to one
+ waitTillHasCompletedReplaceInstant(destPath, 120, 5)
+ metaClient.reloadActiveTimeline()
+ assertEquals(1, getLatestFileGroupsFileId.size)
+ }
+ structuredStreamingForTestClusteringRunner(sourcePath, destPath, true,
checkClusteringResult)
+ }
+
+ @Test
+ def testStructuredStreamingWithoutInlineClustering(): Unit = {
+ val (sourcePath, destPath) = initStreamingSourceAndDestPath("source",
"dest")
+
+ def checkClusteringResult(destPath: String):Unit = {
+ val msg = "Should have replace commit completed"
+ assertThrows(classOf[IllegalStateException], new Executable {
+ override def execute(): Unit = {
+ waitTillHasCompletedReplaceInstant(destPath, 120, 5)
+ }
+ }
+ , "Should have replace commit completed")
+ println(msg)
+ }
+ structuredStreamingForTestClusteringRunner(sourcePath, destPath, false,
checkClusteringResult)
+ }
+
+ def structuredStreamingForTestClusteringRunner(sourcePath: String, destPath:
String,
+ isInlineClustering: Boolean,
checkClusteringResult: String => Unit): Unit = {
+ // First insert of data
+ val records1 = recordsToStrings(dataGen.generateInsertsForPartition("000",
100, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).toList
Review comment:
Could we replace the three occurrences of
`HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH` below with a single
parameter? It is a little odd to rely on
`HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH` inside
`getLatestFileGroupsFileId`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]