This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit c1a497059c42b7116d46b8afae4b826124fce77f
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue Sep 12 02:33:11 2023 -0400

    [HUDI-6834] Fixing time travel queries when they overlap with the cleaner and archival time window (#9666)
    
    When a time travel query overlaps with the cleaner or archival window, we should explicitly fail the query.
    Otherwise, we might end up serving partial/wrong results or empty rows.
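    
    For illustration, a read as of a cleaned-up instant now fails fast instead of silently returning partial or empty rows. A minimal sketch (not part of this patch; the instant value and basePath are hypothetical):
    
        // time travel read as of an instant older than the cleaner's earliest commit to retain
        spark.read.format("hudi")
          .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, "20230901000000000")
          .load(basePath)
          .show()
        // now fails with: "Cleaner cleaned up the timestamp of interest. ..."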
---
 .../hudi/common/table/timeline/TimelineUtils.java  |  30 ++++++
 .../hudi/functional/TestTimeTravelQuery.scala      | 104 +++++++++++++++++++--
 2 files changed, 127 insertions(+), 7 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
index a763f4d9053..a682c9face9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
@@ -25,9 +25,12 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieTimeTravelException;
@@ -50,6 +53,7 @@ import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.SAVEPOINT_ACTION;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps;
@@ -339,6 +343,32 @@ public class TimelineUtils {
             timestampAsOf, incompleteCommitTime));
       }
     }
+
+    // also, a timestamp as of query cannot read cleaned-up data.
+    Option<HoodieInstant> latestCleanOpt = metaClient.getActiveTimeline().getCleanerTimeline().filterCompletedInstants().lastInstant();
+    if (latestCleanOpt.isPresent()) {
+      // Ensure the timestamp as of is at or after the earliest commit to retain.
+      try {
+        HoodieCleanMetadata cleanMetadata = CleanerUtils.getCleanerMetadata(metaClient, latestCleanOpt.get());
+        String earliestCommitToRetain = cleanMetadata.getEarliestCommitToRetain();
+        if (!StringUtils.isNullOrEmpty(earliestCommitToRetain)) {
+          ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(earliestCommitToRetain, LESSER_THAN_OR_EQUALS, timestampAsOf),
+              "Cleaner cleaned up the timestamp of interest. Please ensure sufficient commits are retained with cleaner "
+                  + "for Timestamp as of query to work");
+        } else {
+          // when the cleaner is based on file versions, we may not find a value for earliestCommitToRetain.
+          // so, check whether the timestamp of interest has been archived, based on the first entry in the active timeline.
+          Option<HoodieInstant> firstCompletedInstant = metaClient.getActiveTimeline().getWriteTimeline().filterCompletedInstants().firstInstant();
+          if (firstCompletedInstant.isPresent()) {
+            ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(firstCompletedInstant.get().getTimestamp(), LESSER_THAN_OR_EQUALS, timestampAsOf),
+                "Please ensure sufficient commits are retained (uncleaned and un-archived) for timestamp as of query to work.");
+          }
+        }
+      } catch (IOException e) {
+        throw new HoodieTimeTravelException("Cleaner cleaned up the timestamp of interest. "
+            + "Please ensure sufficient commits are retained with cleaner for Timestamp as of query to work");
+      }
+    }
   }
 
   /**
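
The guard above boils down to a lexicographic comparison, since Hudi instant timestamps are fixed-width, time-ordered strings. A standalone Scala sketch of the decision (illustrative only; the helper name and sample instants are hypothetical, not part of the patch):

    // a time travel query is servable only if the requested instant is at or
    // after the earliest commit the cleaner retained
    def isServable(earliestCommitToRetain: String, timestampAsOf: String): Boolean =
      earliestCommitToRetain <= timestampAsOf

    isServable("20230910010101000", "20230912023311000") // true: instant still retained
    isServable("20230910010101000", "20230901000000000") // false: cleaned up, fail the query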
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
index cdb94907158..7f3d9386fb2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
@@ -17,26 +17,28 @@
 
 package org.apache.hudi.functional
 
-import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.{HoodieCleaningPolicy, HoodieTableType}
 import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.testutils.HoodieTestTable
-import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.exception.HoodieTimeTravelException
+import org.apache.hudi.config.{HoodieArchivalConfig, HoodieCleanConfig, HoodieCompactionConfig, HoodieWriteConfig}
+import org.apache.hudi.exception.ExceptionUtil.getRootCause
+import org.apache.hudi.exception.{HoodieKeyGeneratorException, HoodieTimeTravelException}
 import org.apache.hudi.testutils.HoodieSparkClientTestBase
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, ScalaAssertionSupport, config}
 import org.apache.spark.sql.SaveMode.{Append, Overwrite}
 import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertNull, assertTrue}
-import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.EnumSource
 import org.scalatest.Assertions.assertThrows
 
 import java.text.SimpleDateFormat
 
-class TestTimeTravelQuery extends HoodieSparkClientTestBase {
+class TestTimeTravelQuery extends HoodieSparkClientTestBase with ScalaAssertionSupport {
   var spark: SparkSession = _
   val commonOpts = Map(
     "hoodie.insert.shuffle.parallelism" -> "4",
@@ -155,7 +157,7 @@ class TestTimeTravelQuery extends HoodieSparkClientTestBase {
     // Query as of other commits
     List(incompleteCommit, secondCommit, thirdCommit)
       .foreach(commitTime => {
-        assertThrows[HoodieTimeTravelException] {
+        assertThrows(classOf[HoodieTimeTravelException]) {
           spark.read.format("hudi")
            .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, commitTime)
             .load(basePath)
@@ -307,4 +309,92 @@ class TestTimeTravelQuery extends HoodieSparkClientTestBase {
     assertNotNull(schema3.getField("year"))
     assertNotNull(schema3.getField("month"))
   }
+
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])
+  def testTimeTravelQueryCommitsBasedClean(tableType: HoodieTableType): Unit = {
+    testTimeTravelQueryCOW(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name, tableType)
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])
+  def testTimeTravelQueryFileVersionBasedClean(tableType: HoodieTableType): Unit = {
+    testTimeTravelQueryCOW(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name, tableType)
+  }
+
+  def testTimeTravelQueryCOW(cleanerPolicy: String, tableType: HoodieTableType): Unit = {
+    initMetaClient(tableType)
+    val _spark = spark
+    import _spark.implicits._
+
+    val opts = commonOpts ++ Map(
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name,
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "",
+      HoodieCleanConfig.CLEANER_POLICY.key() -> cleanerPolicy,
+      HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> "2",
+      HoodieCleanConfig.CLEANER_FILE_VERSIONS_RETAINED.key() -> "2",
+      HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key() -> "3",
+      HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key() -> "4",
+      HoodieMetadataConfig.ENABLE.key() -> "false",
+      HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "1"
+    )
+
+    // First write
+    val df1 = Seq((1, "a1", 10, 1000)).toDF("id", "name", "value", "version")
+    val firstCommit = writeBatch(df1, opts, Overwrite)
+
+    // Second write
+    writeBatch(Seq((1, "a1", 12, 1001)).toDF("id", "name", "value", "version"), opts)
+
+    // Third write
+    val df3 = Seq((1, "a1", 13, 1002)).toDF("id", "name", "value", "version")
+    val thirdCommit = writeBatch(df3, opts)
+
+    // Fourth write
+    writeBatch(Seq((1, "a1", 14, 1003)).toDF("id", "name", "value", "version"), opts)
+
+    // Query as of the third commit
+    val result3 = spark.read.format("hudi")
+      .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, thirdCommit)
+      .load(basePath)
+      .select("id", "name", "value", "version")
+      .take(1)(0)
+    assertEquals(Row(1, "a1", 13, 1002), result3)
+
+    if (!cleanerPolicy.equals(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name)) {
+      // querying as of the first commit should fail, since the cleaner has already cleaned it up.
+      val e1 = assertThrows(classOf[IllegalArgumentException]) {
+        spark.read.format("hudi")
+          .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, firstCommit)
+          .load(basePath)
+          .select("id", "name", "value", "version")
+          .take(1)
+      }
+      assertTrue(getRootCause(e1).getMessage.contains("Cleaner cleaned up the timestamp of interest. Please ensure sufficient commits are retained with cleaner for Timestamp as of query to work"))
+    }
+
+    // add more writes so that the first commit goes into the archived timeline.
+    // fifth write
+    writeBatch(Seq((1, "a1", 15, 1004)).toDF("id", "name", "value", "version"), opts)
+
+    // sixth write
+    writeBatch(Seq((1, "a1", 16, 1005)).toDF("id", "name", "value", "version"), opts)
+
+    // for commits- and hours-based cleaning, a cleaner-based exception will be thrown. For file-versions-based
+    // cleaning, an archival-based exception will be thrown.
+    val expectedErrorMsg = if (!cleanerPolicy.equals(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name)) {
+      "Cleaner cleaned up the timestamp of interest. Please ensure sufficient commits are retained with cleaner for Timestamp as of query to work"
+    } else {
+      "Please ensure sufficient commits are retained (uncleaned and un-archived) for timestamp as of query to work."
+    }
+
+    val e2 = assertThrows(classOf[IllegalArgumentException]) {
+      spark.read.format("hudi")
+        .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, firstCommit)
+        .load(basePath)
+        .select("id", "name", "value", "version")
+        .take(1)
+    }
+    assertTrue(getRootCause(e2).getMessage.contains(expectedErrorMsg))
+  }
 }
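
As the new error messages suggest, the remedy on the write side is to size the retention windows to cover the instants readers will query. A hedged example using the same config keys as the test above (values are illustrative, not a recommendation):

    val retentionOpts = Map(
      HoodieCleanConfig.CLEANER_POLICY.key() -> HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name,
      HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> "24", // keep enough commits for time travel readers
      HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key() -> "28",   // archive only beyond the cleaner window
      HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key() -> "32"
    )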
