This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7178fa6d497 [MINOR] Adding tests for streaming read mor with compaction (#6695)
7178fa6d497 is described below
commit 7178fa6d4975b163632be2f42ea33b0a9136feff
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sun Jun 23 18:02:43 2024 -0700
[MINOR] Adding tests for streaming read mor with compaction (#6695)
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../hudi/functional/TestStreamingSource.scala | 46 +++++++++++++++++++++-
1 file changed, 45 insertions(+), 1 deletion(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
index 350fea3cf6c..b9ac64f2e10 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
@@ -21,14 +21,18 @@ import org.apache.hudi.DataSourceReadOptions.START_OFFSET
import org.apache.hudi.DataSourceWriteOptions.{PRECOMBINE_FIELD,
RECORDKEY_FIELD}
import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE,
MERGE_ON_READ}
import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling
import
org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.{BLOCK,
USE_TRANSITION_TIME}
+import org.apache.hudi.config.HoodieCompactionConfig
import org.apache.hudi.config.HoodieWriteConfig.{DELETE_PARALLELISM_VALUE,
INSERT_PARALLELISM_VALUE, TBL_NAME, UPSERT_PARALLELISM_VALUE}
import org.apache.hudi.hadoop.fs.HadoopFSUtils
+import org.apache.hudi.util.JavaConversions
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
import org.apache.spark.sql.streaming.StreamTest
import org.apache.spark.sql.{Row, SaveMode}
+import org.junit.jupiter.api.Assertions.assertTrue
class TestStreamingSource extends StreamTest {
@@ -223,12 +227,52 @@ class TestStreamingSource extends StreamTest {
}
}
- private def addData(inputPath: String, rows: Seq[(String, String, String,
String)]): Unit = {
+  // MOR streaming read across several delta commits, where the final write runs
+  // with inline compaction enabled. Expects the stream to surface the latest row
+  // per record key, and the timeline to contain a completed commit instant.
+  test("test mor stream source with compaction") {
+    withTempDir { inputDir =>
+      val tablePath = s"${inputDir.getCanonicalPath}/test_mor_stream"
+      // Create an empty MOR table keyed by `id`, with `ts` as the precombine field.
+      val metaClient = HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(MERGE_ON_READ)
+        .setTableName(getTableName(tablePath))
+        .setPayloadClassName(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue)
+        .setRecordKeyFields("id")
+        .setPreCombineField("ts")
+        .initTable(HadoopFSUtils.getStorageConf(spark.sessionState.newHadoopConf()), tablePath)
+
+      // The first write happens before the streaming DataFrame is defined.
+      addData(tablePath, Seq(("1", "a1", "10", "000")))
+      val df = spark.readStream
+        .format("org.apache.hudi")
+        .load(tablePath)
+        .select("id", "name", "price", "ts")
+
+      // Further upserts: key 1 is updated twice (the last one with ts "001"), and
+      // key 2 is inserted and then updated.
+      addData(tablePath,
+        Seq(("1", "a2", "12", "000"),
+          ("2", "a3", "12", "000")))
+      addData(tablePath, Seq(("2", "a5", "12", "000"), ("1", "a6", "12", "001")))
+      // trigger compaction
+      // (this 4th write passes enableInlineCompaction = true; addData sets
+      // INLINE_COMPACT_NUM_DELTA_COMMITS to "2", so enough delta commits should
+      // have accumulated for compaction to run inline)
+      addData(tablePath, Seq(("3", "a6", "12", "000")), true)
+
+      testStream(df)(
+        // Drain all currently available data before checking the answer.
+        AssertOnQuery {q => q.processAllAvailable(); true },
+        // Compare only the latest result, ignoring row order.
+        CheckAnswerRows(Seq(Row("1", "a6", "12", "001"),
+          Row("2", "a5", "12", "000"),
+          Row("3", "a6", "12", "000")), lastOnly = true, isSorted = false),
+        StopStream
+      )
+      // Confirm compaction ran by looking for a completed instant whose action is
+      // COMMIT_ACTION. NOTE(review): this relies on MOR ingestion writes being
+      // delta commits, so a plain commit should only come from compaction. Confirm.
+      assertTrue(metaClient.reloadActiveTimeline
+        .filter(JavaConversions.getPredicate(
+          e => e.isCompleted && HoodieTimeline.COMMIT_ACTION.equals(e.getAction)))
+        .countInstants() > 0)
+    }
+  }
+
+  /**
+   * Appends `rows` to the Hudi table at `inputPath`, using `commonOptions` plus
+   * the table name derived from the path.
+   *
+   * @param inputPath              table base path; also passed to `getTableName`
+   * @param rows                   records converted to a DataFrame whose column names come from `columns`
+   * @param enableInlineCompaction value written to `HoodieCompactionConfig.INLINE_COMPACT`
+   *                               for this write (defaults to false)
+   */
+  private def addData(inputPath: String, rows: Seq[(String, String, String, String)], enableInlineCompaction: Boolean = false) : Unit = {
     rows.toDF(columns: _*)
       .write
       .format("org.apache.hudi")
       .options(commonOptions)
       .option(TBL_NAME.key, getTableName(inputPath))
+      .option(HoodieCompactionConfig.INLINE_COMPACT.key(), enableInlineCompaction.toString)
+      // The delta-commit threshold is set on every write. Presumably it only
+      // matters when INLINE_COMPACT is "true"; verify against Hudi config semantics.
+      .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "2")
       .mode(SaveMode.Append)
       .save(inputPath)
   }