This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 99952d1bb4a [HUDI-8602] Fix a bug for incremental query (#12385)
99952d1bb4a is described below

commit 99952d1bb4ade5198c063ec3657742c562883748
Author: Lin Liu <[email protected]>
AuthorDate: Sun Dec 1 22:22:16 2024 -0800

    [HUDI-8602] Fix a bug for incremental query (#12385)
    
    In an incremental query, Hud may need to read details from an archived 
instant.
    The ActiveTimeline and ArchivedTimeline have different ways to generate the 
file path.
    Therefore, when an ActiveTime tries to read the file of an archived 
instant, we see the FileNotFound error.
---
 .../hudi/MergeOnReadIncrementalRelation.scala      |   9 +-
 .../TestIncrementalQueryWithArchivedInstants.scala | 111 +++++++++++++++++++++
 .../TestIncrementalReadWithFullTableScan.scala     |  15 ++-
 .../deltastreamer/TestHoodieDeltaStreamer.java     |   7 +-
 4 files changed, 130 insertions(+), 12 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index ecb9b79bcb8..13337b9eace 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -209,7 +209,14 @@ trait HoodieIncrementalRelationTrait extends 
HoodieBaseRelation {
 
   protected lazy val includedCommits: immutable.Seq[HoodieInstant] = 
queryContext.getInstants.asScala.toList
 
-  protected lazy val commitsMetadata = 
includedCommits.map(getCommitMetadata(_, super.timeline)).asJava
+  protected lazy val commitsMetadata = includedCommits.map(
+    i => {
+      if (queryContext.getArchivedInstants.contains(i)) {
+        getCommitMetadata(i, queryContext.getArchivedTimeline)
+      } else {
+        getCommitMetadata(i, queryContext.getActiveTimeline)
+      }
+    }).asJava
 
   protected lazy val affectedFilesInCommits: java.util.List[StoragePathInfo] = 
{
     listAffectedFilesForCommits(conf, metaClient.getBasePath, commitsMetadata)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/TestIncrementalQueryWithArchivedInstants.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/TestIncrementalQueryWithArchivedInstants.scala
new file mode 100644
index 00000000000..15a5dbbf69c
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/TestIncrementalQueryWithArchivedInstants.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache
+
+import org.apache.hudi.DataSourceWriteOptions.{RECORD_MERGE_MODE, 
RECORD_MERGE_STRATEGY_ID}
+import org.apache.hudi.common.config.RecordMergeMode
+import org.apache.hudi.common.model.HoodieRecordMerger
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration
+import org.apache.hudi.storage.{HoodieStorageUtils, StorageConfiguration}
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
+import org.apache.spark.sql.{Row, SaveMode}
+import org.apache.spark.sql.types.{LongType, StringType, StructField, 
StructType}
+import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertFalse, 
assertTrue}
+import org.junit.jupiter.api.Tag
+import org.junit.jupiter.api.function.Executable
+import org.junit.{After, Before, Test}
+
+import java.nio.file.{Files, Path}
+
+@Tag("functional")
+class TestIncrementalQueryWithArchivedInstants extends 
SparkClientFunctionalTestHarness {
+  var tmpDir: Path = _
+  var tblPath: String = _
+
+  override def basePath(): String = tmpDir.toAbsolutePath.toUri.toString
+
+  @Before
+  def setUp(): Unit = {
+    tmpDir = Files.createTempDirectory("hudi_random")
+    tblPath = basePath()
+    super.runBeforeEach()
+  }
+
+  def tearDown(): Unit = {
+    super.closeFileSystem()
+  }
+
+  @Test
+  def testCompactionWithCommitTimeMerge(): Unit = {
+    val tableOpt: Map[String, String] = Map(
+      "hoodie.datasource.write.table.name" -> "test_table",
+      "hoodie.datasource.write.table.type" -> "MERGE_ON_READ",
+      "hoodie.datasource.write.recordkey.field" -> "id",
+      "hoodie.datasource.write.precombined.field" -> "ts",
+      "hoodie.datasource.write.partitionpath.field" -> "name",
+      "hoodie.populate.meta.fields=true" -> "false",
+      "hoodie.compaction.payload.class" -> 
"org.apache.hudi.common.model.OverwriteWithLatestAvroPayload",
+      "hoodie.datasource.write.payload.class" -> 
"org.apache.hudi.common.model.OverwriteWithLatestAvroPayload",
+      RECORD_MERGE_MODE.key -> RecordMergeMode.COMMIT_TIME_ORDERING.name)
+
+    val serviceOpt: Map[String, String] = Map(
+      "hoodie.table.services.enabled" -> "true",
+      "hoodie.compact.inline" -> "false",
+      "hoodie.compact.inline.max.delta.commits" -> "1",
+      "hoodie.parquet.small.file.limit" -> "0",
+      "hoodie.clustering.inline" -> "false",
+      "hoodie.keep.max.commits" -> "2",
+      "hoodie.keep.min.commits" -> "1",
+      "hoodie.commits.archival.batch" -> "1")
+    val schema = new StructType(Array(
+      StructField("id", StringType, nullable = true),
+      StructField("name", StringType, nullable = true),
+      StructField("ts", LongType, nullable = true)
+    ))
+
+    val opt = tableOpt ++ serviceOpt
+    for (i <- 1L to 10L) {
+      val data = Seq(Row("id1", "name1", i), Row("id2", "name2", i), 
Row("id3", "name3", i))
+      val df = spark.createDataFrame(spark.sparkContext.parallelize(data, 2), 
schema)
+      df.write.format("hudi").options(opt).mode(SaveMode.Append).save(tblPath)
+    }
+
+    val storageConfiguration = new HadoopStorageConfiguration(false)
+    val metaClient = HoodieTableMetaClient.builder().setBasePath(tblPath)
+      .setStorage(HoodieStorageUtils.getStorage(storageConfiguration)).build()
+    val instants = metaClient.getArchivedTimeline().getInstants
+
+    // There are at least one archived instants.
+    assertFalse(instants.isEmpty)
+    // No errors during read.
+    assertDoesNotThrow(new Executable {
+      def execute(): Unit = {
+        spark.read.format("hudi")
+          .option("hoodie.schema.on.read.enable", "true")
+          .option("hoodie.datasource.query.type", "incremental")
+          .option("hoodie.datasource.read.begin.instanttime", "0")
+          .options(opt)
+          .load(tblPath)
+          .show(false)
+      }
+    })
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
index bd05f542a14..45dfd6fb89e 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
@@ -17,21 +17,20 @@
 
 package org.apache.hudi.functional
 
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.model.HoodieTableType
-import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline, 
InstantComparison}
 import 
org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.instantTimeMinusMillis
-import InstantComparison.compareTimestamps
+import 
org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps
+import org.apache.hudi.common.table.timeline.{HoodieInstant, InstantComparison}
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.exception.HoodieIOException
 import org.apache.hudi.testutils.HoodieSparkClientTestBase
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{SaveMode, SparkSession}
-import org.junit.jupiter.api.{AfterEach, BeforeEach}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, 
assertTrue}
 import org.junit.jupiter.api.function.Executable
+import org.junit.jupiter.api.{AfterEach, BeforeEach}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.EnumSource
 
@@ -179,12 +178,12 @@ class TestIncrementalReadWithFullTableScan extends 
HoodieSparkClientTestBase {
 
   private def shouldThrowIfFallbackIsFalse(fn: () => Unit): Unit = {
     val msg = "Should fail with Path does not exist"
-    val exp = assertThrows(classOf[HoodieIOException], new Executable {
+    val exp = assertThrows(classOf[SparkException], new Executable {
       override def execute(): Unit = {
         fn()
       }
     }, msg)
-    assertTrue(exp.getMessage.contains("Could not read commit details"),
-      "Expected to fail with 'Could not read commit details' but the message 
was: " + exp.getMessage)
+    assertTrue(exp.getMessage.contains("FileNotFoundException"),
+      "Expected to fail with 'FileNotFoundException' but the message was: " + 
exp.getMessage)
   }
 }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 7987241724e..38d9dde5407 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -120,6 +120,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.spark.SparkException;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.AnalysisException;
@@ -2493,9 +2494,9 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
     insertInTable(tableBasePath, 9, WriteOperationType.UPSERT);
     //No change as this fails with could not read commit details due to path 
not exists
-    Throwable exp = assertThrows(HoodieIOException.class, () -> new 
HoodieDeltaStreamer(downstreamCfg, jsc).sync());
-    assertTrue(exp.getMessage().contains("Could not read commit details"),
-        "Expected to fail with 'Could not read commit details' but the message 
was: " + exp.getMessage());
+    Throwable exp = assertThrows(SparkException.class, () -> new 
HoodieDeltaStreamer(downstreamCfg, jsc).sync());
+    assertTrue(exp.getMessage().contains("FileNotFoundException"),
+        "Expected to fail with 'FileNotFoundException' but the message was: " 
+ exp.getMessage());
     assertRecordCount(1000, downstreamTableBasePath, sqlContext);
 
     if (downstreamCfg.configs == null) {

Reply via email to