jonvex commented on code in PR #7632:
URL: https://github.com/apache/hudi/pull/7632#discussion_r1066475735
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala:
##########
@@ -117,7 +119,8 @@ class HoodieStreamingSink(sqlContext: SQLContext,
retry(retryCnt, retryIntervalMs)(
Try(
HoodieSparkSqlWriter.write(
- sqlContext, mode, updatedOptions, data, hoodieTableConfig,
writeClient, Some(triggerAsyncCompactor), Some(triggerAsyncClustering),
+ sqlContext, mode, updatedOptions, data, hoodieTableConfig,
writeClient,
+ if (disableCompaction) None else Some(triggerAsyncCompactor),
Some(triggerAsyncClustering),
Review Comment:
I attempted a test with the following code, but it doesn't work: the destPath
was never even created or populated. If you know how to trigger the streaming
write from a test, that would be helpful. I do have the commands that I used for
testing this PR in the Jira ticket, so if I can't get this to work, at least we
have some documentation of the testing.
```
/**
 * Draft test: configures a MERGE_ON_READ table with STREAMING_DISABLE_COMPACTION set to "true",
 * starts streaming-write futures from sourcePath to destPath, waits for commits on destPath and
 * then asserts that the commit timeline of the resulting table is empty.
 *
 * NOTE(review): nothing in this method writes the generated records to sourcePath; the
 * DataFrames built below are used only for their schema. Presumably that is why destPath is
 * never populated -- confirm against what initStreamingWriteFuture expects.
 */
@Test
def testStructuredStreamingWithDisabledCompaction(): Unit = {
val (sourcePath, destPath) = initStreamingSourceAndDestPath("source",
"dest")
// First chunk of data
val records1 = recordsToStrings(dataGen.generateInserts("000",
100)).toList
val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1,
2))
// One slot for the initial insert batch plus 24 update batches; the array is only
// written to in this method and its futures are never awaited or inspected.
val futures = new Array[Future[Unit]](25)
// Common options plus MERGE_ON_READ table type and the disable-compaction flag under test.
val opts = commonOpts + (DataSourceWriteOptions.TABLE_TYPE.key ->
HoodieTableType.MERGE_ON_READ.name()) +
(DataSourceWriteOptions.STREAMING_DISABLE_COMPACTION.key -> "true" )
// NOTE(review): only inputDF1.schema is handed over; the rows of inputDF1 are not
// written anywhere visible here.
futures(0) = initStreamingWriteFuture(inputDF1.schema, sourcePath,
destPath, opts)
for (i <- 1 to 24) {
// Generate 100 updates tagged with a zero-padded three-digit instant ("001" .. "024").
val records =
recordsToStrings(dataGen.generateUpdates(String.format("%03d", new Integer(i)),
100)).toList
val inputDF = spark.read.json(spark.sparkContext.parallelize(records,
2))
// NOTE(review): again only the schema is used, and initStreamingWriteFuture is invoked once
// per iteration on the same source/dest paths -- presumably each call starts another
// streaming query; confirm this is intended.
futures(i) = initStreamingWriteFuture(inputDF.schema, sourcePath,
destPath, opts)
}
// Presumably blocks until at least 25 commits exist under destPath; the meaning of the
// trailing numeric arguments is not visible here -- TODO confirm against the helper.
waitTillAtleastNCommits(fs, destPath, 25, 1000, 5)
val metaClient =
HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(destPath)
.setLoadActiveTimelineOnLoad(true).build()
// Passes only when the commit timeline is empty; presumably compaction would add entries
// to it, so an empty timeline indicates no compaction ran -- TODO confirm.
assertTrue(metaClient.getActiveTimeline.getCommitTimeline.empty())
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]