This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 99deee6e6dc5 [HUDI-8890] fix incremental read with invalid/missing end instant (#12679)
99deee6e6dc5 is described below
commit 99deee6e6dc5c940e143deed2d78f7836e8df490
Author: TheR1sing3un <[email protected]>
AuthorDate: Thu Jul 24 18:42:13 2025 +0800
[HUDI-8890] fix incremental read with invalid/missing end instant (#12679)
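Previously the scan builder unconditionally unwrapped the end instant via queryContext.getEndInstant.get(), which fails when the requested range resolves to no completed instant (an invalid or missing end instant). With this change the relation short-circuits and returns an empty RDD[Row] when the query context is empty, and otherwise takes the end of the range from getLastInstant.

A minimal reader-side sketch of the resulting behavior, reusing the option keys from the diff below (the instant values "000"/"002" and basePath are placeholders taken from the accompanying test, not real commits):

    // An incremental query whose range contains no completed instants
    // now yields an empty DataFrame instead of throwing.
    val emptyIncDF = spark.read.format("hudi")
      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.START_COMMIT.key, "000")
      .option(DataSourceReadOptions.END_COMMIT.key, "002")
      .load(basePath)
    assertEquals(0, emptyIncDF.count())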
---
.../org/apache/hudi/IncrementalRelationV2.scala | 10 ++++--
.../hudi/functional/TestCOWDataSourceStorage.scala | 36 +++++++++++++++-------
2 files changed, 33 insertions(+), 13 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV2.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV2.scala
index 0cb34ef26813..c361ed667a05 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV2.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV2.scala
@@ -205,7 +205,13 @@ class IncrementalRelationV2(val sqlContext: SQLContext,
val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))
val startInstantArchived = !queryContext.getArchivedInstants.isEmpty
- val endInstantTime = queryContext.getEndInstant.get()
+ if (queryContext.isEmpty) {
+ // no commits to read
+ // scalastyle:off return
+ return sqlContext.sparkContext.emptyRDD[Row]
+ // scalastyle:on return
+ }
+ val endInstantTime = queryContext.getLastInstant
val scanDf = if (fallbackToFullTableScan && startInstantArchived) {
log.info(s"Falling back to full table scan as startInstantArchived:
$startInstantArchived")
@@ -263,7 +269,7 @@ class IncrementalRelationV2(val sqlContext: SQLContext,
.load(filteredRegularFullPaths.toList: _*)
.filter(col(HoodieRecord.COMMIT_TIME_METADATA_FIELD).isin(commitTimesToReturn: _*)))
} catch {
- case e : AnalysisException =>
+ case e: AnalysisException =>
if (e.getMessage.contains("Path does not exist")) {
throw new HoodieIncrementalPathNotFoundException(e)
} else {
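The hunk above replaces the Optional-style access queryContext.getEndInstant.get() with an early-return guard; the scalastyle:off/on markers are needed because the project's scalastyle rules forbid explicit return. Condensed, the new control flow looks like this (a sketch only; the surrounding method body is elided and not the exact Hudi code):

    // If the query context resolved no instants in range, there is
    // nothing to scan: hand back an empty RDD instead of unwrapping
    // a missing end instant.
    if (queryContext.isEmpty) {
      // scalastyle:off return
      return sqlContext.sparkContext.emptyRDD[Row]
      // scalastyle:on return
    }
    // Otherwise the end of the range is the last resolved instant.
    val endInstantTime = queryContext.getLastInstant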
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
index f5d42fc87ac6..cd77a5f6f674 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
@@ -21,7 +21,7 @@ package org.apache.hudi.functional
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.hudi.client.validator.{SqlQueryEqualityPreCommitValidator, SqlQueryInequalityPreCommitValidator}
-import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieReaderConfig}
import org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT, TIMESTAMP_OUTPUT_DATE_FORMAT, TIMESTAMP_TYPE_FIELD}
import org.apache.hudi.common.model.WriteOperationType
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
@@ -71,20 +71,29 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
@ParameterizedTest
@CsvSource(value = Array(
- "true|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key",
- "true|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency",
- "true|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key",
- "false|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key",
- "false|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency",
- "false|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key"
+ "true|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key|true",
+ "true|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency|true",
+ "true|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key|true",
+ "false|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key|true",
+ "false|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency|true",
+ "false|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key|true",
+ "true|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key|false",
+ "true|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency|false",
+ "true|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key|false",
+ "false|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key|false",
+ "false|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency|false",
+ "false|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key|false"
), delimiter = '|')
- def testCopyOnWriteStorage(isMetadataEnabled: Boolean, keyGenClass: String, recordKeys: String): Unit = {
+ def testCopyOnWriteStorage(isMetadataEnabled: Boolean, keyGenClass: String, recordKeys: String, fileGroupReaderEnabled: Boolean): Unit = {
var options: Map[String, String] = commonOpts ++ Map(
HoodieMetadataConfig.ENABLE.key -> String.valueOf(isMetadataEnabled),
DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> keyGenClass,
DataSourceWriteOptions.RECORDKEY_FIELD.key -> recordKeys,
HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> "true")
+ val readOptions = Map(HoodieMetadataConfig.ENABLE.key() -> String.valueOf(isMetadataEnabled),
+ HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key() -> String.valueOf(fileGroupReaderEnabled))
+
val isTimestampBasedKeyGen: Boolean = classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)
if (isTimestampBasedKeyGen) {
options += DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key"
@@ -108,7 +117,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
// Snapshot query
val snapshotDF1 = spark.read.format("org.apache.hudi")
- .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
+ .options(readOptions)
.load(basePath)
assertEquals(100, snapshotDF1.count())
@@ -135,7 +144,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
.save(basePath)
val snapshotDF2 = spark.read.format("hudi")
- .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
+ .options(readOptions)
.load(basePath)
assertEquals(100, snapshotDF2.count())
assertEquals(updatedVerificationVal, snapshotDF2.filter(col("_row_key") === verificationRowKey).select(verificationCol).first.getString(0))
@@ -174,7 +183,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
// Snapshot Query
val snapshotDF3 = spark.read.format("org.apache.hudi")
- .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
+ .options(readOptions)
.load(basePath)
assertEquals(100, snapshotDF3.count()) // still 100, since we only updated
@@ -184,6 +193,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
// Setting HoodieROTablePathFilter here to test whether pathFilter can filter out correctly for IncrementalRelation
spark.sparkContext.hadoopConfiguration.set("mapreduce.input.pathFilter.class", "org.apache.hudi.hadoop.HoodieROTablePathFilter")
val hoodieIncViewDF1 = spark.read.format("org.apache.hudi")
+ .options(readOptions)
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.START_COMMIT.key, completionTime1)
.option(DataSourceReadOptions.END_COMMIT.key, completionTime1)
@@ -196,6 +206,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
// Test incremental query has no instant in range
val emptyIncDF = spark.read.format("org.apache.hudi")
+ .options(readOptions)
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.START_COMMIT.key, "000")
.option(DataSourceReadOptions.END_COMMIT.key, "002")
@@ -212,6 +223,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
// pull the latest commit
val hoodieIncViewDF2 = spark.read.format("org.apache.hudi")
+ .options(readOptions)
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.START_COMMIT.key, completionTime3)
.load(basePath)
@@ -223,6 +235,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
// pull the latest commit within certain partitions
val hoodieIncViewDF3 = spark.read.format("org.apache.hudi")
+ .options(readOptions)
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.START_COMMIT.key, completionTime3)
.option(DataSourceReadOptions.INCR_PATH_GLOB.key, if (isTimestampBasedKeyGen) "/2016*/*" else "/2016/*/*/*")
@@ -231,6 +244,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
.filter(col("_hoodie_partition_path").startsWith("2016")).count(), hoodieIncViewDF3.count())
val timeTravelDF = spark.read.format("org.apache.hudi")
+ .options(readOptions)
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.START_COMMIT.key, completionTime1)
.option(DataSourceReadOptions.END_COMMIT.key, completionTime1)
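The test changes above double the CsvSource matrix so each key generator runs with the file group reader both enabled and disabled, and thread a shared readOptions map through every snapshot, incremental, and time-travel read. Condensed, the pattern is (a sketch assuming only the config keys and option names visible in the diff):

    // Shared read options derived from the test parameters,
    // applied uniformly to every spark.read in the test.
    val readOptions = Map(
      HoodieMetadataConfig.ENABLE.key() -> String.valueOf(isMetadataEnabled),
      HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key() -> String.valueOf(fileGroupReaderEnabled))

    // Both reader implementations are exercised against the same queries.
    val snapshotDF = spark.read.format("hudi").options(readOptions).load(basePath)
    val incDF = spark.read.format("hudi")
      .options(readOptions)
      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.START_COMMIT.key, completionTime3)
      .load(basePath)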