This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7178fa6d497 [MINOR] Adding tests for streaming read mor with compaction (#6695)
7178fa6d497 is described below

commit 7178fa6d4975b163632be2f42ea33b0a9136feff
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sun Jun 23 18:02:43 2024 -0700

    [MINOR] Adding tests for streaming read mor with compaction (#6695)
    
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../hudi/functional/TestStreamingSource.scala      | 46 +++++++++++++++++++++-
 1 file changed, 45 insertions(+), 1 deletion(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
index 350fea3cf6c..b9ac64f2e10 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStreamingSource.scala
@@ -21,14 +21,18 @@ import org.apache.hudi.DataSourceReadOptions.START_OFFSET
 import org.apache.hudi.DataSourceWriteOptions.{PRECOMBINE_FIELD, RECORDKEY_FIELD}
 import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
 import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.HoodieTimeline
 import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling
 import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.{BLOCK, USE_TRANSITION_TIME}
+import org.apache.hudi.config.HoodieCompactionConfig
 import org.apache.hudi.config.HoodieWriteConfig.{DELETE_PARALLELISM_VALUE, INSERT_PARALLELISM_VALUE, TBL_NAME, UPSERT_PARALLELISM_VALUE}
 import org.apache.hudi.hadoop.fs.HadoopFSUtils
+import org.apache.hudi.util.JavaConversions
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
 
 import org.apache.spark.sql.streaming.StreamTest
 import org.apache.spark.sql.{Row, SaveMode}
+import org.junit.jupiter.api.Assertions.assertTrue
 
 class TestStreamingSource extends StreamTest {
 
@@ -223,12 +227,52 @@ class TestStreamingSource extends StreamTest {
     }
   }
 
-  private def addData(inputPath: String, rows: Seq[(String, String, String, String)]): Unit = {
+  test("test mor stream source with compaction") {
+    withTempDir { inputDir =>
+      val tablePath = s"${inputDir.getCanonicalPath}/test_mor_stream"
+      val metaClient = HoodieTableMetaClient.withPropertyBuilder()
+        .setTableType(MERGE_ON_READ)
+        .setTableName(getTableName(tablePath))
+        .setPayloadClassName(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue)
+        .setRecordKeyFields("id")
+        .setPreCombineField("ts")
+        .initTable(HadoopFSUtils.getStorageConf(spark.sessionState.newHadoopConf()), tablePath)
+
+      addData(tablePath, Seq(("1", "a1", "10", "000")))
+      val df = spark.readStream
+        .format("org.apache.hudi")
+        .load(tablePath)
+        .select("id", "name", "price", "ts")
+
+      addData(tablePath,
+        Seq(("1", "a2", "12", "000"),
+          ("2", "a3", "12", "000")))
+      addData(tablePath, Seq(("2", "a5", "12", "000"), ("1", "a6", "12", "001")))
+      // trigger compaction
+      addData(tablePath, Seq(("3", "a6", "12", "000")), true)
+
+      testStream(df)(
+        AssertOnQuery {q => q.processAllAvailable(); true },
+        CheckAnswerRows(Seq(Row("1", "a6", "12", "001"),
+          Row("2", "a5", "12", "000"),
+          Row("3", "a6", "12", "000")), lastOnly = true, isSorted = false),
+        StopStream
+      )
+      assertTrue(metaClient.reloadActiveTimeline
+        .filter(JavaConversions.getPredicate(
+          e => e.isCompleted && HoodieTimeline.COMMIT_ACTION.equals(e.getAction)))
+        .countInstants() > 0)
+    }
+  }
+
+  private def addData(inputPath: String, rows: Seq[(String, String, String, String)], enableInlineCompaction: Boolean = false) : Unit = {
     rows.toDF(columns: _*)
       .write
       .format("org.apache.hudi")
       .options(commonOptions)
       .option(TBL_NAME.key, getTableName(inputPath))
+      .option(HoodieCompactionConfig.INLINE_COMPACT.key(), enableInlineCompaction.toString)
+      .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "2")
       .mode(SaveMode.Append)
       .save(inputPath)
   }

Reply via email to