This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit c1a497059c42b7116d46b8afae4b826124fce77f
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue Sep 12 02:33:11 2023 -0400

    [HUDI-6834] Fixing time travel queries when overlaps with cleaner and archival time window (#9666)

    When a time travel query overlaps with the cleaner or archival window, we
    should explicitly fail the query. Otherwise, we might end up serving
    partial or wrong results, or empty rows.
---
 .../hudi/common/table/timeline/TimelineUtils.java |  30 ++++++
 .../hudi/functional/TestTimeTravelQuery.scala     | 104 +++++++++++++++++++--
 2 files changed, 127 insertions(+), 7 deletions(-)
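To make the failure mode concrete: a time travel read targets a single instant
on the timeline, and once the cleaner or archival process has reclaimed that
instant, the data backing it may be partially or fully gone. A minimal sketch
of such a read follows (not part of the patch; the base path and instant value
are hypothetical, and `spark` is assumed to be an existing SparkSession with
Hudi support; the read option is the same one exercised by the test below):

    import org.apache.hudi.DataSourceReadOptions

    // Hypothetical table path and instant, for illustration only.
    val basePath = "/tmp/hudi_trips_table"
    val asOfInstant = "20230901000000000"

    val df = spark.read.format("hudi")
      .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, asOfInstant)
      .load(basePath)

    // With this change, if asOfInstant is older than the cleaner's
    // earliestCommitToRetain (or older than the first instant still on the
    // active timeline), the read fails fast instead of returning partial,
    // wrong, or empty results.
    df.show()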
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
index a763f4d9053..a682c9face9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
@@ -25,9 +25,12 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieTimeTravelException;
@@ -50,6 +53,7 @@ import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.SAVEPOINT_ACTION;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps;
@@ -339,6 +343,32 @@ public class TimelineUtils {
           timestampAsOf, incompleteCommitTime));
       }
     }
+
+    // also timestamp as of cannot query cleaned up data.
+    Option<HoodieInstant> latestCleanOpt = metaClient.getActiveTimeline().getCleanerTimeline().filterCompletedInstants().lastInstant();
+    if (latestCleanOpt.isPresent()) {
+      // Ensure timestamp as of is > than the earliest commit to retain and
+      try {
+        HoodieCleanMetadata cleanMetadata = CleanerUtils.getCleanerMetadata(metaClient, latestCleanOpt.get());
+        String earliestCommitToRetain = cleanMetadata.getEarliestCommitToRetain();
+        if (!StringUtils.isNullOrEmpty(earliestCommitToRetain)) {
+          ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(earliestCommitToRetain, LESSER_THAN_OR_EQUALS, timestampAsOf),
+              "Cleaner cleaned up the timestamp of interest. Please ensure sufficient commits are retained with cleaner "
+                  + "for Timestamp as of query to work");
+        } else {
+          // when cleaner is based on file versions, we may not find value for earliestCommitToRetain.
+          // so, lets check if timestamp of interest is archived based on first entry in active timeline
+          Option<HoodieInstant> firstCompletedInstant = metaClient.getActiveTimeline().getWriteTimeline().filterCompletedInstants().firstInstant();
+          if (firstCompletedInstant.isPresent()) {
+            ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(firstCompletedInstant.get().getTimestamp(), LESSER_THAN_OR_EQUALS, timestampAsOf),
+                "Please ensure sufficient commits are retained (uncleaned and un-archived) for timestamp as of query to work.");
+          }
+        }
+      } catch (IOException e) {
+        throw new HoodieTimeTravelException("Cleaner cleaned up the timestamp of interest. "
+            + "Please ensure sufficient commits are retained with cleaner for Timestamp as of query to work ");
+      }
+    }
   }
 
   /**
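Stated compactly, the validation added above applies two ordered checks. Here
is a Scala sketch of the decision logic only (the method name
validateAsOfInstant is hypothetical, not the actual implementation; Hudi
instant timestamps are fixed-width numeric strings, so the lexicographic
comparisons below mirror HoodieTimeline.compareTimestamps):

    // Sketch: how the new guard decides whether timestampAsOf is still servable.
    // earliestCommitToRetain comes from the latest completed clean's metadata;
    // firstActiveInstant is the first completed instant on the active timeline.
    def validateAsOfInstant(timestampAsOf: String,
                            earliestCommitToRetain: Option[String],
                            firstActiveInstant: Option[String]): Unit =
      earliestCommitToRetain match {
        case Some(earliest) =>
          // Commits/hours based cleaning records its retention boundary directly:
          // the queried instant must be at or after that boundary.
          require(earliest <= timestampAsOf,
            "Cleaner cleaned up the timestamp of interest...")
        case None =>
          // File-versions based cleaning leaves earliestCommitToRetain empty, so
          // fall back to the oldest instant not yet archived out of the active
          // timeline.
          firstActiveInstant.foreach { first =>
            require(first <= timestampAsOf,
              "Please ensure sufficient commits are retained (uncleaned and un-archived)...")
          }
      }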
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
index cdb94907158..7f3d9386fb2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
@@ -17,26 +17,28 @@
 package org.apache.hudi.functional
 
-import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.{HoodieCleaningPolicy, HoodieTableType}
 import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.testutils.HoodieTestTable
-import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.exception.HoodieTimeTravelException
+import org.apache.hudi.config.{HoodieArchivalConfig, HoodieCleanConfig, HoodieCompactionConfig, HoodieWriteConfig}
+import org.apache.hudi.exception.ExceptionUtil.getRootCause
+import org.apache.hudi.exception.{HoodieKeyGeneratorException, HoodieTimeTravelException}
 import org.apache.hudi.testutils.HoodieSparkClientTestBase
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, ScalaAssertionSupport, config}
 
 import org.apache.spark.sql.SaveMode.{Append, Overwrite}
 import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertNull, assertTrue}
-import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.EnumSource
 import org.scalatest.Assertions.assertThrows
 
 import java.text.SimpleDateFormat
 
-class TestTimeTravelQuery extends HoodieSparkClientTestBase {
+class TestTimeTravelQuery extends HoodieSparkClientTestBase with ScalaAssertionSupport {
   var spark: SparkSession = _
   val commonOpts = Map(
     "hoodie.insert.shuffle.parallelism" -> "4",
@@ -155,7 +157,7 @@ class TestTimeTravelQuery extends HoodieSparkClientTestBase {
     // Query as of other commits
     List(incompleteCommit, secondCommit, thirdCommit)
       .foreach(commitTime => {
-        assertThrows[HoodieTimeTravelException] {
+        assertThrows(classOf[HoodieTimeTravelException]) {
           spark.read.format("hudi")
             .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, commitTime)
             .load(basePath)
@@ -307,4 +309,92 @@ class TestTimeTravelQuery extends HoodieSparkClientTestBase {
     assertNotNull(schema3.getField("year"))
     assertNotNull(schema3.getField("month"))
   }
+
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])
+  def testTimeTravelQueryCommitsBasedClean(tableType: HoodieTableType): Unit = {
+    testTimeTravelQueryCOW(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name, tableType)
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])
+  def testTimeTravelQueryFileVersionBasedClean(tableType: HoodieTableType): Unit = {
+    testTimeTravelQueryCOW(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name, tableType)
+  }
+
+  def testTimeTravelQueryCOW(cleanerPolicy: String, tableType: HoodieTableType): Unit = {
+    initMetaClient(tableType)
+    val _spark = spark
+    import _spark.implicits._
+
+    val opts = commonOpts ++ Map(
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name,
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "",
+      HoodieCleanConfig.CLEANER_POLICY.key() -> cleanerPolicy,
+      HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> "2",
+      HoodieCleanConfig.CLEANER_FILE_VERSIONS_RETAINED.key() -> "2",
+      HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key() -> "3",
+      HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key() -> "4",
+      HoodieMetadataConfig.ENABLE.key() -> "false",
+      HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key() -> "1"
+    )
+
+    // First write
+    val df1 = Seq((1, "a1", 10, 1000)).toDF("id", "name", "value", "version")
+    val firstCommit = writeBatch(df1, opts, Overwrite)
+
+    // Second write
+    writeBatch(Seq((1, "a1", 12, 1001)).toDF("id", "name", "value", "version"), opts)
+
+    // Third write
+    val df3 = Seq((1, "a1", 13, 1002)).toDF("id", "name", "value", "version")
+    val thirdCommit = writeBatch(df3, opts)
+
+    // Fourth write
+    writeBatch(Seq((1, "a1", 14, 1003)).toDF("id", "name", "value", "version"), opts)
+
+    // Query as of thirdCommitTime
+    val result3 = spark.read.format("hudi")
+      .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, thirdCommit)
+      .load(basePath)
+      .select("id", "name", "value", "version")
+      .take(1)(0)
+    assertEquals(Row(1, "a1", 13, 1002), result3)
+
+    if (!cleanerPolicy.equals(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name)) {
+      // first commit should fail since cleaner already cleaned up.
+      val e1 = assertThrows(classOf[IllegalArgumentException]) {
+        spark.read.format("hudi")
+          .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, firstCommit)
+          .load(basePath)
+          .select("id", "name", "value", "version")
+          .take(1)
+      }
+      assertTrue(getRootCause(e1).getMessage.contains("Cleaner cleaned up the timestamp of interest. Please ensure sufficient commits are retained with cleaner for Timestamp as of query to work"))
+    }
+
+    // add more writes so that first commit goes into archived timeline.
+    // fifth write
+    writeBatch(Seq((1, "a1", 15, 1004)).toDF("id", "name", "value", "version"), opts)
+
+    // sixth write
+    writeBatch(Seq((1, "a1", 16, 1005)).toDF("id", "name", "value", "version"), opts)
+
+    // for commits and hours based cleaning, cleaner based exception will be thrown. For file versions based cleaning,
+    // archival based exception will be thrown.
+    val expectedErrorMsg = if (!cleanerPolicy.equals(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name)) {
+      "Cleaner cleaned up the timestamp of interest. Please ensure sufficient commits are retained with cleaner for Timestamp as of query to work"
+    } else {
+      "Please ensure sufficient commits are retained (uncleaned and un-archived) for timestamp as of query to work."
+    }
+
+    val e2 = assertThrows(classOf[IllegalArgumentException]) {
+      spark.read.format("hudi")
+        .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, firstCommit)
+        .load(basePath)
+        .select("id", "name", "value", "version")
+        .take(1)
+    }
+    assertTrue(getRootCause(e2).getMessage.contains(expectedErrorMsg))
+  }
 }
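The practical takeaway from the test matrix above: with KEEP_LATEST_COMMITS
the cleaner's earliestCommitToRetain drives the error, while with
KEEP_LATEST_FILE_VERSIONS the archival boundary does. Either way, time travel
only works within the retained window, so tables queried this way need
generous retention. An illustrative (not prescriptive) set of write options,
using the same config classes the test exercises; the values are examples
only and depend on workload:

    import org.apache.hudi.config.{HoodieArchivalConfig, HoodieCleanConfig}

    // Illustrative values only: retain 10 commits uncleaned and keep at
    // least 20 commits un-archived, so any instant within that window
    // remains queryable with timestamp-as-of.
    val retentionOpts = Map(
      HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key() -> "10",
      HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key() -> "20",
      HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key() -> "30"
    )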
