This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ea59d60a86d1 fix(spark): validate and normalize incremental start/end
instants (#18426)
ea59d60a86d1 is described below
commit ea59d60a86d19345bda28d601f68c1e9284d97a6
Author: yaojiejia <[email protected]>
AuthorDate: Wed Apr 1 01:03:23 2026 -0400
fix(spark): validate and normalize incremental start/end instants (#18426)
* init fixes
* fixes cicd test errors
---
.../scala/org/apache/hudi/DataSourceOptions.scala | 15 +-
.../plans/logical/HoodieTableChanges.scala | 7 +-
.../spark/sql/hudi/HoodieSqlCommonUtils.scala | 71 +++++-
.../hudi/common/TestInstantTimeValidation.scala | 278 +++++++++++++++++++++
4 files changed, 353 insertions(+), 18 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 111630b3694c..cd76092a7c12 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -36,6 +36,7 @@ import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.util.{JFunction, SparkConfigUtils}
import org.apache.spark.sql.execution.datasources.{DataSourceUtils =>
SparkDataSourceUtils}
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.slf4j.LoggerFactory
import scala.collection.JavaConverters._
@@ -1131,9 +1132,17 @@ object DataSourceOptionsHelper {
.map(is => if (is.toBoolean) QUERY_TYPE_READ_OPTIMIZED_OPT_VAL else
QUERY_TYPE_SNAPSHOT_OPT_VAL)
.getOrElse(paramsWithGlobalProps.getOrElse(QUERY_TYPE.key,
QUERY_TYPE.defaultValue()))
- Map(
- QUERY_TYPE.key -> queryType
- ) ++ translateConfigurations(paramsWithGlobalProps)
+ val translatedParams = translateConfigurations(paramsWithGlobalProps)
+
+ val startCommitKey = DataSourceReadOptions.START_COMMIT.key
+ val endCommitKey = DataSourceReadOptions.END_COMMIT.key
+ val normalizedStartCommit = translatedParams.get(startCommitKey)
+ .map(v => startCommitKey ->
HoodieSqlCommonUtils.formatIncrementalInstant(v))
+ val normalizedEndCommit = translatedParams.get(endCommitKey)
+ .map(v => endCommitKey ->
HoodieSqlCommonUtils.formatIncrementalInstant(v))
+
+ Map(QUERY_TYPE.key -> queryType) ++ translatedParams ++
+ normalizedStartCommit ++ normalizedEndCommit
}
def inferKeyGenClazz(props: TypedProperties): String = {
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/HoodieTableChanges.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/HoodieTableChanges.scala
index 8ded54a641ff..f2a8eaa5443e 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/HoodieTableChanges.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/HoodieTableChanges.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
import org.apache.spark.sql.hudi.command.exception.HoodieAnalysisException
object HoodieTableChangesOptionsParser {
@@ -43,11 +44,13 @@ object HoodieTableChangesOptionsParser {
val startInstantTimeOpt = startInstantTime match {
case "earliest" => Map("hoodie.datasource.read.begin.instanttime" ->
"000")
- case _ => Map("hoodie.datasource.read.begin.instanttime" ->
startInstantTime)
+ case _ => Map("hoodie.datasource.read.begin.instanttime" ->
+ HoodieSqlCommonUtils.formatIncrementalInstant(startInstantTime))
}
val endInstantTimeOpt = endInstantTime match {
- case Some(x) => Map("hoodie.datasource.read.end.instanttime" -> x)
+ case Some(x) => Map("hoodie.datasource.read.end.instanttime" ->
+ HoodieSqlCommonUtils.formatIncrementalInstant(x))
case None => Map.empty[String, String]
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
index 451dd60d79de..1462ceab78dd 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
@@ -24,6 +24,7 @@ import org.apache.hudi.common.config.{HoodieMetadataConfig,
TypedProperties}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
+import org.apache.hudi.common.table.read.IncrementalQueryAnalyzer
import org.apache.hudi.common.table.timeline.{HoodieInstantTimeGenerator,
HoodieTimeline, TimelineUtils}
import
org.apache.hudi.common.table.timeline.TimelineUtils.parseDateFromInstantTime
import org.apache.hudi.common.util.PartitionPathEncodeUtils
@@ -258,29 +259,73 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
def isUsingHiveCatalog(sparkSession: SparkSession): Boolean =
sparkSession.sessionState.conf.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION) ==
"hive"
+ private val SUPPORTED_FORMATS_MSG: String =
+ "Supported time formats are: 'yyyyMMddHHmmss[SSS]', 'yyyy-MM-dd', " +
+ "'yyyy-MM-dd HH:mm:ss[.SSS]', 'yyyy-MM-ddTHH:mm:ss[.SSS]', " +
+ "epoch seconds (10-digit number), or epoch millis (13-digit number)"
+
+ private def isAllDigits(s: String): Boolean = s.nonEmpty &&
s.forall(_.isDigit)
+
/**
- * Convert different query instant time format to the commit time format.
- * Currently we support three kinds of instant time format for time travel
query:
- * 1、yyyy-MM-dd HH:mm:ss
- * 2、yyyy-MM-dd
- * This will convert to 'yyyyMMdd000000'.
- * 3、yyyyMMddHHmmss
+ * Convert different query instant time format to the Hudi commit time
format.
+ * Supported formats:
+ * - yyyyMMddHHmmss or yyyyMMddHHmmssSSS (Hudi native)
+ * - yyyy-MM-dd (ISO date)
+ * - yyyy-MM-dd HH:mm:ss[.SSS] (ISO datetime with space separator)
+ * - yyyy-MM-ddTHH:mm:ss[.SSS] (ISO datetime with T separator)
+ * - 10-digit all-numeric string (epoch seconds)
+ * - 13-digit all-numeric string (epoch millis)
*/
def formatQueryInstant(queryInstant: String): String = {
val instantLength = queryInstant.length
- if (instantLength == 19 || instantLength == 23) {
- // Handle "yyyy-MM-dd HH:mm:ss[.SSS]" format
+ if (instantLength >= 19 && instantLength <= 23 &&
queryInstant.contains("T")) {
+
HoodieInstantTimeGenerator.getInstantForDateString(queryInstant.replace('T', '
'))
+ } else if (instantLength == 19 || instantLength == 23) {
HoodieInstantTimeGenerator.getInstantForDateString(queryInstant)
} else if (instantLength ==
HoodieInstantTimeGenerator.SECS_INSTANT_ID_LENGTH
- || instantLength ==
HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH) {
- // Handle already serialized "yyyyMMddHHmmss[SSS]" format
+ || instantLength == HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH)
{
validateInstant(queryInstant)
queryInstant
- } else if (instantLength == 10) { // for yyyy-MM-dd
+ } else if (instantLength == 10 && !isAllDigits(queryInstant)) {
TimelineUtils.formatDate(defaultDateFormat.get().parse(queryInstant))
+ } else if (instantLength == 10 && isAllDigits(queryInstant)) {
+ TimelineUtils.formatDate(new java.util.Date(queryInstant.toLong * 1000L))
+ } else if (instantLength == 13 && isAllDigits(queryInstant)) {
+ TimelineUtils.formatDate(new java.util.Date(queryInstant.toLong))
+ } else {
+ throw new IllegalArgumentException(
+ s"Unsupported query instant time format: $queryInstant.
$SUPPORTED_FORMATS_MSG")
+ }
+ }
+
+ private val INCREMENTAL_SENTINEL_VALUES: Set[String] = Set(
+ IncrementalQueryAnalyzer.START_COMMIT_EARLIEST,
+ "000",
+ HoodieTimeline.INIT_INSTANT_TS,
+ HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS,
+ HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS
+ )
+
+ private def isLegacyZeroPrefixedInstant(instantValue: String): Boolean = {
+ instantValue.nonEmpty &&
+ instantValue.length < HoodieInstantTimeGenerator.SECS_INSTANT_ID_LENGTH
&&
+ instantValue.startsWith("0") &&
+ isAllDigits(instantValue)
+ }
+
+ /**
+ * Validate and normalize an incremental query instant (begin or end).
+ * Allows sentinel values like "earliest" and "000" to pass through
unchanged,
+ * and delegates all other values to [[formatQueryInstant]] for format
validation
+ * and normalization to the Hudi commit time format.
+ */
+ def formatIncrementalInstant(instantValue: String): String = {
+ if
(INCREMENTAL_SENTINEL_VALUES.contains(instantValue.toLowerCase(Locale.ROOT))
+ || INCREMENTAL_SENTINEL_VALUES.contains(instantValue)
+ || isLegacyZeroPrefixedInstant(instantValue)) {
+ instantValue
} else {
- throw new IllegalArgumentException(s"Unsupported query instant time
format: $queryInstant,"
- + s"Supported time format are: 'yyyy-MM-dd: HH:mm:ss.SSS' or
'yyyy-MM-dd' or 'yyyyMMddHHmmssSSS'")
+ formatQueryInstant(instantValue)
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestInstantTimeValidation.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestInstantTimeValidation.scala
new file mode 100644
index 000000000000..2c64c4ca7915
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestInstantTimeValidation.scala
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.common
+
+import org.apache.hudi.{DataSourceOptionsHelper, DataSourceReadOptions}
+import org.apache.hudi.common.table.timeline.HoodieTimeline
+
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows,
assertTrue}
+import org.junit.jupiter.api.Test
+
+class TestInstantTimeValidation {
+
+ // --- formatQueryInstant: Hudi native formats ---
+
+ @Test
+ def testFormatQueryInstantWithHudiMillisFormat(): Unit = {
+ val result = HoodieSqlCommonUtils.formatQueryInstant("20250101080000000")
+ assertEquals("20250101080000000", result)
+ }
+
+ @Test
+ def testFormatQueryInstantWithHudiSecsFormat(): Unit = {
+ val result = HoodieSqlCommonUtils.formatQueryInstant("20250101080000")
+ assertEquals("20250101080000", result)
+ }
+
+ // --- formatQueryInstant: ISO date formats ---
+
+ @Test
+ def testFormatQueryInstantWithIsoDate(): Unit = {
+ val result = HoodieSqlCommonUtils.formatQueryInstant("2025-01-01")
+ assertTrue(result.startsWith("20250101"), s"Expected result to start with
20250101 but got: $result")
+ }
+
+ @Test
+ def testFormatQueryInstantWithIsoDateTimeSpace(): Unit = {
+ val result = HoodieSqlCommonUtils.formatQueryInstant("2025-01-02 03:04:56")
+ assertTrue(result.startsWith("20250102030456"), s"Expected result to start
with 20250102030456 but got: $result")
+ }
+
+ @Test
+ def testFormatQueryInstantWithIsoDateTimeSpaceMillis(): Unit = {
+ val result = HoodieSqlCommonUtils.formatQueryInstant("2025-01-02
03:04:56.789")
+ assertEquals("20250102030456789", result)
+ }
+
+ @Test
+ def testFormatQueryInstantWithIsoDateTimeTSeparator(): Unit = {
+ val result = HoodieSqlCommonUtils.formatQueryInstant("2025-01-02T03:04:56")
+ assertTrue(result.startsWith("20250102030456"), s"Expected result to start
with 20250102030456 but got: $result")
+ }
+
+ @Test
+ def testFormatQueryInstantWithIsoDateTimeTSeparatorMillis(): Unit = {
+ val result =
HoodieSqlCommonUtils.formatQueryInstant("2025-01-02T03:04:56.789")
+ assertEquals("20250102030456789", result)
+ }
+
+ // --- formatQueryInstant: epoch formats ---
+
+ @Test
+ def testFormatQueryInstantWithEpochSeconds(): Unit = {
+ val result = HoodieSqlCommonUtils.formatQueryInstant("1735689600")
+ assertTrue(result.matches("\\d{17}"), s"Expected 17-digit Hudi instant but
got: $result")
+ }
+
+ @Test
+ def testFormatQueryInstantWithEpochMillis(): Unit = {
+ val result = HoodieSqlCommonUtils.formatQueryInstant("1735689600000")
+ assertTrue(result.matches("\\d{17}"), s"Expected 17-digit Hudi instant but
got: $result")
+ }
+
+ @Test
+ def testFormatQueryInstantEpochSecsAndMillisProduceSameResult(): Unit = {
+ val fromSecs = HoodieSqlCommonUtils.formatQueryInstant("1735689600")
+ val fromMillis = HoodieSqlCommonUtils.formatQueryInstant("1735689600000")
+ assertEquals(fromSecs, fromMillis)
+ }
+
+ // --- formatQueryInstant: rejection of invalid inputs ---
+
+ @Test
+ def testFormatQueryInstantRejectsShortNumber(): Unit = {
+ assertThrows(classOf[IllegalArgumentException], () => {
+ HoodieSqlCommonUtils.formatQueryInstant("42")
+ })
+ }
+
+ @Test
+ def testFormatQueryInstantRejectsRandomString(): Unit = {
+ assertThrows(classOf[IllegalArgumentException], () => {
+ HoodieSqlCommonUtils.formatQueryInstant("abc")
+ })
+ }
+
+ @Test
+ def testFormatQueryInstantRejectsNegativeNumber(): Unit = {
+ assertThrows(classOf[IllegalArgumentException], () => {
+ HoodieSqlCommonUtils.formatQueryInstant("-1")
+ })
+ }
+
+ @Test
+ def testFormatQueryInstantRejectsFiveDigitNumber(): Unit = {
+ assertThrows(classOf[IllegalArgumentException], () => {
+ HoodieSqlCommonUtils.formatQueryInstant("12345")
+ })
+ }
+
+ // --- formatIncrementalInstant: sentinel pass-through ---
+
+ @Test
+ def testFormatIncrementalInstantPassesThroughEarliest(): Unit = {
+ assertEquals("earliest",
HoodieSqlCommonUtils.formatIncrementalInstant("earliest"))
+ }
+
+ @Test
+ def testFormatIncrementalInstantPassesThroughZeroSentinel(): Unit = {
+ assertEquals("000", HoodieSqlCommonUtils.formatIncrementalInstant("000"))
+ }
+
+ @Test
+ def testFormatIncrementalInstantPassesThroughLegacyZeroPrefixedInstants():
Unit = {
+ assertEquals("0", HoodieSqlCommonUtils.formatIncrementalInstant("0"))
+ assertEquals("001", HoodieSqlCommonUtils.formatIncrementalInstant("001"))
+ assertEquals("0000", HoodieSqlCommonUtils.formatIncrementalInstant("0000"))
+ }
+
+ @Test
+ def testFormatIncrementalInstantPassesThroughBootstrapInstants(): Unit = {
+ assertEquals(HoodieTimeline.INIT_INSTANT_TS,
+
HoodieSqlCommonUtils.formatIncrementalInstant(HoodieTimeline.INIT_INSTANT_TS))
+ assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS,
+
HoodieSqlCommonUtils.formatIncrementalInstant(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS))
+ assertEquals(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
+
HoodieSqlCommonUtils.formatIncrementalInstant(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS))
+ }
+
+ // --- formatIncrementalInstant: normalization ---
+
+ @Test
+ def testFormatIncrementalInstantNormalizesHudiFormat(): Unit = {
+ assertEquals("20250901080000",
HoodieSqlCommonUtils.formatIncrementalInstant("20250901080000"))
+ }
+
+ @Test
+ def testFormatIncrementalInstantNormalizesIsoDate(): Unit = {
+ val result = HoodieSqlCommonUtils.formatIncrementalInstant("2025-01-02")
+ assertTrue(result.startsWith("20250102"), s"Expected result to start with
20250102 but got: $result")
+ }
+
+ @Test
+ def testFormatIncrementalInstantNormalizesEpochSeconds(): Unit = {
+ val result = HoodieSqlCommonUtils.formatIncrementalInstant("1735689600")
+ assertTrue(result.matches("\\d{17}"), s"Expected 17-digit Hudi instant but
got: $result")
+ }
+
+ @Test
+ def testFormatIncrementalInstantNormalizesEpochMillis(): Unit = {
+ val result = HoodieSqlCommonUtils.formatIncrementalInstant("1735689600000")
+ assertTrue(result.matches("\\d{17}"), s"Expected 17-digit Hudi instant but
got: $result")
+ }
+
+ // --- formatIncrementalInstant: rejection ---
+
+ @Test
+ def testFormatIncrementalInstantRejectsShortNumber(): Unit = {
+ assertThrows(classOf[IllegalArgumentException], () => {
+ HoodieSqlCommonUtils.formatIncrementalInstant("42")
+ })
+ }
+
+ @Test
+ def testFormatIncrementalInstantRejectsRandomString(): Unit = {
+ assertThrows(classOf[IllegalArgumentException], () => {
+ HoodieSqlCommonUtils.formatIncrementalInstant("not_a_timestamp")
+ })
+ }
+
+ // --- parametersWithReadDefaults: incremental timestamp normalization ---
+
+ @Test
+ def testReadDefaultsNormalizesStartCommitIsoDate(): Unit = {
+ val params = DataSourceOptionsHelper.parametersWithReadDefaults(Map(
+ DataSourceReadOptions.START_COMMIT.key -> "2025-01-02"
+ ))
+ val result = params(DataSourceReadOptions.START_COMMIT.key)
+ assertTrue(result.startsWith("20250102"), s"Expected normalized instant
starting with 20250102, got: $result")
+ }
+
+ @Test
+ def testReadDefaultsNormalizesEndCommitIsoDateTimeT(): Unit = {
+ val params = DataSourceOptionsHelper.parametersWithReadDefaults(Map(
+ DataSourceReadOptions.END_COMMIT.key -> "2025-06-15T10:30:00.123"
+ ))
+ assertEquals("20250615103000123",
params(DataSourceReadOptions.END_COMMIT.key))
+ }
+
+ @Test
+ def testReadDefaultsAllowsEarliestSentinel(): Unit = {
+ val params = DataSourceOptionsHelper.parametersWithReadDefaults(Map(
+ DataSourceReadOptions.START_COMMIT.key -> "earliest"
+ ))
+ assertEquals("earliest", params(DataSourceReadOptions.START_COMMIT.key))
+ }
+
+ @Test
+ def testReadDefaultsAllowsLegacyZeroPrefixedInstants(): Unit = {
+ val params = DataSourceOptionsHelper.parametersWithReadDefaults(Map(
+ DataSourceReadOptions.START_COMMIT.key -> "0",
+ DataSourceReadOptions.END_COMMIT.key -> "001"
+ ))
+ assertEquals("0", params(DataSourceReadOptions.START_COMMIT.key))
+ assertEquals("001", params(DataSourceReadOptions.END_COMMIT.key))
+ }
+
+ @Test
+ def testReadDefaultsRejectsInvalidStartCommit(): Unit = {
+ assertThrows(classOf[IllegalArgumentException], () => {
+ DataSourceOptionsHelper.parametersWithReadDefaults(Map(
+ DataSourceReadOptions.START_COMMIT.key -> "42"
+ ))
+ })
+ }
+
+ @Test
+ def testReadDefaultsRejectsInvalidEndCommit(): Unit = {
+ assertThrows(classOf[IllegalArgumentException], () => {
+ DataSourceOptionsHelper.parametersWithReadDefaults(Map(
+ DataSourceReadOptions.END_COMMIT.key -> "abc"
+ ))
+ })
+ }
+
+ @Test
+ def testReadDefaultsNormalizesEpochSeconds(): Unit = {
+ val params = DataSourceOptionsHelper.parametersWithReadDefaults(Map(
+ DataSourceReadOptions.START_COMMIT.key -> "1735689600"
+ ))
+ val result = params(DataSourceReadOptions.START_COMMIT.key)
+ assertTrue(result.matches("\\d{17}"), s"Expected 17-digit Hudi instant but
got: $result")
+ }
+
+ @Test
+ def testReadDefaultsPassesThroughValidHudiInstant(): Unit = {
+ val params = DataSourceOptionsHelper.parametersWithReadDefaults(Map(
+ DataSourceReadOptions.START_COMMIT.key -> "20250901080000000"
+ ))
+ assertEquals("20250901080000000",
params(DataSourceReadOptions.START_COMMIT.key))
+ }
+
+ @Test
+ def testReadDefaultsDoesNotAffectMissingTimestamps(): Unit = {
+ val params = DataSourceOptionsHelper.parametersWithReadDefaults(Map(
+ "hoodie.datasource.query.type" -> "incremental"
+ ))
+ assertTrue(!params.contains(DataSourceReadOptions.START_COMMIT.key))
+ assertTrue(!params.contains(DataSourceReadOptions.END_COMMIT.key))
+ }
+}