This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 121edc5757b [HUDI-6587] Check incomplete commit for time travel query (#9280)
121edc5757b is described below
commit 121edc5757b3ba67255ab8e5611f5286b319587c
Author: Shiyan Xu <[email protected]>
AuthorDate: Tue Aug 8 17:13:38 2023 -0500
[HUDI-6587] Check incomplete commit for time travel query (#9280)
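
    Time travel queries now validate the user-specified timestamp against the
    first incomplete (inflight or requested) commit on the timeline, excluding
    pending clustering. A query whose timestamp is greater than or equal to that
    commit's timestamp fails fast with the new HoodieTimeTravelException instead
    of reading a potentially inconsistent snapshot. A minimal usage sketch
    (Scala; the table path and instant value below are hypothetical):

        val result = spark.read.format("hudi")
          .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, "20230808171338000") // hypothetical instant
          .load("/tmp/hudi_table") // hypothetical path
        // Throws HoodieTimeTravelException if an incomplete commit exists at or
        // before the requested instant.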
---
.../org/apache/hudi/BaseHoodieTableFileIndex.java | 5 +
.../hudi/common/table/timeline/TimelineUtils.java | 30 +++-
.../hudi/exception/HoodieTimeTravelException.java | 29 ++++
.../hudi/hadoop/HoodieROTablePathFilter.java | 14 +-
.../scala/org/apache/hudi/HoodieBaseRelation.scala | 5 +-
.../hudi/functional/TestTimeTravelQuery.scala | 182 +++++++++++----------
6 files changed, 173 insertions(+), 92 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
index 3a24ef4dd2f..7ba20795790 100644
--- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java
@@ -61,6 +61,7 @@ import java.util.stream.Collectors;
import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
import static org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE;
+import static org.apache.hudi.common.table.timeline.TimelineUtils.validateTimestampAsOf;
import static org.apache.hudi.common.util.CollectionUtils.combine;
import static org.apache.hudi.hadoop.CachingPath.createRelativePathUnsafe;
@@ -243,6 +244,10 @@ public abstract class BaseHoodieTableFileIndex implements AutoCloseable {
return Collections.emptyMap();
}
+ if (specifiedQueryInstant.isPresent() && !shouldIncludePendingCommits) {
+ validateTimestampAsOf(metaClient, specifiedQueryInstant.get());
+ }
+
FileStatus[] allFiles = listPartitionPathFiles(partitions);
HoodieTimeline activeTimeline = getActiveTimeline();
Option<HoodieInstant> latestInstant = activeTimeline.lastInstant();
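
Concretely: with a completed commit at t1, an inflight commit at t2, and a completed commit at t3 (t1 < t2 < t3), a time travel query as of t1 still succeeds, while queries as of t2 or t3 now throw; the guard is skipped when pending commits are deliberately included (shouldIncludePendingCommits).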
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
index 14a03ce60ef..a763f4d9053 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieTimeTravelException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,9 +48,11 @@ import static org.apache.hudi.common.config.HoodieCommonConfig.INCREMENTAL_READ_
import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.SAVEPOINT_ACTION;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps;
/**
* TimelineUtils provides a common way to query incremental meta-data changes for a hoodie table.
@@ -244,8 +247,8 @@ public class TimelineUtils {
if (lastMaxCompletionTime.isPresent()) {
// Get 'hollow' instants that have less instant time than exclusiveStartInstantTime but with greater commit completion time
HoodieDefaultTimeline hollowInstantsTimeline = (HoodieDefaultTimeline) timeline.getCommitsTimeline()
- .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), LESSER_THAN, exclusiveStartInstantTime))
- .filter(s -> HoodieTimeline.compareTimestamps(s.getStateTransitionTime(), GREATER_THAN, lastMaxCompletionTime.get()));
+ .filter(s -> compareTimestamps(s.getTimestamp(), LESSER_THAN, exclusiveStartInstantTime))
+ .filter(s -> compareTimestamps(s.getStateTransitionTime(), GREATER_THAN, lastMaxCompletionTime.get()));
if (!hollowInstantsTimeline.empty()) {
return timelineSinceLastSync.mergeTimeline(hollowInstantsTimeline);
}
@@ -315,6 +318,29 @@ public class TimelineUtils {
}
}
+ /**
+ * Validate user-specified timestamp of time travel query against incomplete commit's timestamp.
+ *
+ * @throws HoodieTimeTravelException when time travel query's timestamp >= incomplete commit's timestamp
+ */
+ public static void validateTimestampAsOf(HoodieTableMetaClient metaClient, String timestampAsOf) {
+ Option<HoodieInstant> firstIncompleteCommit = metaClient.getCommitsTimeline()
+ .filterInflightsAndRequested()
+ .filter(instant ->
+ !HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())
+ || !ClusteringUtils.getClusteringPlan(metaClient, instant).isPresent())
+ .firstInstant();
+
+ if (firstIncompleteCommit.isPresent()) {
+ String incompleteCommitTime = firstIncompleteCommit.get().getTimestamp();
+ if (compareTimestamps(timestampAsOf, GREATER_THAN_OR_EQUALS, incompleteCommitTime)) {
+ throw new HoodieTimeTravelException(String.format(
+ "Time travel's timestamp '%s' must be earlier than the first incomplete commit timestamp '%s'.",
+ timestampAsOf, incompleteCommitTime));
+ }
+ }
+ }
+
/**
* Handles hollow commit as per {@link HoodieCommonConfig#INCREMENTAL_READ_HANDLE_HOLLOW_COMMIT}
* and return filtered or non-filtered timeline for incremental query to run against.
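
The guard can also be exercised directly; a minimal sketch (Scala, assuming an initialized HoodieTableMetaClient named metaClient and a hypothetical instant value):

    import org.apache.hudi.common.table.timeline.TimelineUtils
    import org.apache.hudi.exception.HoodieTimeTravelException

    try {
      TimelineUtils.validateTimestampAsOf(metaClient, "20230808120000000") // hypothetical instant
    } catch {
      case e: HoodieTimeTravelException =>
        // the requested instant is at or after the first incomplete commit
    }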
diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieTimeTravelException.java b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieTimeTravelException.java
new file mode 100644
index 00000000000..c0f703fc95a
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieTimeTravelException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.exception;
+
+public class HoodieTimeTravelException extends HoodieException {
+ public HoodieTimeTravelException(String msg) {
+ super(msg);
+ }
+
+ public HoodieTimeTravelException(String msg, Throwable e) {
+ super(msg, e);
+ }
+}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
index b38cea1ffe6..5e89ed804a8 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
@@ -49,6 +49,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import static org.apache.hudi.common.config.HoodieCommonConfig.TIMESTAMP_AS_OF;
+import static org.apache.hudi.common.table.timeline.TimelineUtils.validateTimestampAsOf;
+import static org.apache.hudi.common.util.StringUtils.nonEmpty;
/**
* Given a path is a part of - Hoodie table = accepts ONLY the latest version of each path - Non-Hoodie table = then
@@ -185,16 +187,20 @@ public class HoodieROTablePathFilter implements Configurable, PathFilter, Serial
metaClientCache.put(baseDir.toString(), metaClient);
}
- if (getConf().get(TIMESTAMP_AS_OF.key()) != null) {
+ final Configuration conf = getConf();
+ final String timestampAsOf = conf.get(TIMESTAMP_AS_OF.key());
+ if (nonEmpty(timestampAsOf)) {
+ validateTimestampAsOf(metaClient, timestampAsOf);
+
// Build FileSystemViewManager with specified time, it's necessary to set this config when you may
// access old version files. For example, in spark side, using "hoodie.datasource.read.paths"
// which contains old version files, if not specify this value, these files will be filtered.
fsView = FileSystemViewManager.createInMemoryFileSystemViewWithTimeline(engineContext,
- metaClient, HoodieInputFormatUtils.buildMetadataConfig(getConf()),
- metaClient.getActiveTimeline().filterCompletedInstants().findInstantsBeforeOrEquals(getConf().get(TIMESTAMP_AS_OF.key())));
+ metaClient, HoodieInputFormatUtils.buildMetadataConfig(conf),
+ metaClient.getActiveTimeline().filterCompletedInstants().findInstantsBeforeOrEquals(timestampAsOf));
} else {
fsView = FileSystemViewManager.createInMemoryFileSystemView(engineContext,
- metaClient, HoodieInputFormatUtils.buildMetadataConfig(getConf()));
+ metaClient, HoodieInputFormatUtils.buildMetadataConfig(conf));
}
String partition = FSUtils.getRelativePartitionPath(new Path(metaClient.getBasePath()), folder);
List<HoodieBaseFile> latestFiles = fsView.getLatestBaseFiles(partition).collect(Collectors.toList());
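
On the Hadoop side the path filter reads the instant from the job configuration, so the same validation now applies there; a minimal sketch (Scala; the instant value is hypothetical, and the no-arg constructor usage is an assumption):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hudi.common.config.HoodieCommonConfig
    import org.apache.hudi.hadoop.HoodieROTablePathFilter

    val conf = new Configuration()
    conf.set(HoodieCommonConfig.TIMESTAMP_AS_OF.key, "20230808120000000") // hypothetical instant
    val pathFilter = new HoodieROTablePathFilter() // assumed no-arg constructor
    pathFilter.setConf(conf)
    // accept() now calls validateTimestampAsOf before building the file system view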
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index fea7781f84d..0f7eb27fd04 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -32,7 +32,8 @@ import org.apache.hudi.common.config.{ConfigProperty, HoodieMetadataConfig, Seri
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
import org.apache.hudi.common.model.{FileSlice, HoodieFileFormat, HoodieRecord}
-import org.apache.hudi.common.table.timeline.HoodieTimeline
+import org.apache.hudi.common.table.timeline.{HoodieTimeline, TimelineUtils}
+import org.apache.hudi.common.table.timeline.TimelineUtils.{HollowCommitHandling, validateTimestampAsOf, handleHollowCommitIfNeeded}
import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
@@ -413,6 +414,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
protected def listLatestFileSlices(globPaths: Seq[Path], partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSlice] = {
queryTimestamp match {
case Some(ts) =>
+ specifiedQueryTimestamp.foreach(t => validateTimestampAsOf(metaClient, t))
+
val partitionDirs = if (globPaths.isEmpty) {
fileIndex.listFiles(partitionFilters, dataFilters)
} else {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
index 66f905abc47..cdb94907158 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
@@ -17,23 +17,27 @@
package org.apache.hudi.functional
-import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ}
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.testutils.HoodieTestTable
import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.HoodieTimeTravelException
import org.apache.hudi.testutils.HoodieSparkClientTestBase
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
-import org.apache.spark.sql.{Row, SaveMode, SparkSession}
+import org.apache.spark.sql.SaveMode.{Append, Overwrite}
+import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertNull, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.EnumSource
+import org.scalatest.Assertions.assertThrows
import java.text.SimpleDateFormat
class TestTimeTravelQuery extends HoodieSparkClientTestBase {
- var spark: SparkSession =_
+ var spark: SparkSession = _
val commonOpts = Map(
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4",
@@ -44,7 +48,7 @@ class TestTimeTravelQuery extends HoodieSparkClientTestBase {
HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
)
- @BeforeEach override def setUp() {
+ @BeforeEach override def setUp(): Unit = {
setTableName("hoodie_test")
initPath()
initSparkContexts()
@@ -53,7 +57,7 @@ class TestTimeTravelQuery extends HoodieSparkClientTestBase {
initFileSystem()
}
- @AfterEach override def tearDown() = {
+ @AfterEach override def tearDown(): Unit = {
cleanupSparkContexts()
cleanupTestDataGenerator()
cleanupFileSystem()
@@ -66,38 +70,22 @@ class TestTimeTravelQuery extends HoodieSparkClientTestBase {
val _spark = spark
import _spark.implicits._
+ val opts = commonOpts ++ Map(
+ DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name,
+ DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> ""
+ )
+
// First write
val df1 = Seq((1, "a1", 10, 1000)).toDF("id", "name", "value", "version")
- df1.write.format("hudi")
- .options(commonOpts)
- .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
- .option(PARTITIONPATH_FIELD.key, "")
- .mode(SaveMode.Overwrite)
- .save(basePath)
-
- val firstCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+ val firstCommit = writeBatch(df1, opts, Overwrite)
// Second write
val df2 = Seq((1, "a1", 12, 1001)).toDF("id", "name", "value", "version")
- df2.write.format("hudi")
- .options(commonOpts)
- .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
- .option(PARTITIONPATH_FIELD.key, "")
- .mode(SaveMode.Append)
- .save(basePath)
- metaClient.reloadActiveTimeline()
- val secondCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+ val secondCommit = writeBatch(df2, opts)
// Third write
val df3 = Seq((1, "a1", 13, 1002)).toDF("id", "name", "value", "version")
- df3.write.format("hudi")
- .options(commonOpts)
- .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
- .option(PARTITIONPATH_FIELD.key, "")
- .mode(SaveMode.Append)
- .save(basePath)
- metaClient.reloadActiveTimeline()
- val thirdCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+ val thirdCommit = writeBatch(df3, opts)
// Query as of firstCommitTime
val result1 = spark.read.format("hudi")
@@ -124,6 +112,59 @@ class TestTimeTravelQuery extends HoodieSparkClientTestBase {
assertEquals(Row(1, "a1", 13, 1002), result3)
}
+ @ParameterizedTest
+ @EnumSource(value = classOf[HoodieTableType])
+ def testTimeTravelQueryWithIncompleteCommit(tableType: HoodieTableType): Unit = {
+ initMetaClient(tableType)
+ val _spark = spark
+ import _spark.implicits._
+
+ val opts = commonOpts ++ Map(
+ DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name,
+ DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> ""
+ )
+
+ // First write
+ val df1 = Seq((1, "a1", 10, 1000)).toDF("id", "name", "value", "version")
+ val firstCommit = writeBatch(df1, opts, Overwrite)
+
+ // Second write
+ val df2 = Seq((1, "a1", 12, 1001)).toDF("id", "name", "value", "version")
+ val secondCommit = writeBatch(df2, opts)
+
+ // Third write
+ val df3 = Seq((1, "a1", 13, 1002)).toDF("id", "name", "value", "version")
+ val thirdCommit = writeBatch(df3, opts)
+
+ // add an incomplete commit between the 1st and 2nd commits;
+ // it'll be 1 ms after the 1st commit, which won't clash with the 2nd commit's timestamp
+ val incompleteCommit = (firstCommit.toLong + 1).toString
+ tableType match {
+ case COPY_ON_WRITE => HoodieTestTable.of(metaClient).addInflightCommit(incompleteCommit)
+ case MERGE_ON_READ => HoodieTestTable.of(metaClient).addInflightDeltaCommit(incompleteCommit)
+ }
+ }
+
+ // Query as of firstCommitTime
+ val result1 = spark.read.format("hudi")
+ .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, firstCommit)
+ .load(basePath)
+ .select("id", "name", "value", "version")
+ .take(1)(0)
+ assertEquals(Row(1, "a1", 10, 1000), result1)
+
+ // Query as of other commits
+ List(incompleteCommit, secondCommit, thirdCommit)
+ .foreach(commitTime => {
+ assertThrows[HoodieTimeTravelException] {
+ spark.read.format("hudi")
+ .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key, commitTime)
+ .load(basePath)
+ .select("id", "name", "value", "version")
+ .take(1)(0)
+ }
+ })
+ }
+
@ParameterizedTest
@EnumSource(value = classOf[HoodieTableType])
def testTimeTravelQueryForPartitionedTable(tableType: HoodieTableType): Unit = {
@@ -131,44 +172,24 @@ class TestTimeTravelQuery extends HoodieSparkClientTestBase {
val _spark = spark
import _spark.implicits._
+ val opts = commonOpts ++ Map(
+ DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name,
+ DataSourceWriteOptions.RECORDKEY_FIELD.key -> "id",
+ DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "version",
+ DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "dt"
+ )
+
// First write
val df1 = Seq((1, "a1", 10, 1000, "2021-07-26")).toDF("id", "name", "value", "version", "dt")
- df1.write.format("hudi")
- .options(commonOpts)
- .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
- .option(RECORDKEY_FIELD.key, "id")
- .option(PRECOMBINE_FIELD.key, "version")
- .option(PARTITIONPATH_FIELD.key, "dt")
- .mode(SaveMode.Overwrite)
- .save(basePath)
-
- val firstCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+ val firstCommit = writeBatch(df1, opts, Overwrite)
// Second write
val df2 = Seq((1, "a1", 12, 1001, "2021-07-26")).toDF("id", "name", "value", "version", "dt")
- df2.write.format("hudi")
- .options(commonOpts)
- .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
- .option(RECORDKEY_FIELD.key, "id")
- .option(PRECOMBINE_FIELD.key, "version")
- .option(PARTITIONPATH_FIELD.key, "dt")
- .mode(SaveMode.Append)
- .save(basePath)
- metaClient.reloadActiveTimeline()
- val secondCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+ val secondCommit = writeBatch(df2, opts)
// Third write
val df3 = Seq((1, "a1", 13, 1002, "2021-07-26")).toDF("id", "name", "value", "version", "dt")
- df3.write.format("hudi")
- .options(commonOpts)
- .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
- .option(RECORDKEY_FIELD.key, "id")
- .option(PRECOMBINE_FIELD.key, "version")
- .option(PARTITIONPATH_FIELD.key, "dt")
- .mode(SaveMode.Append)
- .save(basePath)
- metaClient.reloadActiveTimeline()
- val thirdCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+ val thirdCommit = writeBatch(df3, opts)
// query as of firstCommitTime (using 'yyyy-MM-dd HH:mm:ss' format)
val result1 = spark.read.format("hudi")
@@ -204,6 +225,12 @@ class TestTimeTravelQuery extends HoodieSparkClientTestBase {
assertTrue(result4.isEmpty)
}
+ private def writeBatch(df: DataFrame, options: Map[String, String], mode: SaveMode = Append): String = {
+ df.write.format("hudi").options(options).mode(mode).save(basePath)
+ metaClient.reloadActiveTimeline()
+ metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+ }
+
private def defaultDateTimeFormat(queryInstant: String): String = {
val date = HoodieActiveTimeline.parseDateFromInstantTime(queryInstant)
val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
@@ -223,42 +250,27 @@ class TestTimeTravelQuery extends HoodieSparkClientTestBase {
val _spark = spark
import _spark.implicits._
- // First write
- val df1 = Seq((1, "a1", 10, 1000)).toDF("id", "name", "value", "version")
- df1.write.format("hudi")
- .options(commonOpts)
- .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
- .option(PARTITIONPATH_FIELD.key, "name")
- .mode(SaveMode.Overwrite)
- .save(basePath)
-
metaClient = HoodieTableMetaClient.builder()
.setBasePath(basePath)
.setConf(spark.sessionState.newHadoopConf)
.build()
- val firstCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+
+ val opts = commonOpts ++ Map(
+ DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name,
+ DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "name"
+ )
+
+ // First write
+ val df1 = Seq((1, "a1", 10, 1000)).toDF("id", "name", "value", "version")
+ val firstCommit = writeBatch(df1, opts, Overwrite)
// Second write
val df2 = Seq((1, "a1", 12, 1001, "2022")).toDF("id", "name", "value", "version", "year")
- df2.write.format("hudi")
- .options(commonOpts)
- .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
- .option(PARTITIONPATH_FIELD.key, "name")
- .mode(SaveMode.Append)
- .save(basePath)
- metaClient.reloadActiveTimeline()
- val secondCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+ val secondCommit = writeBatch(df2, opts)
// Third write
val df3 = Seq((1, "a1", 13, 1002, "2022", "08")).toDF("id", "name", "value", "version", "year", "month")
- df3.write.format("hudi")
- .options(commonOpts)
- .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType.name())
- .option(PARTITIONPATH_FIELD.key, "name")
- .mode(SaveMode.Append)
- .save(basePath)
- metaClient.reloadActiveTimeline()
- val thirdCommit = metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getTimestamp
+ val thirdCommit = writeBatch(df3, opts)
val tableSchemaResolver = new TableSchemaResolver(metaClient)