This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 859a7ee4fb61 [HUDI-9540] Fix data loss for spark incremental source 
(#13786)
859a7ee4fb61 is described below

commit 859a7ee4fb611e1786ba7fcb3b0b3acea8fe0ca7
Author: Shuo Cheng <[email protected]>
AuthorDate: Thu Sep 4 18:14:40 2025 +0800

    [HUDI-9540] Fix data loss for spark incremental source (#13786)
---
 .../hudi/MergeOnReadIncrementalRelationV1.scala    |  2 +
 .../hudi/MergeOnReadIncrementalRelationV2.scala    |  2 +
 .../apache/hudi/functional/TestMORDataSource.scala | 43 +++++++++++++++++++++-
 3 files changed, 46 insertions(+), 1 deletion(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV1.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV1.scala
index 41bb465d8d52..45e91863b49c 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV1.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV1.scala
@@ -160,6 +160,8 @@ case class MergeOnReadIncrementalRelationV1(override val 
sqlContext: SQLContext,
     }
   }
 
+  override def shouldIncludeLogFiles(): Boolean = fullTableScan
+
   private def filterFileSlices(fileSlices: Seq[FileSlice], pathGlobPattern: 
String): Seq[FileSlice] = {
     val filteredFileSlices = if (!StringUtils.isNullOrEmpty(pathGlobPattern)) {
       val globMatcher = new GlobPattern("*" + pathGlobPattern)
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala
index 3cd941e6fbf2..c2795325d688 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala
@@ -156,6 +156,8 @@ case class MergeOnReadIncrementalRelationV2(override val 
sqlContext: SQLContext,
     }
   }
 
+  override def shouldIncludeLogFiles(): Boolean = fullTableScan
+
   private def filterFileSlices(fileSlices: Seq[FileSlice], pathGlobPattern: 
String): Seq[FileSlice] = {
     val filteredFileSlices = if (!StringUtils.isNullOrEmpty(pathGlobPattern)) {
       val globMatcher = new GlobPattern("*" + pathGlobPattern)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 7ca8e9b53647..7a9263ce58de 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -30,7 +30,7 @@ import 
org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestUtil
 import 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.recordsToStrings
 import org.apache.hudi.common.util.Option
 import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
-import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, 
HoodieWriteConfig}
+import org.apache.hudi.config.{HoodieCleanConfig, HoodieCompactionConfig, 
HoodieIndexConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.HoodieUpgradeDowngradeException
 import org.apache.hudi.index.HoodieIndex.IndexType
 import 
org.apache.hudi.metadata.HoodieTableMetadataUtil.{metadataPartitionExists, 
PARTITION_NAME_SECONDARY_INDEX_PREFIX}
@@ -811,6 +811,47 @@ class TestMORDataSource extends HoodieSparkClientTestBase 
with SparkDatasetMixin
     df.select("_hoodie_commit_seqno", "fare.amount", "fare.currency", 
"tip_history").show(1)
   }
 
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableVersion], names = Array("SIX", 
"EIGHT"))
+  def testIncrementalQueryMORWithCompactionAndClean(tableVersion: 
HoodieTableVersion): Unit = {
+    val (writeOpts, readOpts) = getWriterReaderOpts(HoodieRecordType.AVRO)
+    initMetaClient(HoodieTableType.MERGE_ON_READ)
+    // delta commit1
+    // delta commit2
+    // delta commit3
+    // commit <--compaction
+    // delta commit4
+    // delta commit5
+    // clean
+    for (i <- 1 to 5) {
+      val records = recordsToStrings(dataGen.generateInserts("%05d".format(i), 
10)).asScala.toList
+      val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+
+      inputDF.write.format("hudi")
+        .options(writeOpts)
+        .option(HoodieCompactionConfig.INLINE_COMPACT.key(), "true")
+        .option(HoodieWriteConfig.WRITE_TABLE_VERSION.key(), 
tableVersion.versionCode().toString)
+        .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), 
"3")
+        .option(HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), "2")
+        .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+        .option(DataSourceWriteOptions.TABLE_TYPE.key, 
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+        // Use InMemoryIndex to generate a log-only MOR table.
+        .option(HoodieIndexConfig.INDEX_TYPE.key, IndexType.INMEMORY.toString)
+        .mode(if (i == 1) SaveMode.Overwrite else SaveMode.Append)
+        .save(basePath)
+    }
+
+    // Specify begin time as 000 for the incremental query, and the query will 
fall back to a full table scan.
+    val rowCount1 = spark.read.format("hudi")
+      .options(readOpts)
+      .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      
.option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN.key(), 
"true")
+      .option(DataSourceReadOptions.START_COMMIT.key(), "000")
+      .load(basePath)
+      .count()
+    assertEquals(50, rowCount1)
+  }
+
   @ParameterizedTest
   @CsvSource(Array(
     "true,false,AVRO", "true,true,AVRO", "false,true,AVRO", "false,false,AVRO",

Reply via email to