jonvex commented on code in PR #7632:
URL: https://github.com/apache/hudi/pull/7632#discussion_r1066475735


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -117,7 +119,8 @@ class HoodieStreamingSink(sqlContext: SQLContext,
     retry(retryCnt, retryIntervalMs)(
       Try(
         HoodieSparkSqlWriter.write(
-          sqlContext, mode, updatedOptions, data, hoodieTableConfig, 
writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering),
+          sqlContext, mode, updatedOptions, data, hoodieTableConfig, 
writeClient,
+          if (disableCompaction) None else Some(triggerAsyncCompactor), 
Some(triggerAsyncClustering),

Review Comment:
   I attempted a test with the following code, but it doesn't work — the destPath 
was never created or populated with what I have below. If you know how to trigger 
the streaming correctly, that would be helpful. I did include the commands I used 
for testing this PR in the Jira ticket, so even if I can't get this test to work, 
at least we have some documentation of the testing performed.
   ```
     @Test
     def testStructuredStreamingWithDisabledCompaction(): Unit = {
       val (sourcePath, destPath) = initStreamingSourceAndDestPath("source", 
"dest")
       // First chunk of data
       val records1 = recordsToStrings(dataGen.generateInserts("000", 
100)).toList
       val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 
2))
       val futures = new Array[Future[Unit]](25)
       val opts = commonOpts + (DataSourceWriteOptions.TABLE_TYPE.key -> 
HoodieTableType.MERGE_ON_READ.name()) + 
(DataSourceWriteOptions.STREAMING_DISABLE_COMPACTION.key -> "true" )
       futures(0) = initStreamingWriteFuture(inputDF1.schema, sourcePath, 
destPath, opts)
       for (i <- 1 to 24) {
         val records = 
recordsToStrings(dataGen.generateUpdates(String.format("%03d", new Integer(i)), 
100)).toList
         val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 
2))
         futures(i) = initStreamingWriteFuture(inputDF.schema, sourcePath, 
destPath, opts)
       }
       waitTillAtleastNCommits(fs, destPath, 25, 1000, 5)
       val metaClient = 
HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(destPath)
         .setLoadActiveTimelineOnLoad(true).build()
       assertTrue(metaClient.getActiveTimeline.getCommitTimeline.empty())
     }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to