This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 99deee6e6dc5 [HUDI-8890] fix incremental read with invalid/missing end 
instant (#12679)
99deee6e6dc5 is described below

commit 99deee6e6dc5c940e143deed2d78f7836e8df490
Author: TheR1sing3un <[email protected]>
AuthorDate: Thu Jul 24 18:42:13 2025 +0800

    [HUDI-8890] fix incremental read with invalid/missing end instant (#12679)
---
 .../org/apache/hudi/IncrementalRelationV2.scala    | 10 ++++--
 .../hudi/functional/TestCOWDataSourceStorage.scala | 36 +++++++++++++++-------
 2 files changed, 33 insertions(+), 13 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV2.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV2.scala
index 0cb34ef26813..c361ed667a05 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV2.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV2.scala
@@ -205,7 +205,13 @@ class IncrementalRelationV2(val sqlContext: SQLContext,
       val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))
 
       val startInstantArchived = !queryContext.getArchivedInstants.isEmpty
-      val endInstantTime = queryContext.getEndInstant.get()
+      if (queryContext.isEmpty) {
+        // no commits to read
+        // scalastyle:off return
+        return sqlContext.sparkContext.emptyRDD[Row]
+        // scalastyle:on return
+      }
+      val endInstantTime = queryContext.getLastInstant
 
       val scanDf = if (fallbackToFullTableScan && startInstantArchived) {
         log.info(s"Falling back to full table scan as startInstantArchived: 
$startInstantArchived")
@@ -263,7 +269,7 @@ class IncrementalRelationV2(val sqlContext: SQLContext,
                   .load(filteredRegularFullPaths.toList: _*)
                   
.filter(col(HoodieRecord.COMMIT_TIME_METADATA_FIELD).isin(commitTimesToReturn: 
_*)))
               } catch {
-                case e : AnalysisException =>
+                case e: AnalysisException =>
                   if (e.getMessage.contains("Path does not exist")) {
                     throw new HoodieIncrementalPathNotFoundException(e)
                   } else {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
index f5d42fc87ac6..cd77a5f6f674 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
@@ -21,7 +21,7 @@ package org.apache.hudi.functional
 
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, 
HoodieDataSourceHelpers}
 import org.apache.hudi.client.validator.{SqlQueryEqualityPreCommitValidator, 
SqlQueryInequalityPreCommitValidator}
-import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieReaderConfig}
 import 
org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT,
 TIMESTAMP_OUTPUT_DATE_FORMAT, TIMESTAMP_TYPE_FIELD}
 import org.apache.hudi.common.model.WriteOperationType
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
@@ -71,20 +71,29 @@ class TestCOWDataSourceStorage extends 
SparkClientFunctionalTestHarness {
 
   @ParameterizedTest
   @CsvSource(value = Array(
-    "true|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key",
-    "true|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency",
-    "true|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key",
-    "false|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key",
-    "false|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency",
-    "false|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key"
+    "true|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key|true",
+    
"true|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency|true",
+    "true|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key|true",
+    "false|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key|true",
+    
"false|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency|true",
+    "false|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key|true",
+    "true|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key|false",
+    
"true|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency|false",
+    "true|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key|false",
+    "false|org.apache.hudi.keygen.SimpleKeyGenerator|_row_key|false",
+    
"false|org.apache.hudi.keygen.ComplexKeyGenerator|_row_key,fare.currency|false",
+    "false|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key|false"
   ), delimiter = '|')
-  def testCopyOnWriteStorage(isMetadataEnabled: Boolean, keyGenClass: String, 
recordKeys: String): Unit = {
+  def testCopyOnWriteStorage(isMetadataEnabled: Boolean, keyGenClass: String, 
recordKeys: String, fileGroupReaderEnabled: Boolean): Unit = {
     var options: Map[String, String] = commonOpts ++ Map(
       HoodieMetadataConfig.ENABLE.key -> String.valueOf(isMetadataEnabled),
       DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> keyGenClass,
       DataSourceWriteOptions.RECORDKEY_FIELD.key -> recordKeys,
       HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> "true")
 
+    val readOptions = Map(HoodieMetadataConfig.ENABLE.key() -> 
String.valueOf(isMetadataEnabled),
+      HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key() -> 
String.valueOf(fileGroupReaderEnabled))
+
     val isTimestampBasedKeyGen: Boolean = 
classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass)
     if (isTimestampBasedKeyGen) {
       options += DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key"
@@ -108,7 +117,7 @@ class TestCOWDataSourceStorage extends 
SparkClientFunctionalTestHarness {
 
     // Snapshot query
     val snapshotDF1 = spark.read.format("org.apache.hudi")
-      .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
+      .options(readOptions)
       .load(basePath)
     assertEquals(100, snapshotDF1.count())
 
@@ -135,7 +144,7 @@ class TestCOWDataSourceStorage extends 
SparkClientFunctionalTestHarness {
       .save(basePath)
 
     val snapshotDF2 = spark.read.format("hudi")
-      .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
+      .options(readOptions)
       .load(basePath)
     assertEquals(100, snapshotDF2.count())
     assertEquals(updatedVerificationVal, snapshotDF2.filter(col("_row_key") 
=== verificationRowKey).select(verificationCol).first.getString(0))
@@ -174,7 +183,7 @@ class TestCOWDataSourceStorage extends 
SparkClientFunctionalTestHarness {
 
     // Snapshot Query
     val snapshotDF3 = spark.read.format("org.apache.hudi")
-      .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
+      .options(readOptions)
       .load(basePath)
     assertEquals(100, snapshotDF3.count()) // still 100, since we only updated
 
@@ -184,6 +193,7 @@ class TestCOWDataSourceStorage extends 
SparkClientFunctionalTestHarness {
     // Setting HoodieROTablePathFilter here to test whether pathFilter can 
filter out correctly for IncrementalRelation
     
spark.sparkContext.hadoopConfiguration.set("mapreduce.input.pathFilter.class", 
"org.apache.hudi.hadoop.HoodieROTablePathFilter")
     val hoodieIncViewDF1 = spark.read.format("org.apache.hudi")
+      .options(readOptions)
       .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
       .option(DataSourceReadOptions.START_COMMIT.key, completionTime1)
       .option(DataSourceReadOptions.END_COMMIT.key, completionTime1)
@@ -196,6 +206,7 @@ class TestCOWDataSourceStorage extends 
SparkClientFunctionalTestHarness {
 
     // Test incremental query has no instant in range
     val emptyIncDF = spark.read.format("org.apache.hudi")
+      .options(readOptions)
       .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
       .option(DataSourceReadOptions.START_COMMIT.key, "000")
       .option(DataSourceReadOptions.END_COMMIT.key, "002")
@@ -212,6 +223,7 @@ class TestCOWDataSourceStorage extends 
SparkClientFunctionalTestHarness {
 
     // pull the latest commit
     val hoodieIncViewDF2 = spark.read.format("org.apache.hudi")
+      .options(readOptions)
       .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
       .option(DataSourceReadOptions.START_COMMIT.key, completionTime3)
       .load(basePath)
@@ -223,6 +235,7 @@ class TestCOWDataSourceStorage extends 
SparkClientFunctionalTestHarness {
 
     // pull the latest commit within certain partitions
     val hoodieIncViewDF3 = spark.read.format("org.apache.hudi")
+      .options(readOptions)
       .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
       .option(DataSourceReadOptions.START_COMMIT.key, completionTime3)
       .option(DataSourceReadOptions.INCR_PATH_GLOB.key, if 
(isTimestampBasedKeyGen) "/2016*/*" else "/2016/*/*/*")
@@ -231,6 +244,7 @@ class TestCOWDataSourceStorage extends 
SparkClientFunctionalTestHarness {
       .filter(col("_hoodie_partition_path").startsWith("2016")).count(), 
hoodieIncViewDF3.count())
 
     val timeTravelDF = spark.read.format("org.apache.hudi")
+      .options(readOptions)
       .option(DataSourceReadOptions.QUERY_TYPE.key, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
       .option(DataSourceReadOptions.START_COMMIT.key, completionTime1)
       .option(DataSourceReadOptions.END_COMMIT.key, completionTime1)

Reply via email to