This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 62ada4562 [spark] Add startup mode for batch read (#2532)
62ada4562 is described below
commit 62ada456250b1d987f02255e865ecedbcee1e48e
Author: Yang Zhang <[email protected]>
AuthorDate: Thu Feb 5 12:22:10 2026 +0800
[spark] Add startup mode for batch read (#2532)
---
.../org/apache/fluss/spark/SparkFlussConf.scala | 34 +++++---
.../scala/org/apache/fluss/spark/SparkTable.scala | 21 +++--
.../spark/read/FlussAppendPartitionReader.scala | 51 ++++++------
.../org/apache/fluss/spark/read/FlussBatch.scala | 95 +++++++++++++++++++---
.../fluss/spark/read/FlussInputPartition.scala | 3 +-
.../fluss/spark/read/FlussOffsetInitializers.scala | 56 +++++++++++++
.../fluss/spark/read/FlussPartitionReader.scala | 8 +-
.../fluss/spark/SparkPrimaryKeyTableReadTest.scala | 18 +++-
8 files changed, 227 insertions(+), 59 deletions(-)
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala
index f71e2c229..28fb633b5 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala
@@ -17,23 +17,37 @@
package org.apache.fluss.spark
+import org.apache.fluss.config.{ConfigBuilder, ConfigOption}
import org.apache.fluss.config.ConfigBuilder.key
-import org.apache.fluss.config.ConfigOption
-import org.apache.spark.sql.internal.SQLConf.buildConf
+import java.time.Duration
object SparkFlussConf {
- val READ_OPTIMIZED = buildConf("spark.sql.fluss.readOptimized")
- .internal()
- .doc("If true, Spark will only read data from data lake snapshot or kv
snapshot, not execute merge them with log changes. This is a temporary
configuration that will be deprecated when read-optimized table(e.g.
`mytbl$ro`) is supported.")
- .booleanConf
- .createWithDefault(false)
+ val SPARK_FLUSS_CONF_PREFIX = "spark.sql.fluss."
val READ_OPTIMIZED_OPTION: ConfigOption[java.lang.Boolean] =
- key(READ_OPTIMIZED.key)
+ key("read.optimized")
.booleanType()
- .defaultValue(READ_OPTIMIZED.defaultValue.get)
- .withDescription(READ_OPTIMIZED.doc)
+ .defaultValue(false)
+ .withDescription(
+ "If true, Spark will only read data from data lake snapshot or kv
snapshot, not execute merge them with log changes. This is a temporary
configuration that will be deprecated when read-optimized table(e.g.
`mytbl$ro`) is supported.")
+ object StartUpMode extends Enumeration {
+ val FULL, EARLIEST, LATEST, TIMESTAMP = Value
+ }
+
+ val SCAN_START_UP_MODE: ConfigOption[String] =
+ ConfigBuilder
+ .key("scan.startup.mode")
+ .stringType()
+ .defaultValue(StartUpMode.FULL.toString)
+ .withDescription("The start up mode when read Fluss table.")
+
+ val SCAN_POLL_TIMEOUT: ConfigOption[Duration] =
+ ConfigBuilder
+ .key("scan.poll.timeout")
+ .durationType()
+ .defaultValue(Duration.ofMillis(10000L))
+ .withDescription("The timeout for log scanner to poll records.")
}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
index 53ed8a091..144db03ae 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
@@ -20,7 +20,6 @@ package org.apache.fluss.spark
import org.apache.fluss.client.admin.Admin
import org.apache.fluss.config.{Configuration => FlussConfiguration}
import org.apache.fluss.metadata.{TableInfo, TablePath}
-import org.apache.fluss.spark.SparkFlussConf.READ_OPTIMIZED
import org.apache.fluss.spark.catalog.{AbstractSparkTable,
SupportsFlussPartitionManagement}
import org.apache.fluss.spark.read.{FlussAppendScanBuilder,
FlussUpsertScanBuilder}
import org.apache.fluss.spark.write.{FlussAppendWriteBuilder,
FlussUpsertWriteBuilder}
@@ -42,7 +41,17 @@ class SparkTable(
with SupportsWrite
with SQLConfHelper {
+ private def populateSparkConf(flussConfig: FlussConfiguration): Unit = {
+ conf.getAllConfs
+ .filter(_._1.startsWith(SparkFlussConf.SPARK_FLUSS_CONF_PREFIX))
+ .foreach {
+ case (k, v) =>
+
flussConfig.setString(k.substring(SparkFlussConf.SPARK_FLUSS_CONF_PREFIX.length),
v)
+ }
+ }
+
override def newWriteBuilder(logicalWriteInfo: LogicalWriteInfo):
WriteBuilder = {
+ populateSparkConf(flussConfig)
if (tableInfo.getPrimaryKeys.isEmpty) {
new FlussAppendWriteBuilder(tablePath, logicalWriteInfo.schema(),
flussConfig)
} else {
@@ -51,17 +60,11 @@ class SparkTable(
}
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder
= {
+ populateSparkConf(flussConfig)
if (tableInfo.getPrimaryKeys.isEmpty) {
new FlussAppendScanBuilder(tablePath, tableInfo, options, flussConfig)
} else {
- val newFlussConfig = if (conf.getConf(SparkFlussConf.READ_OPTIMIZED,
false)) {
- val newFlussConfig_ = new FlussConfiguration(flussConfig)
- newFlussConfig_.setBoolean(SparkFlussConf.READ_OPTIMIZED_OPTION.key(),
true)
- newFlussConfig_
- } else {
- flussConfig
- }
- new FlussUpsertScanBuilder(tablePath, tableInfo, options, newFlussConfig)
+ new FlussUpsertScanBuilder(tablePath, tableInfo, options, flussConfig)
}
}
}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala
index 075e0a275..54570285a 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala
@@ -18,7 +18,6 @@
package org.apache.fluss.spark.read
import org.apache.fluss.client.table.scanner.ScanRecord
-import org.apache.fluss.client.table.scanner.log.ScanRecords
import org.apache.fluss.config.Configuration
import org.apache.fluss.metadata.{TableBucket, TablePath}
@@ -36,41 +35,41 @@ class FlussAppendPartitionReader(
private val logScanner =
table.newScan().project(projection).createLogScanner()
// Iterator for current batch of records
- private var currentRecords: java.util.Iterator[ScanRecord] = _
+ private var currentRecords: java.util.Iterator[ScanRecord] =
java.util.Collections.emptyIterator()
+
+ // The latest offset of fluss is -2
+ private var currentOffset: Long = flussPartition.startOffset.max(0L)
// initialize log scanner
initialize()
- override def next(): Boolean = {
- if (closed) {
- return false
- }
-
- // If we have records in current batch, return next one
- if (currentRecords != null && currentRecords.hasNext) {
- val scanRecord = currentRecords.next()
- currentRow = convertToSparkRow(scanRecord)
- return true
- }
-
- // Poll for more records
+ private def pollMoreRecords(): Unit = {
val scanRecords = logScanner.poll(POLL_TIMEOUT)
+ if ((scanRecords == null || scanRecords.isEmpty) && currentOffset <
flussPartition.stopOffset) {
+ throw new IllegalStateException(s"No more data from fluss server," +
+ s" but current offset $currentOffset not reach the stop offset
${flussPartition.stopOffset}")
+ }
+ currentRecords = scanRecords.records(tableBucket).iterator()
+ }
- if (scanRecords == null || scanRecords.isEmpty) {
+ override def next(): Boolean = {
+ if (closed || currentOffset >= flussPartition.stopOffset) {
return false
}
- // Get records for our bucket
- val bucketRecords = scanRecords.records(tableBucket)
- if (bucketRecords.isEmpty) {
- return false
+ if (!currentRecords.hasNext) {
+ pollMoreRecords()
}
- currentRecords = bucketRecords.iterator()
+ // If we have records in current batch, return next one
if (currentRecords.hasNext) {
val scanRecord = currentRecords.next()
currentRow = convertToSparkRow(scanRecord)
+ currentOffset = scanRecord.logOffset() + 1
true
+ } else if (currentOffset < flussPartition.stopOffset) {
+ throw new IllegalStateException(s"No more data from fluss server," +
+ s" but current offset $currentOffset not reach the stop offset
${flussPartition.stopOffset}")
} else {
false
}
@@ -83,10 +82,16 @@ class FlussAppendPartitionReader(
}
private def initialize(): Unit = {
+ if (flussPartition.startOffset >= flussPartition.stopOffset) {
+ throw new IllegalArgumentException(s"Invalid offset range
$flussPartition")
+ }
+ logInfo(s"Prepare read table $tablePath partition $partitionId bucket
$bucketId" +
+ s" with start offset ${flussPartition.startOffset} stop offset
${flussPartition.stopOffset}")
if (partitionId != null) {
- logScanner.subscribeFromBeginning(partitionId, bucketId)
+ logScanner.subscribe(partitionId, bucketId, flussPartition.startOffset)
} else {
- logScanner.subscribeFromBeginning(bucketId)
+ logScanner.subscribe(bucketId, flussPartition.startOffset)
}
+ pollMoreRecords()
}
}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala
index 1bcffa6fd..d2b9be546 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala
@@ -19,7 +19,7 @@ package org.apache.fluss.spark.read
import org.apache.fluss.client.{Connection, ConnectionFactory}
import org.apache.fluss.client.admin.Admin
-import org.apache.fluss.client.initializer.{BucketOffsetsRetrieverImpl,
OffsetsInitializer}
+import org.apache.fluss.client.initializer.{BucketOffsetsRetrieverImpl,
OffsetsInitializer, SnapshotOffsetsInitializer}
import org.apache.fluss.client.metadata.KvSnapshots
import org.apache.fluss.client.table.scanner.log.LogScanner
import org.apache.fluss.config.Configuration
@@ -47,6 +47,10 @@ abstract class FlussBatch(
lazy val partitionInfos: util.List[PartitionInfo] =
admin.listPartitionInfos(tablePath).get()
+ def startOffsetsInitializer: OffsetsInitializer
+
+ def stoppingOffsetsInitializer: OffsetsInitializer
+
protected def projection: Array[Int] = {
val columnNameToIndex =
tableInfo.getSchema.getColumnNames.asScala.zipWithIndex.toMap
readSchema.fields.map {
@@ -76,26 +80,77 @@ class FlussAppendBatch(
flussConfig: Configuration)
extends FlussBatch(tablePath, tableInfo, readSchema, flussConfig) {
+ override val startOffsetsInitializer: OffsetsInitializer = {
+ FlussOffsetInitializers.startOffsetsInitializer(options, flussConfig)
+ }
+
+ override val stoppingOffsetsInitializer: OffsetsInitializer = {
+ FlussOffsetInitializers.stoppingOffsetsInitializer(true, options,
flussConfig)
+ }
+
override def planInputPartitions(): Array[InputPartition] = {
- def createPartitions(partitionId: Option[Long]): Array[InputPartition] = {
- (0 until tableInfo.getNumBuckets).map {
+ val bucketOffsetsRetrieverImpl = new BucketOffsetsRetrieverImpl(admin,
tablePath)
+ val buckets = (0 until tableInfo.getNumBuckets).toSeq
+
+ def createPartitions(
+ partitionId: Option[Long],
+ startBucketOffsets: Map[Integer, Long],
+ stoppingBucketOffsets: Map[Integer, Long]): Array[InputPartition] = {
+ buckets.map {
bucketId =>
- val tableBucket = partitionId match {
+ val (startBucketOffset, stoppingBucketOffset) =
+ (startBucketOffsets(bucketId), stoppingBucketOffsets(bucketId))
+ partitionId match {
case Some(partitionId) =>
- new TableBucket(tableInfo.getTableId, partitionId, bucketId)
+ val tableBucket = new TableBucket(tableInfo.getTableId,
partitionId, bucketId)
+ FlussAppendInputPartition(tableBucket, startBucketOffset,
stoppingBucketOffset)
+ .asInstanceOf[InputPartition]
case None =>
- new TableBucket(tableInfo.getTableId, bucketId)
+ val tableBucket = new TableBucket(tableInfo.getTableId, bucketId)
+ FlussAppendInputPartition(tableBucket, startBucketOffset,
stoppingBucketOffset)
+ .asInstanceOf[InputPartition]
}
- FlussAppendInputPartition(tableBucket).asInstanceOf[InputPartition]
}.toArray
}
if (tableInfo.isPartitioned) {
- partitionInfos.asScala.flatMap {
- partitionInfo => createPartitions(Some(partitionInfo.getPartitionId))
- }.toArray
+ partitionInfos.asScala
+ .map {
+ partitionInfo =>
+ val startBucketOffsets = startOffsetsInitializer.getBucketOffsets(
+ partitionInfo.getPartitionName,
+ buckets.map(Integer.valueOf).asJava,
+ bucketOffsetsRetrieverImpl)
+ val stoppingBucketOffsets =
stoppingOffsetsInitializer.getBucketOffsets(
+ partitionInfo.getPartitionName,
+ buckets.map(Integer.valueOf).asJava,
+ bucketOffsetsRetrieverImpl)
+ (
+ partitionInfo.getPartitionId,
+ startBucketOffsets.asScala.map(e => (e._1, Long2long(e._2))),
+ stoppingBucketOffsets.asScala.map(e => (e._1, Long2long(e._2))))
+ }
+ .flatMap {
+ case (partitionId, startBucketOffsets, stoppingBucketOffsets) =>
+ createPartitions(
+ Some(partitionId),
+ startBucketOffsets.toMap,
+ stoppingBucketOffsets.toMap)
+ }
+ .toArray
} else {
- createPartitions(None)
+ val startBucketOffsets = startOffsetsInitializer.getBucketOffsets(
+ null,
+ buckets.map(Integer.valueOf).asJava,
+ bucketOffsetsRetrieverImpl)
+ val stoppingBucketOffsets = stoppingOffsetsInitializer.getBucketOffsets(
+ null,
+ buckets.map(Integer.valueOf).asJava,
+ bucketOffsetsRetrieverImpl)
+ createPartitions(
+ None,
+ startBucketOffsets.asScala.map(e => (e._1, Long2long(e._2))).toMap,
+ stoppingBucketOffsets.asScala.map(e => (e._1, Long2long(e._2))).toMap)
}
}
@@ -114,7 +169,18 @@ class FlussUpsertBatch(
flussConfig: Configuration)
extends FlussBatch(tablePath, tableInfo, readSchema, flussConfig) {
- private val latestOffsetsInitializer = OffsetsInitializer.latest()
+ override val startOffsetsInitializer: OffsetsInitializer = {
+ val offsetsInitializer =
FlussOffsetInitializers.startOffsetsInitializer(options, flussConfig)
+ if (!offsetsInitializer.isInstanceOf[SnapshotOffsetsInitializer]) {
+ throw new UnsupportedOperationException("Upsert scan only support FULL
startup mode.")
+ }
+ offsetsInitializer
+ }
+
+ override val stoppingOffsetsInitializer: OffsetsInitializer = {
+ FlussOffsetInitializers.stoppingOffsetsInitializer(true, options,
flussConfig)
+ }
+
private val bucketOffsetsRetriever = new BucketOffsetsRetrieverImpl(admin,
tablePath)
override def planInputPartitions(): Array[InputPartition] = {
@@ -123,7 +189,10 @@ class FlussUpsertBatch(
val partitionId = kvSnapshots.getPartitionId
val bucketIds = kvSnapshots.getBucketIds
val bucketIdToLogOffset =
- latestOffsetsInitializer.getBucketOffsets(partitionName, bucketIds,
bucketOffsetsRetriever)
+ stoppingOffsetsInitializer.getBucketOffsets(
+ partitionName,
+ bucketIds,
+ bucketOffsetsRetriever)
bucketIds.asScala
.map {
bucketId =>
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala
index 75bb056ce..b573828cf 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala
@@ -36,7 +36,8 @@ trait FlussInputPartition extends InputPartition {
* @param tableBucket
* the table bucket to read from
*/
-case class FlussAppendInputPartition(tableBucket: TableBucket) extends
FlussInputPartition
+case class FlussAppendInputPartition(tableBucket: TableBucket, startOffset:
Long, stopOffset: Long)
+ extends FlussInputPartition
/**
* Represents an input partition for reading data from a primary key table
bucket. This partition
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussOffsetInitializers.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussOffsetInitializers.scala
new file mode 100644
index 000000000..d1e2bc4ff
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussOffsetInitializers.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.client.initializer.OffsetsInitializer
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.spark.SparkFlussConf
+
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+object FlussOffsetInitializers {
+ def startOffsetsInitializer(
+ options: CaseInsensitiveStringMap,
+ flussConfig: Configuration): OffsetsInitializer = {
+ val startupMode = options
+ .getOrDefault(
+ SparkFlussConf.SCAN_START_UP_MODE.key(),
+ flussConfig.get(SparkFlussConf.SCAN_START_UP_MODE))
+ .toUpperCase
+
+ SparkFlussConf.StartUpMode.withName(startupMode) match {
+ case SparkFlussConf.StartUpMode.EARLIEST => OffsetsInitializer.earliest()
+ case SparkFlussConf.StartUpMode.FULL => OffsetsInitializer.full()
+ case SparkFlussConf.StartUpMode.LATEST => OffsetsInitializer.latest()
+ case _ =>
+ throw new IllegalArgumentException(
+ s"Unsupported scan start up mode:
${options.get(SparkFlussConf.SCAN_START_UP_MODE.key())}")
+ }
+ }
+
+ def stoppingOffsetsInitializer(
+ isBatch: Boolean,
+ options: CaseInsensitiveStringMap,
+ flussConfig: Configuration): OffsetsInitializer = {
+ if (isBatch) {
+ OffsetsInitializer.latest()
+ } else {
+ throw new UnsupportedOperationException("Stream read is not supported
yet.")
+ }
+ }
+}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReader.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReader.scala
index 10203898f..07b6a47f4 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReader.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReader.scala
@@ -23,18 +23,22 @@ import org.apache.fluss.client.table.scanner.ScanRecord
import org.apache.fluss.config.Configuration
import org.apache.fluss.metadata.{TableInfo, TablePath}
import org.apache.fluss.row.{InternalRow => FlussInternalRow}
+import org.apache.fluss.spark.SparkFlussConf
import org.apache.fluss.spark.row.DataConverter
import org.apache.fluss.types.RowType
+import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.PartitionReader
import java.time.Duration
abstract class FlussPartitionReader(tablePath: TablePath, flussConfig:
Configuration)
- extends PartitionReader[InternalRow] {
+ extends PartitionReader[InternalRow]
+ with Logging {
- protected val POLL_TIMEOUT: Duration = Duration.ofMillis(100)
+ protected val POLL_TIMEOUT: Duration =
+
Duration.ofMillis(flussConfig.get(SparkFlussConf.SCAN_POLL_TIMEOUT).toMillis)
protected lazy val conn: Connection =
ConnectionFactory.createConnection(flussConfig)
protected lazy val table: Table = conn.getTable(tablePath)
protected lazy val tableInfo: TableInfo = table.getTableInfo
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkPrimaryKeyTableReadTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkPrimaryKeyTableReadTest.scala
index c1cb02236..359614cbc 100644
--- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkPrimaryKeyTableReadTest.scala
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkPrimaryKeyTableReadTest.scala
@@ -38,6 +38,14 @@ class SparkPrimaryKeyTableReadTest extends
FlussSparkTestBase {
new Configuration()
}
+ override protected def beforeEach(): Unit = {
+ super.beforeEach()
+ sql(
+ s"set
${SparkFlussConf.SPARK_FLUSS_CONF_PREFIX}${SparkFlussConf.SCAN_START_UP_MODE.key()}=full")
+ sql(
+ s"set
${SparkFlussConf.SPARK_FLUSS_CONF_PREFIX}${SparkFlussConf.READ_OPTIMIZED_OPTION.key()}=false")
+ }
+
test("Spark Read: primary key table") {
withTable("t") {
val tablePath = createTablePath("t")
@@ -101,7 +109,8 @@ class SparkPrimaryKeyTableReadTest extends
FlussSparkTestBase {
Row(800L, 23L, "addr3") ::
Nil
)
- withSQLConf(SparkFlussConf.READ_OPTIMIZED.key -> "true") {
+ withSQLConf(
+
s"${SparkFlussConf.SPARK_FLUSS_CONF_PREFIX}${SparkFlussConf.READ_OPTIMIZED_OPTION.key()}"
-> "true") {
checkAnswer(
sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY orderId"),
Row(600L, 21L, 601, "addr1") ::
@@ -123,6 +132,13 @@ class SparkPrimaryKeyTableReadTest extends
FlussSparkTestBase {
Row(800L, 23L, "addr3") ::
Nil
)
+
+ // Only support FULL startup mode.
+ withSQLConf(
+
s"${SparkFlussConf.SPARK_FLUSS_CONF_PREFIX}${SparkFlussConf.SCAN_START_UP_MODE.key()}"
-> "latest") {
+ intercept[UnsupportedOperationException](
+ sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY orderId").show())
+ }
}
}