This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 8298cf8d4010 fix(query): Change start commit time to be exclusive in
incremental query on Spark (#13982)
8298cf8d4010 is described below
commit 8298cf8d4010ab3b844a66fad315b4438db8af45
Author: Y Ethan Guo <[email protected]>
AuthorDate: Fri Sep 26 16:48:11 2025 -0700
fix(query): Change start commit time to be exclusive in incremental query
on Spark (#13982)
---
docker/demo/sparksql-incremental.commands | 8 +--
.../common/table/timeline/BaseHoodieTimeline.java | 2 +-
.../main/scala/org/apache/hudi/DefaultSource.scala | 4 +-
.../hudi/HoodieHadoopFsRelationFactory.scala | 12 ++---
.../hudi/MergeOnReadIncrementalRelationV2.scala | 2 +-
.../sql/hudi/streaming/HoodieStreamSourceV2.scala | 4 +-
.../hudi/functional/TestIncrementalQueries.java | 8 ++-
.../apache/hudi/functional/TestCOWDataSource.scala | 30 ++++--------
.../hudi/functional/TestCOWDataSourceStorage.scala | 10 ++--
.../functional/TestColumnStatsIndexWithSQL.scala | 4 +-
.../functional/TestDataSourceForBootstrap.scala | 7 ++-
.../TestIncrementalReadWithFullTableScan.scala | 6 +--
.../apache/hudi/functional/TestMORDataSource.scala | 57 +++++++++-------------
.../hudi/functional/TestSparkDataSource.scala | 5 +-
.../hudi/functional/TestStructuredStreaming.scala | 4 +-
.../apache/spark/sql/hudi/common/TestSqlConf.scala | 4 +-
.../dml/others/TestHoodieTableValuedFunction.scala | 6 +--
.../sql/hudi/dml/others/TestMergeIntoTable.scala | 6 +--
.../sql/hudi/feature/TestCDCForSparkSQL.scala | 45 ++++++++---------
.../hudi/procedure/TestCopyToTableProcedure.scala | 2 +-
.../hudi/utilities/sources/HoodieIncrSource.java | 17 ++++---
21 files changed, 104 insertions(+), 139 deletions(-)
diff --git a/docker/demo/sparksql-incremental.commands
b/docker/demo/sparksql-incremental.commands
index 8f58ef8e4caf..fa77582e8bfd 100644
--- a/docker/demo/sparksql-incremental.commands
+++ b/docker/demo/sparksql-incremental.commands
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.FileSystem;
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
-val startCompletionTime = HoodieDataSourceHelpers.listCompletionTimeSince(fs,
"/user/hive/warehouse/stock_ticks_cow", "00000").get(1)
+val startCompletionTime = HoodieDataSourceHelpers.listCompletionTimeSince(fs,
"/user/hive/warehouse/stock_ticks_cow", "00000").get(0)
println("Begin completion time for COW incremental query: " +
startCompletionTime)
val hoodieIncQueryDF = spark.read.format("hudi").
option(DataSourceReadOptions.QUERY_TYPE.key(),
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).
@@ -38,7 +38,7 @@ println("stock_ticks_cow incremental count: " +
hoodieIncQueryDF.count)
hoodieIncQueryDF.registerTempTable("stock_ticks_cow_incr")
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from
stock_ticks_cow_incr where symbol = 'GOOG'").show(100, false);
-val bsStartCompletionTime =
HoodieDataSourceHelpers.listCompletionTimeSince(fs,
"/user/hive/warehouse/stock_ticks_cow_bs", "00000000000000").get(1)
+val bsStartCompletionTime =
HoodieDataSourceHelpers.listCompletionTimeSince(fs,
"/user/hive/warehouse/stock_ticks_cow_bs", "00000000000000").get(0)
println("Begin completion time for bootstrap COW incremental query: " +
bsStartCompletionTime)
val hoodieIncQueryBsDF = spark.read.format("hudi").
option(DataSourceReadOptions.QUERY_TYPE.key(),
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).
@@ -48,7 +48,7 @@ println("stock_ticks_cow_bs incremental count: " +
hoodieIncQueryBsDF.count)
hoodieIncQueryBsDF.registerTempTable("stock_ticks_cow_bs_incr")
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from
stock_ticks_cow_bs_incr where symbol = 'GOOG'").show(100, false);
-val morStartCompletionTime =
HoodieDataSourceHelpers.listCompletionTimeSince(fs,
"/user/hive/warehouse/stock_ticks_mor", "00000").get(1)
+val morStartCompletionTime =
HoodieDataSourceHelpers.listCompletionTimeSince(fs,
"/user/hive/warehouse/stock_ticks_mor", "00000").get(0)
println("Begin completion time for MOR incremental query: " +
morStartCompletionTime)
val hoodieMorIncQueryDF = spark.read.format("hudi").
option(DataSourceReadOptions.QUERY_TYPE.key(),
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).
@@ -58,7 +58,7 @@ println("stock_ticks_mor incremental count: " +
hoodieMorIncQueryDF.count)
hoodieMorIncQueryDF.registerTempTable("stock_ticks_mor_incr")
spark.sql("select `_hoodie_commit_time`, symbol, ts, volume, open, close from
stock_ticks_mor_incr where symbol = 'GOOG'").show(100, false);
-val bsMorStartCompletionTime =
HoodieDataSourceHelpers.listCompletionTimeSince(fs,
"/user/hive/warehouse/stock_ticks_mor_bs", "00000000000000").get(1)
+val bsMorStartCompletionTime =
HoodieDataSourceHelpers.listCompletionTimeSince(fs,
"/user/hive/warehouse/stock_ticks_mor_bs", "00000000000000").get(0)
println("Begin completion time for bootstrap MOR incremental query: " +
bsMorStartCompletionTime)
val hoodieMorIncQueryBsDF = spark.read.format("hudi").
option(DataSourceReadOptions.QUERY_TYPE.key(),
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/BaseHoodieTimeline.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/BaseHoodieTimeline.java
index 331db8713faa..cc559d111f3d 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/BaseHoodieTimeline.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/BaseHoodieTimeline.java
@@ -263,7 +263,7 @@ public abstract class BaseHoodieTimeline implements
HoodieTimeline {
@Override
public HoodieTimeline findInstantsInRangeByCompletionTime(String startTs,
String endTs) {
return factory.createDefaultTimeline(
- getInstantsAsStream().filter(s -> s.getCompletionTime() != null &&
InstantComparison.isInClosedRange(s.getCompletionTime(), startTs, endTs)),
+ getInstantsAsStream().filter(s -> s.getCompletionTime() != null &&
InstantComparison.isInRange(s.getCompletionTime(), startTs, endTs)),
getInstantReader());
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index 54ca53794e34..fb78db78c51f 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -345,8 +345,8 @@ object DefaultSource {
case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
(hoodieTableSupportsCompletionTime, enableFileGroupReader) match {
case (true, true) => new
HoodieCopyOnWriteIncrementalHadoopFsRelationFactoryV2(
- sqlContext, metaClient, parameters, userSchema,
isBootstrappedTable, RangeType.CLOSED_CLOSED).build()
- case (true, false) => new IncrementalRelationV2(sqlContext,
parameters, userSchema, metaClient, RangeType.CLOSED_CLOSED)
+ sqlContext, metaClient, parameters, userSchema,
isBootstrappedTable).build()
+ case (true, false) => new IncrementalRelationV2(sqlContext,
parameters, userSchema, metaClient, RangeType.OPEN_CLOSED)
case (false, true) => new
HoodieCopyOnWriteIncrementalHadoopFsRelationFactoryV1(
sqlContext, metaClient, parameters, userSchema,
isBootstrappedTable).build()
case (false, false) => new IncrementalRelationV1(sqlContext,
parameters, userSchema, metaClient)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
index 9221e4f0ff78..9ee1663fea65 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala
@@ -20,21 +20,15 @@ package org.apache.hudi
import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema,
isSchemaEvolutionEnabledOnRead}
import org.apache.hudi.HoodieConversionUtils.toScalaOption
-import org.apache.hudi.HoodieFileIndex.getConfigProperties
-import org.apache.hudi.common.config.{ConfigProperty, HoodieMetadataConfig,
HoodieReaderConfig, TypedProperties}
-import
org.apache.hudi.common.config.HoodieMetadataConfig.{DEFAULT_METADATA_ENABLE_FOR_READERS,
ENABLE}
+import org.apache.hudi.common.config.{ConfigProperty, HoodieReaderConfig}
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient,
TableSchemaResolver}
import org.apache.hudi.common.table.log.InstantRange.RangeType
import org.apache.hudi.common.table.timeline.HoodieTimeline
-import org.apache.hudi.common.util.ConfigUtils
import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
-import org.apache.hudi.common.util.ValidationUtils.checkState
-import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
import org.apache.hudi.keygen.{CustomAvroKeyGenerator, CustomKeyGenerator,
TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
-import org.apache.hudi.metadata.HoodieTableMetadataUtil
import org.apache.hudi.storage.StoragePath
import org.apache.avro.Schema
@@ -334,7 +328,7 @@ class
HoodieMergeOnReadIncrementalHadoopFsRelationFactoryV2(override val sqlCont
override val
options: Map[String, String],
override val
schemaSpec: Option[StructType],
isBootstrap:
Boolean,
- rangeType:
RangeType = RangeType.CLOSED_CLOSED)
+ rangeType:
RangeType = RangeType.OPEN_CLOSED)
extends HoodieMergeOnReadIncrementalHadoopFsRelationFactory(sqlContext,
metaClient, options, schemaSpec, isBootstrap,
MergeOnReadIncrementalRelationV2(sqlContext, options, metaClient,
schemaSpec, None, rangeType))
@@ -434,7 +428,7 @@ class
HoodieCopyOnWriteIncrementalHadoopFsRelationFactoryV2(override val sqlCont
override val
options: Map[String, String],
override val
schemaSpec: Option[StructType],
isBootstrap:
Boolean,
- rangeType:
RangeType)
+ rangeType:
RangeType = RangeType.OPEN_CLOSED)
extends HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(sqlContext,
metaClient, options, schemaSpec, isBootstrap,
MergeOnReadIncrementalRelationV2(sqlContext, options, metaClient,
schemaSpec, None, rangeType))
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala
index c2795325d688..f104f2cfc0f3 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelationV2.scala
@@ -52,7 +52,7 @@ case class MergeOnReadIncrementalRelationV2(override val
sqlContext: SQLContext,
override val metaClient:
HoodieTableMetaClient,
private val userSchema:
Option[StructType],
private val prunedDataSchema:
Option[StructType] = None,
- override val rangeType: RangeType
= RangeType.CLOSED_CLOSED)
+ override val rangeType: RangeType
= RangeType.OPEN_CLOSED)
extends BaseMergeOnReadSnapshotRelation(sqlContext, optParams, metaClient,
Seq(), userSchema, prunedDataSchema)
with HoodieIncrementalRelationV2Trait with MergeOnReadIncrementalRelation {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
index f5cb59cdf16b..3eebc4639179 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSourceV2.scala
@@ -170,7 +170,7 @@ class HoodieStreamSourceV2(sqlContext: SQLContext,
.map(serDe.serializeRow)
case HoodieTableType.MERGE_ON_READ =>
val requiredColumns = schema.fields.map(_.name)
- new MergeOnReadIncrementalRelationV2(sqlContext, incParams,
metaClient, Some(schema), rangeType = rangeType)
+ MergeOnReadIncrementalRelationV2(sqlContext, incParams,
metaClient, Some(schema), rangeType = rangeType)
.buildScan(requiredColumns, Array.empty[Filter])
.asInstanceOf[RDD[InternalRow]]
case _ => throw new IllegalArgumentException(s"UnSupport
tableType: $tableType")
@@ -186,7 +186,7 @@ class HoodieStreamSourceV2(sqlContext: SQLContext,
startOffset.offsetCommitTime, RangeType.CLOSED_CLOSED)
case HoodieSourceOffset(completionTime) => (
completionTime, RangeType.OPEN_CLOSED)
- case _=> throw new IllegalStateException("UnKnow offset type.")
+ case _=> throw new IllegalStateException("Unknown offset type.")
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestIncrementalQueries.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestIncrementalQueries.java
index 0563e6364004..c150c97b6d03 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestIncrementalQueries.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestIncrementalQueries.java
@@ -75,11 +75,9 @@ class TestIncrementalQueries extends
SparkClientFunctionalTestHarness {
// Run incremental query for CASE 1: last two commits are within the range.
// Make sure the records from the second commit are included.
// This avoids the differences between different versions. That is,
- // the range type of table version 6 is open_close, but that of > 6 is
close_close by default.
- String startTimestamp =
String.valueOf(Long.valueOf(sortedInstants.get(1).getRight()) - 1);
- Dataset<Row> result = spark().read().format("org.apache.hudi")
+ Dataset<Row> result = spark().read().format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE().key(),
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
- .option(DataSourceReadOptions.START_COMMIT().key(), startTimestamp)
+ .option(DataSourceReadOptions.START_COMMIT().key(),
sortedInstants.get(0).getRight())
.option(DataSourceReadOptions.END_COMMIT().key(),
sortedInstants.get(2).getRight()).load(path);
// Only records from the last two commits should be returned.
assertEquals(3,
@@ -97,7 +95,7 @@ class TestIncrementalQueries extends
SparkClientFunctionalTestHarness {
// Run incremental query for CASE 2: start time is larger than the newest
instant.
// That is, no instances would fall into this range.
- startTimestamp =
String.valueOf(Long.valueOf(sortedInstants.get(2).getRight()) + 100);
+ String startTimestamp =
String.valueOf(Long.valueOf(sortedInstants.get(2).getRight()) + 100);
String endTimestamp =
String.valueOf(Long.valueOf(sortedInstants.get(2).getRight()) + 200);
result = spark().read().format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE().key(),
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 6f56f466bc9e..96d9685638a1 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -637,6 +637,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
.mode(SaveMode.Overwrite)
.save(basePath)
metaClient = createMetaClient(spark, basePath)
+ val commit1CompletionTime =
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
val dataGen2 = new HoodieTestDataGenerator(Array("2022-01-02"))
val records2 = recordsToStrings(dataGen2.generateInserts("002",
30)).asScala.toList
@@ -657,7 +658,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
val incrementalQueryRes = spark.read.format("hudi")
.options(readOpts)
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key, commit2CompletionTime)
+ .option(DataSourceReadOptions.START_COMMIT.key, commit1CompletionTime)
.option(DataSourceReadOptions.END_COMMIT.key, commit2CompletionTime)
.load(basePath)
assertEquals(incrementalQueryRes.where("partition = '2022-01-01'").count,
0)
@@ -1146,6 +1147,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
.option(DataSourceWriteOptions.OPERATION.key,
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Overwrite)
.save(basePath)
+ val commitCompletionTime1 =
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
val hoodieROViewDF1 =
spark.read.format("org.apache.hudi").options(readOpts).load(basePath)
assertEquals(insert1Cnt, hoodieROViewDF1.count())
@@ -1159,14 +1161,13 @@ class TestCOWDataSource extends
HoodieSparkClientTestBase with ScalaAssertionSup
.option(DataSourceWriteOptions.INSERT_DROP_DUPS.key, "true")
.mode(SaveMode.Append)
.save(basePath)
- val commitCompletionTime2 =
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
val hoodieROViewDF2 =
spark.read.format("org.apache.hudi").options(readOpts).load(basePath)
assertEquals(hoodieROViewDF2.count(), totalUniqueKeyToGenerate)
val hoodieIncViewDF2 = spark.read.format("org.apache.hudi")
.options(readOpts)
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key, commitCompletionTime2)
+ .option(DataSourceReadOptions.START_COMMIT.key, commitCompletionTime1)
.load(basePath)
assertEquals(hoodieIncViewDF2.count(), insert2NewKeyCnt)
}
@@ -1462,6 +1463,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
.mode(SaveMode.Overwrite)
.save(basePath)
+ val commitCompletionTime1 =
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
val countIn20160315 = records1.asScala.count(record =>
record.getPartitionPath == "2016/03/15")
val pathForReader = getPathForReader(basePath, !enableFileIndex, if
(partitionEncode) 1 else 3)
// query the partition by filter
@@ -1492,13 +1494,12 @@ class TestCOWDataSource extends
HoodieSparkClientTestBase with ScalaAssertionSup
.option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
.mode(SaveMode.Append)
.save(basePath)
- val commitCompletionTime2 =
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
// Incremental query without "*" in path
val hoodieIncViewDF1 = spark.read.format("org.apache.hudi")
.options(readOpts)
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key, commitCompletionTime2)
+ .option(DataSourceReadOptions.START_COMMIT.key, commitCompletionTime1)
.load(basePath)
assertEquals(N + 1, hoodieIncViewDF1.count())
assertEquals(false, Metrics.isInitialized(basePath))
@@ -2174,7 +2175,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
)
// Helper method to get commit time based on table version
- // v6 uses requestedTime, v9+ uses completionTime
+ // v6 uses requestedTime, v8+ uses completionTime
def getCommitTime(tableVersion: String): String = {
metaClient.reloadActiveTimeline()
val lastInstant =
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get()
@@ -2195,19 +2196,6 @@ class TestCOWDataSource extends
HoodieSparkClientTestBase with ScalaAssertionSup
getCommitTime(tableVersion)
}
- // Helper method to adjust start time for incremental queries based on
version
- // v6: open_close range (START exclusive, END inclusive)
- // v9: close_close range (both START and END inclusive)
- def getIncrementalStartTime(commitTime: String, tableVersion: String):
String = {
- if (tableVersion == "6") {
- // v6: open_close - need to use time just before to include the commit
- (commitTime.toLong - 1).toString
- } else {
- // v9: close_close - use the actual commit time
- commitTime
- }
- }
-
// Commit c1 - Initial Insert (10 records with timestamp 1000)
val df1 = Seq(
(1, "val1", 1000L, false),
@@ -2334,7 +2322,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
// - v9: Uses close_close range (both START and END inclusive)
val incrementalDf1 = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key,
getIncrementalStartTime(commit1, tableVersion))
+ .option(DataSourceReadOptions.START_COMMIT.key, "000")
.option(DataSourceReadOptions.END_COMMIT.key, commit3)
.load(basePath)
.select("id", "value", "timestamp")
@@ -2370,7 +2358,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
// Should include changes from c3, c4 and c5
val incrementalDf2 = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key,
getIncrementalStartTime(commit3, tableVersion))
+ .option(DataSourceReadOptions.START_COMMIT.key, commit2)
.load(basePath) // No END_COMMIT means up to latest
.select("id", "value", "timestamp")
.orderBy("id")
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
index 2b73b1155d0b..8986a8ff55f6 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
@@ -144,6 +144,7 @@ class TestCOWDataSourceStorage extends
SparkClientFunctionalTestHarness {
.mode(SaveMode.Append)
.save(basePath)
+ val completionTime2 = DataSourceTestUtils.latestCommitCompletionTime(fs,
basePath)
val snapshotDF2 = spark.read.format("hudi")
.options(readOptions)
.load(basePath)
@@ -179,7 +180,6 @@ class TestCOWDataSourceStorage extends
SparkClientFunctionalTestHarness {
.save(basePath)
val commitInstantTime3 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
- val completionTime3 = DataSourceTestUtils.latestCommitCompletionTime(fs,
basePath)
assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, basePath,
"000").size())
// Snapshot Query
@@ -196,7 +196,7 @@ class TestCOWDataSourceStorage extends
SparkClientFunctionalTestHarness {
val hoodieIncViewDF1 = spark.read.format("org.apache.hudi")
.options(readOptions)
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key, completionTime1)
+ .option(DataSourceReadOptions.START_COMMIT.key, "000")
.option(DataSourceReadOptions.END_COMMIT.key, completionTime1)
.load(basePath)
assertEquals(100, hoodieIncViewDF1.count()) // 100 initial inserts must be
pulled
@@ -226,7 +226,7 @@ class TestCOWDataSourceStorage extends
SparkClientFunctionalTestHarness {
val hoodieIncViewDF2 = spark.read.format("org.apache.hudi")
.options(readOptions)
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key, completionTime3)
+ .option(DataSourceReadOptions.START_COMMIT.key, completionTime2)
.load(basePath)
assertEquals(uniqueKeyCnt, hoodieIncViewDF2.count()) // 100 records must
be pulled
@@ -238,7 +238,7 @@ class TestCOWDataSourceStorage extends
SparkClientFunctionalTestHarness {
val hoodieIncViewDF3 = spark.read.format("org.apache.hudi")
.options(readOptions)
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key, completionTime3)
+ .option(DataSourceReadOptions.START_COMMIT.key, completionTime2)
.option(DataSourceReadOptions.INCR_PATH_GLOB.key, if
(isTimestampBasedKeyGen) "/2016*/*" else "/2016/*/*/*")
.load(basePath)
assertEquals(hoodieIncViewDF2
@@ -247,7 +247,7 @@ class TestCOWDataSourceStorage extends
SparkClientFunctionalTestHarness {
val timeTravelDF = spark.read.format("org.apache.hudi")
.options(readOptions)
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key, completionTime1)
+ .option(DataSourceReadOptions.START_COMMIT.key, "000")
.option(DataSourceReadOptions.END_COMMIT.key, completionTime1)
.load(basePath)
assertEquals(100, timeTravelDF.count()) // 100 initial inserts must be
pulled
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
index 6e2304b09b58..7bc7acb98041 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
@@ -19,7 +19,7 @@
package org.apache.hudi.functional
import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceReadOptions,
DataSourceWriteOptions, HoodieFileIndex}
-import org.apache.hudi.DataSourceWriteOptions.{DELETE_OPERATION_OPT_VAL,
ORDERING_FIELDS, RECORDKEY_FIELD}
+import org.apache.hudi.DataSourceWriteOptions.{DELETE_OPERATION_OPT_VAL,
RECORDKEY_FIELD}
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
@@ -722,7 +722,7 @@ class TestColumnStatsIndexWithSQL extends
ColumnStatIndexTestBase {
val numRecordsForSecondQueryWithDataSkipping =
spark.sql(secondQuery).count()
if
(queryType.equals(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)) {
- createIncrementalSQLTable(commonOpts,
metaClient.reloadActiveTimeline().getInstants.get(2).getCompletionTime)
+ createIncrementalSQLTable(commonOpts,
metaClient.reloadActiveTimeline().getInstants.get(1).getCompletionTime)
assertEquals(spark.sql(firstQuery).count(), if (isLastOperationDelete) 0
else 3)
assertEquals(spark.sql(secondQuery).count(), if (isLastOperationDelete)
0 else 2)
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
index 7fe676065af8..7d5ca083107e 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
@@ -24,7 +24,7 @@ import org.apache.hudi.common.config.{HoodieMetadataConfig,
HoodieStorageConfig}
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
import org.apache.hudi.common.table.HoodieTableConfig
-import org.apache.hudi.common.table.timeline.{HoodieInstantTimeGenerator,
HoodieTimeline}
+import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieClusteringConfig,
HoodieCompactionConfig, HoodieWriteConfig}
import org.apache.hudi.functional.TestDataSourceForBootstrap.{dropMetaCols,
sort}
import org.apache.hudi.hadoop.fs.HadoopFSUtils
@@ -660,12 +660,11 @@ class TestDataSourceForBootstrap {
assertEquals(1, countsPerCommit.length)
assertEquals(bootstrapCommitInstantTime, countsPerCommit(0).get(0))
- val startCompletionTime =
HoodieInstantTimeGenerator.instantTimePlusMillis(bootstrapCommitCompletionTime,
1)
// incrementally pull only changes after bootstrap commit, which would
pull only the updated records in the
// later commits
val hoodieIncViewDF2 = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key, startCompletionTime)
+ .option(DataSourceReadOptions.START_COMMIT.key,
bootstrapCommitCompletionTime)
.load(basePath)
assertEquals(numRecordsUpdate, hoodieIncViewDF2.count())
@@ -678,7 +677,7 @@ class TestDataSourceForBootstrap {
// pull the update commits within certain partitions
val hoodieIncViewDF3 = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key, startCompletionTime)
+ .option(DataSourceReadOptions.START_COMMIT.key,
bootstrapCommitCompletionTime)
.option(DataSourceReadOptions.INCR_PATH_GLOB.key,
relativePartitionPath)
.load(basePath)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
index 714a33a0a576..dc6a8b5d760d 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestIncrementalReadWithFullTableScan.scala
@@ -112,10 +112,10 @@ class TestIncrementalReadWithFullTableScan extends
HoodieSparkClientTestBase {
assertTrue(nArchivedInstants >= 3)
//Anything less than 2 is a valid commit in the sense no cleanup has been
done for those commit files
- val startUnarchivedCompletionTs =
completedCommits.nthInstant(1).get().getCompletionTime //C5 completion
+ val startUnarchivedCompletionTs =
completedCommits.nthInstant(0).get().getCompletionTime //C5 completion
val endUnarchivedCompletionTs =
completedCommits.nthInstant(1).get().getCompletionTime //C5 completion
- val startArchivedCompletionTs =
archivedInstants(1).asInstanceOf[HoodieInstant].getCompletionTime //C1
completion
+ val startArchivedCompletionTs =
archivedInstants(0).asInstanceOf[HoodieInstant].getCompletionTime //C1
completion
val endArchivedCompletionTs =
archivedInstants(1).asInstanceOf[HoodieInstant].getCompletionTime //C1
completion
val instant = Instant.now()
@@ -152,7 +152,7 @@ class TestIncrementalReadWithFullTableScan extends
HoodieSparkClientTestBase {
// Test both start commit and end commits is not archived and not cleaned
val reversedCommits = completedCommits.getReverseOrderedInstants.toArray
- val startUncleanedCompletionTs =
reversedCommits.apply(0).asInstanceOf[HoodieInstant].getCompletionTime
+ val startUncleanedCompletionTs =
reversedCommits.apply(1).asInstanceOf[HoodieInstant].getCompletionTime
val endUncleanedCompletionTs =
reversedCommits.apply(0).asInstanceOf[HoodieInstant].getCompletionTime
runIncrementalQueryAndCompare(startUncleanedCompletionTs,
endUncleanedCompletionTs, 1, true)
runIncrementalQueryAndCompare(startUncleanedCompletionTs,
endUncleanedCompletionTs, 1, false)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 03dcf79b3992..1948a49ebf8a 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -262,7 +262,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase
with SparkDatasetMixin
val hudiIncDF1 = spark.read.format("org.apache.hudi")
.options(readOpts)
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key, commit1CompletionTime)
+ .option(DataSourceReadOptions.START_COMMIT.key, "000")
.option(DataSourceReadOptions.END_COMMIT.key, commit1CompletionTime)
.load(basePath)
assertEquals(100, hudiIncDF1.count())
@@ -274,7 +274,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase
with SparkDatasetMixin
val hudiIncDF2 = spark.read.format("org.apache.hudi")
.options(readOpts)
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key, commit2CompletionTime)
+ .option(DataSourceReadOptions.START_COMMIT.key, commit1CompletionTime)
.option(DataSourceReadOptions.END_COMMIT.key, commit2CompletionTime)
.load(basePath)
assertEquals(100, hudiIncDF2.count())
@@ -286,7 +286,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase
with SparkDatasetMixin
val hudiIncDF3 = spark.read.format("org.apache.hudi")
.options(readOpts)
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key, commit1CompletionTime)
+ .option(DataSourceReadOptions.START_COMMIT.key, "000")
.option(DataSourceReadOptions.END_COMMIT.key, commit2CompletionTime)
.load(basePath)
assertEquals(100, hudiIncDF3.count())
@@ -331,11 +331,6 @@ class TestMORDataSource extends HoodieSparkClientTestBase
with SparkDatasetMixin
.mode(SaveMode.Append)
.save(basePath)
HoodieTestUtils.validateTableConfig(storage, basePath, expectedConfigs,
nonExistentConfigs)
- val commit3CompletionTime = if
(tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
- DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
- } else {
- DataSourceTestUtils.latestCommitRequestTime(storage, basePath)
- }
val hudiSnapshotDF3 = spark.read.format("org.apache.hudi")
.options(readOpts)
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
@@ -356,7 +351,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase
with SparkDatasetMixin
val hudiIncDF4 = spark.read.format("org.apache.hudi")
.options(readOpts)
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key, commit3CompletionTime)
+ .option(DataSourceReadOptions.START_COMMIT.key, commit2CompletionTime)
.load(basePath)
assertEquals(50, hudiIncDF4.count())
@@ -365,7 +360,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase
with SparkDatasetMixin
val hudiIncDF4SkipMerge = spark.read.format("org.apache.hudi")
.options(readOpts)
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key, commit1CompletionTime)
+ .option(DataSourceReadOptions.START_COMMIT.key, "000")
.option(DataSourceReadOptions.REALTIME_MERGE.key,
DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL)
.load(basePath)
assertEquals(250, hudiIncDF4SkipMerge.count())
@@ -400,7 +395,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase
with SparkDatasetMixin
val hudiIncDF5 = spark.read.format("org.apache.hudi")
.options(readOpts)
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key, commit3CompletionTime)
+ .option(DataSourceReadOptions.START_COMMIT.key, commit2CompletionTime)
.load(basePath)
assertEquals(150, hudiIncDF5.count())
}
@@ -415,6 +410,11 @@ class TestMORDataSource extends HoodieSparkClientTestBase
with SparkDatasetMixin
.options(writeOpts)
.mode(SaveMode.Append)
.save(basePath)
+ val commit5CompletionTime = if
(tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
+ DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
+ } else {
+ DataSourceTestUtils.latestCommitRequestTime(storage, basePath)
+ }
HoodieTestUtils.validateTableConfig(storage, basePath, expectedConfigs,
nonExistentConfigs)
val hudiSnapshotDF5 = spark.read.format("org.apache.hudi")
.options(readOpts)
@@ -448,7 +448,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase
with SparkDatasetMixin
val hudiIncDF6 = spark.read.format("org.apache.hudi")
.options(readOpts)
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key, commit6CompletionTime)
+ .option(DataSourceReadOptions.START_COMMIT.key, commit5CompletionTime)
.option(DataSourceReadOptions.END_COMMIT.key, commit6CompletionTime)
.load(basePath)
// even though compaction updated 150 rows, since preserve commit
metadata is true, they won't be part of incremental query.
@@ -510,6 +510,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase
with SparkDatasetMixin
.option(DataSourceWriteOptions.TABLE_TYPE.key,
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
.mode(SaveMode.Overwrite)
.save(basePath)
+ val commit1CompletionTime =
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
assertTrue(HoodieDataSourceHelpers.hasNewCommits(storage, basePath, "000"))
val hudiSnapshotDF1 = spark.read.format("org.apache.hudi")
.options(readOpts)
@@ -526,7 +527,6 @@ class TestMORDataSource extends HoodieSparkClientTestBase
with SparkDatasetMixin
.options(writeOpts)
.mode(SaveMode.Append)
.save(basePath)
- val commit2CompletionTime =
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
val hudiSnapshotDF2 = spark.read.format("org.apache.hudi")
.options(readOpts)
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
@@ -550,7 +550,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase
with SparkDatasetMixin
val hudiIncDF1 = spark.read.format("org.apache.hudi")
.options(readOpts)
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key, commit2CompletionTime)
+ .option(DataSourceReadOptions.START_COMMIT.key, commit1CompletionTime)
.load(basePath)
assertEquals(0, hudiIncDF1.count())
@@ -595,6 +595,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase
with SparkDatasetMixin
.option(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key,
classOf[DefaultHoodieRecordPayload].getName)
.mode(SaveMode.Overwrite)
.save(basePath)
+ val commit1CompletionTime =
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
val hudiSnapshotDF1 = spark.read.format("org.apache.hudi")
.options(readOpts)
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
@@ -618,7 +619,6 @@ class TestMORDataSource extends HoodieSparkClientTestBase
with SparkDatasetMixin
.options(opts)
.mode(SaveMode.Append)
.save(basePath)
- val commit2CompletionTime =
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
val hudiSnapshotDF2 = spark.read.format("org.apache.hudi")
.options(readOpts)
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
@@ -637,7 +637,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase
with SparkDatasetMixin
val hudiIncDF2 = spark.read.format("org.apache.hudi")
.options(readOpts)
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key, if (tableVersion == 6)
commit1Time else commit2CompletionTime)
+ .option(DataSourceReadOptions.START_COMMIT.key, if (tableVersion == 6)
commit1Time else commit1CompletionTime)
.load(basePath)
// filter first commit and only read log records
@@ -905,12 +905,11 @@ class TestMORDataSource extends HoodieSparkClientTestBase
with SparkDatasetMixin
.option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
.mode(SaveMode.Append)
.save(basePath)
- val commitCompletionTime2 =
DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
// Incremental query without "*" in path
val hoodieIncViewDF1 = spark.read.format("org.apache.hudi")
.options(readOpts)
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key, commitCompletionTime2)
+ .option(DataSourceReadOptions.START_COMMIT.key, commitCompletionTime1)
.load(basePath)
assertEquals(N + 1, hoodieIncViewDF1.count())
}
@@ -1300,6 +1299,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase
with SparkDatasetMixin
.mode(SaveMode.Append)
.save(basePath)
val commit2Time =
metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
+ val commit2CompletionTime =
metaClient.getActiveTimeline.lastInstant().get().getCompletionTime
val records3 = recordsToStrings(dataGen2.generateUniqueUpdates("003",
20)).asScala.toSeq
val inputDF3 = spark.read.json(spark.sparkContext.parallelize(records3, 2))
@@ -1308,7 +1308,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase
with SparkDatasetMixin
.mode(SaveMode.Append)
.save(basePath)
val commit3Time =
metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
- val commit3CompletionTime =
metaClient.reloadActiveTimeline.lastInstant().get().getCompletionTime
+ val commit3CompletionTime =
metaClient.getActiveTimeline.lastInstant().get().getCompletionTime
val pathForROQuery = getPathForROQuery(basePath, !enableFileIndex, 3)
// snapshot query
@@ -1332,7 +1332,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase
with SparkDatasetMixin
val incrementalQueryRes = spark.read.format("hudi")
.options(readOpts)
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key, commit3CompletionTime)
+ .option(DataSourceReadOptions.START_COMMIT.key, commit2CompletionTime)
.option(DataSourceReadOptions.END_COMMIT.key, commit3CompletionTime)
.load(basePath)
assertEquals(0, incrementalQueryRes.where("partition =
'2022-01-01'").count)
@@ -1980,19 +1980,6 @@ class TestMORDataSource extends
HoodieSparkClientTestBase with SparkDatasetMixin
getCommitTime(tableVersion)
}
- // Helper method to adjust start time for incremental queries based on
version
- // v6: open_close range (START exclusive, END inclusive)
- // v9: close_close range (both START and END inclusive)
- def getIncrementalStartTime(commitTime: String, tableVersion: String):
String = {
- if (tableVersion == "6") {
- // v6: open_close - need to use time just before to include the commit
- (commitTime.toLong - 1).toString
- } else {
- // v9: close_close - use the actual commit time
- commitTime
- }
- }
-
// Commit c1 - Initial Insert (10 records with timestamp 1000)
val df1 = Seq(
(1, "val1", 1000L, false),
@@ -2119,7 +2106,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase
with SparkDatasetMixin
// - v9: Uses close_close range (both START and END inclusive)
val incrementalDf1 = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key,
getIncrementalStartTime(commit1, tableVersion))
+ .option(DataSourceReadOptions.START_COMMIT.key, "000")
.option(DataSourceReadOptions.END_COMMIT.key, commit3)
.load(basePath)
.select("id", "value", "timestamp")
@@ -2155,7 +2142,7 @@ class TestMORDataSource extends HoodieSparkClientTestBase
with SparkDatasetMixin
// Should include changes from c3, c4 and c5
val incrementalDf2 = spark.read.format("hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key,
getIncrementalStartTime(commit3, tableVersion))
+ .option(DataSourceReadOptions.START_COMMIT.key, commit2)
.load(basePath) // No END_COMMIT means up to latest
.select("id", "value", "timestamp")
.orderBy("id")
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
index 537469bffef5..123d23954efa 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSource.scala
@@ -113,6 +113,7 @@ class TestSparkDataSource extends
SparkClientFunctionalTestHarness {
.mode(SaveMode.Append)
.save(basePath)
val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
+ val commitCompletionTime2 =
DataSourceTestUtils.latestCommitCompletionTime(fs, basePath)
val snapshotDf2 = spark.read.format("org.apache.hudi")
.option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabledOnRead)
@@ -147,7 +148,7 @@ class TestSparkDataSource extends
SparkClientFunctionalTestHarness {
val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, basePath,
"000").get(0)
val hoodieIncViewDf1 = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key, commitCompletionTime1)
+ .option(DataSourceReadOptions.START_COMMIT.key, "000")
.option(DataSourceReadOptions.END_COMMIT.key, commitCompletionTime1)
.option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabledOnRead)
.load(basePath)
@@ -166,7 +167,7 @@ class TestSparkDataSource extends
SparkClientFunctionalTestHarness {
// another incremental query with commit2 and commit3
val hoodieIncViewDf2 = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key, commitCompletionTime3)
+ .option(DataSourceReadOptions.START_COMMIT.key, commitCompletionTime2)
.option(DataSourceReadOptions.END_COMMIT.key(), commitCompletionTime3)
.option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabledOnRead)
.load(basePath)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
index 834173ad3392..2ae305a8ccc0 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
@@ -177,7 +177,7 @@ class TestStructuredStreaming extends
HoodieSparkClientTestBase {
val firstCommit = HoodieDataSourceHelpers.listCommitsSince(storage,
destPath, "000").get(0)
val hoodieIncViewDF1 = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key, commitCompletionTime1)
+ .option(DataSourceReadOptions.START_COMMIT.key, "000")
.option(DataSourceReadOptions.END_COMMIT.key, commitCompletionTime1)
.load(destPath)
assertEquals(100, hoodieIncViewDF1.count())
@@ -189,7 +189,7 @@ class TestStructuredStreaming extends
HoodieSparkClientTestBase {
// pull the latest commit
val hoodieIncViewDF2 = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key, commitCompletionTime2)
+ .option(DataSourceReadOptions.START_COMMIT.key, commitCompletionTime1)
.load(destPath)
assertEquals(uniqueKeyCnt, hoodieIncViewDF2.count()) // 100 records must
be pulled
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala
index 0bfaa580af72..0f0af4363cce 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestSqlConf.scala
@@ -66,11 +66,11 @@ class TestSqlConf extends HoodieSparkSqlTestBase with
BeforeAndAfter {
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000,
$partitionVal)")
val metaClient = createMetaClient(spark, tablePath)
+ val commitCompletionTime1 =
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getCompletionTime
// Then insert another new record
spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000,
$partitionVal)")
- val commitCompletionTime2 =
metaClient.getActiveTimeline.filterCompletedInstants().lastInstant().get().getCompletionTime
checkAnswer(s"select id, name, price, ts, year from $tableName")(
Seq(1, "a1", 10.0, 1000, partitionVal),
Seq(2, "a2", 10.0, 1000, partitionVal)
@@ -87,7 +87,7 @@ class TestSqlConf extends HoodieSparkSqlTestBase with
BeforeAndAfter {
// Manually pass incremental configs to global configs to make sure Hudi
query is able to load the
// global configs
DFSPropertiesConfiguration.addToGlobalProps(QUERY_TYPE.key,
QUERY_TYPE_INCREMENTAL_OPT_VAL)
- DFSPropertiesConfiguration.addToGlobalProps(START_COMMIT.key,
commitCompletionTime2)
+ DFSPropertiesConfiguration.addToGlobalProps(START_COMMIT.key,
commitCompletionTime1)
spark.catalog.refreshTable(tableName)
checkAnswer(s"select id, name, price, ts, year from $tableName")(
Seq(2, "a2", 10.0, 1000, partitionVal)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestHoodieTableValuedFunction.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestHoodieTableValuedFunction.scala
index bf4f4eb86d5c..ce1b7cc9849a 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestHoodieTableValuedFunction.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestHoodieTableValuedFunction.scala
@@ -129,6 +129,7 @@ class TestHoodieTableValuedFunction extends
HoodieSparkSqlTestBase {
)
val fs = HadoopFSUtils.getFs(tablePath,
spark.sessionState.newHadoopConf())
+ val firstCompletionTime =
DataSourceTestUtils.latestCommitCompletionTime(fs, tablePath)
checkAnswer(
s"""select id,
@@ -150,7 +151,6 @@ class TestHoodieTableValuedFunction extends
HoodieSparkSqlTestBase {
| """.stripMargin
)
val secondCompletionTime =
DataSourceTestUtils.latestCommitCompletionTime(fs, tablePath)
- val secondInstant = spark.sql(s"select max(_hoodie_commit_time) as
commitTime from $tableName order by commitTime").first().getString(0)
checkAnswer(
s"""select id,
@@ -160,7 +160,7 @@ class TestHoodieTableValuedFunction extends
HoodieSparkSqlTestBase {
|from hudi_table_changes(
|'$identifier',
|'latest_state',
- |'$secondCompletionTime')
+ |'$firstCompletionTime')
|""".stripMargin
)(
Seq(1, "a1_1", 10.0, 1100),
@@ -184,7 +184,7 @@ class TestHoodieTableValuedFunction extends
HoodieSparkSqlTestBase {
| from hudi_table_changes(
| '$identifier',
| 'latest_state',
- | '$secondCompletionTime',
+ | '$firstCompletionTime',
| '$secondCompletionTime')
| """.stripMargin
)(
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala
index 884d646d6aeb..243ebd93b414 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala
@@ -1043,10 +1043,11 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
checkAnswer(s"select id, name, price, _ts from $targetTable")(
Seq(1, "a1", 12, 1001)
)
+ val secondCompletionTime =
DataSourceTestUtils.latestCommitCompletionTime(fs, targetBasePath)
// Test incremental query
val hudiIncDF1 = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key, firstCompletionTime)
+ .option(DataSourceReadOptions.START_COMMIT.key, "000")
.option(DataSourceReadOptions.END_COMMIT.key, firstCompletionTime)
.load(targetBasePath)
hudiIncDF1.createOrReplaceTempView("inc1")
@@ -1067,11 +1068,10 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase
with ScalaAssertionSuppo
Seq(1, "a1", 12, 1001),
Seq(2, "a2", 10, 1001)
)
- val thirdCompletionTime =
DataSourceTestUtils.latestCommitCompletionTime(fs, targetBasePath)
// Test incremental query
val hudiIncDF2 = spark.read.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE.key,
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key, thirdCompletionTime)
+ .option(DataSourceReadOptions.START_COMMIT.key, secondCompletionTime)
.load(targetBasePath)
hudiIncDF2.createOrReplaceTempView("inc2")
checkAnswer(s"select id, name, price, _ts from inc2 order by id")(
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/TestCDCForSparkSQL.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/TestCDCForSparkSQL.scala
index e316559aa515..a6b3e1654ff1 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/TestCDCForSparkSQL.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/TestCDCForSparkSQL.scala
@@ -34,13 +34,13 @@ import org.junit.jupiter.api.Assertions.assertEquals
class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
- def cdcDataFrame(basePath: String, startingTs: Long, endingTs: Option[Long]
= None): DataFrame = {
+ def cdcDataFrame(basePath: String, startingTs: String, endingTs:
Option[String] = None): DataFrame = {
val reader = spark.read.format("hudi")
.option(QUERY_TYPE.key, QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(INCREMENTAL_FORMAT.key, INCREMENTAL_FORMAT_CDC_VAL)
- .option(START_COMMIT.key, startingTs.toString)
+ .option(START_COMMIT.key, startingTs)
endingTs.foreach { ts =>
- reader.option(END_COMMIT.key, ts.toString)
+ reader.option(END_COMMIT.key, ts)
}
reader.load(basePath)
}
@@ -83,12 +83,12 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
assert(spark.sql(s"select _hoodie_file_name from
$tableName").distinct().count() == 2)
val fgForID1 = spark.sql(s"select _hoodie_file_name from $tableName
where id=1").head().get(0)
val commitTime1 =
metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
- val cdcDataOnly1 = cdcDataFrame(basePath, commitTime1.toLong - 1)
+ val cdcDataOnly1 = cdcDataFrame(basePath, "000")
cdcDataOnly1.show(false)
assertCDCOpCnt(cdcDataOnly1, 2, 0, 0)
spark.sql(s"delete from $tableName where id = 1")
- val cdcDataOnly2 = cdcDataFrame(basePath, commitTime1.toLong)
+ val cdcDataOnly2 = cdcDataFrame(basePath, commitTime1)
assertCDCOpCnt(cdcDataOnly2, 0, 0, 1)
assert(spark.sql(s"select _hoodie_file_name from
$tableName").distinct().count() == 1)
assert(!spark.sql(s"select _hoodie_file_name from
$tableName").head().get(0).equals(fgForID1))
@@ -138,7 +138,7 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
spark.sql(s"insert into $tableName values (1, 'a1', 11, 1000), (2,
'a2', 12, 1000), (3, 'a3', 13, 1000)")
val commitTime1 =
metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
- val cdcDataOnly1 = cdcDataFrame(basePath, commitTime1.toLong - 1)
+ val cdcDataOnly1 = cdcDataFrame(basePath, "000")
cdcDataOnly1.show(false)
assertCDCOpCnt(cdcDataOnly1, 3, 0, 0)
@@ -146,7 +146,7 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
val commitTime2 =
metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
// here we use `commitTime1` to query the change data in commit 2.
// because `commitTime2` is maybe the ts of the compaction
operation, not the write operation.
- val cdcDataOnly2 = cdcDataFrame(basePath, commitTime1.toLong)
+ val cdcDataOnly2 = cdcDataFrame(basePath, commitTime1)
cdcDataOnly2.show(false)
assertCDCOpCnt(cdcDataOnly2, 0, 1, 0)
@@ -168,13 +168,13 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
spark.sql(s"update $tableName set name = 'a2_v2', ts = 1200 where id
= 2")
val commitTime3 =
metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
- val cdcDataOnly3 = cdcDataFrame(basePath, commitTime2.toLong)
+ val cdcDataOnly3 = cdcDataFrame(basePath, commitTime2)
cdcDataOnly3.show(false)
assertCDCOpCnt(cdcDataOnly3, 0, 1, 0)
spark.sql(s"delete from $tableName where id = 3")
val commitTime4 =
metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
- val cdcDataOnly4 = cdcDataFrame(basePath, commitTime3.toLong)
+ val cdcDataOnly4 = cdcDataFrame(basePath, commitTime3)
cdcDataOnly4.show(false)
assertCDCOpCnt(cdcDataOnly4, 0, 0, 1)
@@ -192,8 +192,7 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
| when matched then update set id = s0.id, name = s0.name,
price = s0.price, ts = s0.ts
| when not matched then insert *
""".stripMargin)
- val commitTime5 =
metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
- val cdcDataOnly5 = cdcDataFrame(basePath, commitTime4.toLong)
+ val cdcDataOnly5 = cdcDataFrame(basePath, commitTime4)
cdcDataOnly5.show(false)
assertCDCOpCnt(cdcDataOnly5, 1, 1, 0)
@@ -215,7 +214,7 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
Seq("i", 4, null, null, "a4", 14)
)
- val totalCdcData = cdcDataFrame(basePath, commitTime1.toLong - 1)
+ val totalCdcData = cdcDataFrame(basePath, "000")
assertCDCOpCnt(totalCdcData, 4, 3, 1)
}
}
@@ -266,19 +265,19 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
| (3, 'a3', 13, 1000, '2022')
""".stripMargin)
val commitTime1 =
metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
- val cdcDataOnly1 = cdcDataFrame(basePath, commitTime1.toLong - 1)
+ val cdcDataOnly1 = cdcDataFrame(basePath, "000")
cdcDataOnly1.show(false)
assertCDCOpCnt(cdcDataOnly1, 3, 0, 0)
spark.sql(s"insert overwrite table $tableName partition (pt =
'2021') values (1, 'a1_v2', 11, 1100)")
val commitTime2 =
metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
- val cdcDataOnly2 = cdcDataFrame(basePath, commitTime2.toLong - 1)
+ val cdcDataOnly2 = cdcDataFrame(basePath, commitTime1)
cdcDataOnly2.show(false)
assertCDCOpCnt(cdcDataOnly2, 1, 0, 1)
spark.sql(s"update $tableName set name = 'a2_v2', ts = 1200 where id
= 2")
val commitTime3 =
metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
- val cdcDataOnly3 = cdcDataFrame(basePath, commitTime3.toLong - 1)
+ val cdcDataOnly3 = cdcDataFrame(basePath, commitTime2)
cdcDataOnly3.show(false)
assertCDCOpCnt(cdcDataOnly3, 0, 1, 0)
@@ -296,12 +295,11 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
| when matched then update set id = s0.id, name = s0.name,
price = s0.price, ts = s0.ts, pt = s0.pt
| when not matched then insert *
""".stripMargin)
- val commitTime4 =
metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
- val cdcDataOnly4 = cdcDataFrame(basePath, commitTime4.toLong - 1)
+ val cdcDataOnly4 = cdcDataFrame(basePath, commitTime3)
cdcDataOnly4.show(false)
assertCDCOpCnt(cdcDataOnly4, 1, 1, 0)
- val totalCdcData = cdcDataFrame(basePath, commitTime1.toLong - 1)
+ val totalCdcData = cdcDataFrame(basePath, "000")
assertCDCOpCnt(totalCdcData, 5, 2, 1)
}
}
@@ -350,19 +348,19 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
| (3, 'a3', 13, 1000, '2022')
""".stripMargin)
val commitTime1 =
metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
- val cdcDataOnly1 = cdcDataFrame(basePath, commitTime1.toLong - 1)
+ val cdcDataOnly1 = cdcDataFrame(basePath, "000")
cdcDataOnly1.show(false)
assertCDCOpCnt(cdcDataOnly1, 3, 0, 0)
spark.sql(s"insert overwrite table $tableName partition (pt =
'2021') values (1, 'a1_v2', 11, 1100)")
val commitTime2 =
metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
- val cdcDataOnly2 = cdcDataFrame(basePath, commitTime2.toLong - 1)
+ val cdcDataOnly2 = cdcDataFrame(basePath, commitTime1)
cdcDataOnly2.show(false)
assertCDCOpCnt(cdcDataOnly2, 1, 0, 1)
spark.sql(s"update $tableName set name = 'a2_v2', ts = 1200 where id
= 2")
val commitTime3 =
metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
- val cdcDataOnly3 = cdcDataFrame(basePath, commitTime3.toLong - 1)
+ val cdcDataOnly3 = cdcDataFrame(basePath, commitTime2)
cdcDataOnly3.show(false)
assertCDCOpCnt(cdcDataOnly3, 0, 1, 0)
@@ -377,12 +375,11 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
| on s0.id = $tableName.id
| when matched then update set id = s0.id, name = s0.name, ts =
s0.ts, pt = s0.pt
""".stripMargin)
- val commitTime4 =
metaClient.reloadActiveTimeline.lastInstant().get().requestedTime
- val cdcDataOnly4 = cdcDataFrame(basePath, commitTime4.toLong - 1)
+ val cdcDataOnly4 = cdcDataFrame(basePath, commitTime3)
cdcDataOnly4.show(false)
assertCDCOpCnt(cdcDataOnly4, 0, 1, 0)
- val totalCdcData = cdcDataFrame(basePath, commitTime1.toLong - 1)
+ val totalCdcData = cdcDataFrame(basePath, "000")
assertCDCOpCnt(totalCdcData, 4, 2, 1)
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCopyToTableProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCopyToTableProcedure.scala
index f6fcab7ca985..abf0b98ac2c3 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCopyToTableProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCopyToTableProcedure.scala
@@ -193,8 +193,8 @@ class TestCopyToTableProcedure extends
HoodieSparkProcedureTestBase {
// mark startCompletionTime
val fs = HadoopFSUtils.getFs(tablePath,
spark.sessionState.newHadoopConf())
- spark.sql(s"insert into $tableName select 3, 'a3', 30, 2000")
val startCompletionTime =
DataSourceTestUtils.latestCommitCompletionTime(fs, tablePath)
+ spark.sql(s"insert into $tableName select 3, 'a3', 30, 2000")
spark.sql(s"insert into $tableName select 4, 'a4', 40, 2500")
val endCompletionTime =
DataSourceTestUtils.latestCommitCompletionTime(fs, tablePath)
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index 6e82a856a91a..a73e7412214c 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -26,9 +26,9 @@ import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.table.checkpoint.CheckpointUtils;
-import
org.apache.hudi.common.table.checkpoint.UnresolvedStreamerCheckpointBasedOnCfg;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
+import
org.apache.hudi.common.table.checkpoint.UnresolvedStreamerCheckpointBasedOnCfg;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer;
import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer.QueryContext;
@@ -287,18 +287,19 @@ public class HoodieIncrSource extends RowSource {
.filter(String.format("%s IN ('%s')",
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
String.join("','", instantTimeList)));
} else {
- // normal incremental query
- TimelineLayout layout =
TimelineLayout.fromVersion(queryContext.getActiveTimeline().getTimelineLayoutVersion());
- String inclusiveStartCompletionTime = queryContext.getInstants().stream()
- .min(layout.getInstantComparator().completionTimeOrderedComparator())
+ String exclusiveStartCompletionTime =
analyzer.getStartCompletionTime().isPresent()
+ ? analyzer.getStartCompletionTime().get()
+ : String.valueOf(Long.parseLong(queryContext.getInstants().stream()
+
.min(TimelineLayout.fromVersion(queryContext.getActiveTimeline().getTimelineLayoutVersion())
+ .getInstantComparator().completionTimeOrderedComparator())
.map(HoodieInstant::getCompletionTime)
- .get();
-
+ .get()) - 1);
+ // normal incremental query
source = reader
.options(readOpts)
.option(QUERY_TYPE().key(), QUERY_TYPE_INCREMENTAL_OPT_VAL())
.option(INCREMENTAL_READ_TABLE_VERSION().key(),
HoodieTableVersion.EIGHT.versionCode())
- .option(START_COMMIT().key(), inclusiveStartCompletionTime)
+ .option(START_COMMIT().key(), exclusiveStartCompletionTime)
.option(END_COMMIT().key(), endCompletionTime)
.option(INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(),
props.getString(INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key(),