This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 859a7ee4fb61 [HUDI-9540] Fix data loss for spark incremental source
(#13786)
859a7ee4fb61 is described below
commit 859a7ee4fb611e1786ba7fcb3b0b3acea8fe0ca7
Author: Shuo Cheng <[email protected]>
AuthorDate: Thu Sep 4 18:14:40 2025 +0800
[HUDI-9540] Fix data loss for spark incremental source (#13786)
---
.../hudi/MergeOnReadIncrementalRelationV1.scala | 2 +
.../hudi/MergeOnReadIncrementalRelationV2.scala | 2 +
.../apache/hudi/functional/TestMORDataSource.scala | 43 +++++++++++++++++++++-
3 files changed, 46 insertions(+), 1 deletion(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV1.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV1.scala
index 41bb465d8d52..45e91863b49c 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV1.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV1.scala
@@ -160,6 +160,8 @@ case class MergeOnReadIncrementalRelationV1(override val
sqlContext: SQLContext,
}
}
+ override def shouldIncludeLogFiles(): Boolean = fullTableScan // log files are included only when reading in full-table-scan mode
+
private def filterFileSlices(fileSlices: Seq[FileSlice], pathGlobPattern:
String): Seq[FileSlice] = {
val filteredFileSlices = if (!StringUtils.isNullOrEmpty(pathGlobPattern)) {
val globMatcher = new GlobPattern("*" + pathGlobPattern)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala
index 3cd941e6fbf2..c2795325d688 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala
@@ -156,6 +156,8 @@ case class MergeOnReadIncrementalRelationV2(override val
sqlContext: SQLContext,
}
}
+ override def shouldIncludeLogFiles(): Boolean = fullTableScan // log files are included only when reading in full-table-scan mode
+
private def filterFileSlices(fileSlices: Seq[FileSlice], pathGlobPattern:
String): Seq[FileSlice] = {
val filteredFileSlices = if (!StringUtils.isNullOrEmpty(pathGlobPattern)) {
val globMatcher = new GlobPattern("*" + pathGlobPattern)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 7ca8e9b53647..7a9263ce58de 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -30,7 +30,7 @@ import
org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestUtil
import
org.apache.hudi.common.testutils.HoodieTestDataGenerator.recordsToStrings
import org.apache.hudi.common.util.Option
import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
-import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig,
HoodieWriteConfig}
+import org.apache.hudi.config.{HoodieCleanConfig, HoodieCompactionConfig,
HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.exception.HoodieUpgradeDowngradeException
import org.apache.hudi.index.HoodieIndex.IndexType
import
org.apache.hudi.metadata.HoodieTableMetadataUtil.{metadataPartitionExists,
PARTITION_NAME_SECONDARY_INDEX_PREFIX}
@@ -811,6 +811,47 @@ class TestMORDataSource extends HoodieSparkClientTestBase
with SparkDatasetMixin
df.select("_hoodie_commit_seqno", "fare.amount", "fare.currency",
"tip_history").show(1)
}
+ @ParameterizedTest
+ @EnumSource(value = classOf[HoodieTableVersion], names = Array("SIX",
"EIGHT"))
+ def testIncrementalQueryMORWithCompactionAndClean(tableVersion:
HoodieTableVersion): Unit = {
+ val (writeOpts, readOpts) = getWriterReaderOpts(HoodieRecordType.AVRO)
+ initMetaClient(HoodieTableType.MERGE_ON_READ)
+ // Expected timeline (inline compaction every 3 delta commits, cleaner retains 2 commits):
+ //   delta commit1, delta commit2, delta commit3
+ //   commit <-- compaction
+ //   delta commit4
+ //   delta commit5
+ //   clean
+ // A full-table-scan fallback query must still return all 5 x 10 = 50 inserted records.
+ for (i <- 1 to 5) {
+ val records = recordsToStrings(dataGen.generateInserts("%05d".format(i),
10)).asScala.toList
+ val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+
+ inputDF.write.format("hudi")
+ .options(writeOpts)
+ .option(HoodieCompactionConfig.INLINE_COMPACT.key(), "true")
+ .option(HoodieWriteConfig.WRITE_TABLE_VERSION.key(),
tableVersion.versionCode().toString)
+ .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(),
"3")
+ .option(HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), "2")
+ .option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.TABLE_TYPE.key,
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+ // Use the in-memory index so inserts are written to log files, producing a log-only MOR table.
+ .option(HoodieIndexConfig.INDEX_TYPE.key, IndexType.INMEMORY.toString)
+ .mode(if (i == 1) SaveMode.Overwrite else SaveMode.Append)
+ .save(basePath)
+ }
+
+ // Use "000" as the begin time of the incremental query; with full-table-scan
+ // fallback enabled, the query falls back to a full table scan.
+ val rowCount1 = spark.read.format("hudi")
+ .options(readOpts)
+ .option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+
.option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN.key(),
"true")
+ .option(DataSourceReadOptions.START_COMMIT.key(), "000")
+ .load(basePath)
+ .count()
+ assertEquals(50, rowCount1)
+ }
+
@ParameterizedTest
@CsvSource(Array(
"true,false,AVRO", "true,true,AVRO", "false,true,AVRO", "false,false,AVRO",