This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 62ada4562 [spark] Add startup mode for batch read (#2532)
62ada4562 is described below

commit 62ada456250b1d987f02255e865ecedbcee1e48e
Author: Yang Zhang <[email protected]>
AuthorDate: Thu Feb 5 12:22:10 2026 +0800

    [spark] Add startup mode for batch read (#2532)
---
 .../org/apache/fluss/spark/SparkFlussConf.scala    | 34 +++++---
 .../scala/org/apache/fluss/spark/SparkTable.scala  | 21 +++--
 .../spark/read/FlussAppendPartitionReader.scala    | 51 ++++++------
 .../org/apache/fluss/spark/read/FlussBatch.scala   | 95 +++++++++++++++++++---
 .../fluss/spark/read/FlussInputPartition.scala     |  3 +-
 .../fluss/spark/read/FlussOffsetInitializers.scala | 56 +++++++++++++
 .../fluss/spark/read/FlussPartitionReader.scala    |  8 +-
 .../fluss/spark/SparkPrimaryKeyTableReadTest.scala | 18 +++-
 8 files changed, 227 insertions(+), 59 deletions(-)

diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala
index f71e2c229..28fb633b5 100644
--- 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala
@@ -17,23 +17,37 @@
 
 package org.apache.fluss.spark
 
+import org.apache.fluss.config.{ConfigBuilder, ConfigOption}
 import org.apache.fluss.config.ConfigBuilder.key
-import org.apache.fluss.config.ConfigOption
 
-import org.apache.spark.sql.internal.SQLConf.buildConf
+import java.time.Duration
 
 object SparkFlussConf {
 
-  val READ_OPTIMIZED = buildConf("spark.sql.fluss.readOptimized")
-    .internal()
-    .doc("If true, Spark will only read data from data lake snapshot or kv 
snapshot, not execute merge them with log changes. This is a temporary 
configuration that will be deprecated when read-optimized table(e.g. 
`mytbl$ro`) is supported.")
-    .booleanConf
-    .createWithDefault(false)
+  val SPARK_FLUSS_CONF_PREFIX = "spark.sql.fluss."
 
   val READ_OPTIMIZED_OPTION: ConfigOption[java.lang.Boolean] =
-    key(READ_OPTIMIZED.key)
+    key("read.optimized")
       .booleanType()
-      .defaultValue(READ_OPTIMIZED.defaultValue.get)
-      .withDescription(READ_OPTIMIZED.doc)
+      .defaultValue(false)
+      .withDescription(
+        "If true, Spark will only read data from the data lake snapshot or kv 
snapshot, and will not merge it with log changes. This is a temporary 
configuration that will be deprecated once read-optimized tables (e.g. 
`mytbl$ro`) are supported.")
 
+  object StartUpMode extends Enumeration {
+    val FULL, EARLIEST, LATEST, TIMESTAMP = Value
+  }
+
+  val SCAN_START_UP_MODE: ConfigOption[String] =
+    ConfigBuilder
+      .key("scan.startup.mode")
+      .stringType()
+      .defaultValue(StartUpMode.FULL.toString)
+      .withDescription("The startup mode used when reading a Fluss table.")
+
+  val SCAN_POLL_TIMEOUT: ConfigOption[Duration] =
+    ConfigBuilder
+      .key("scan.poll.timeout")
+      .durationType()
+      .defaultValue(Duration.ofMillis(10000L))
+      .withDescription("The timeout for log scanner to poll records.")
 }
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
index 53ed8a091..144db03ae 100644
--- 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
@@ -20,7 +20,6 @@ package org.apache.fluss.spark
 import org.apache.fluss.client.admin.Admin
 import org.apache.fluss.config.{Configuration => FlussConfiguration}
 import org.apache.fluss.metadata.{TableInfo, TablePath}
-import org.apache.fluss.spark.SparkFlussConf.READ_OPTIMIZED
 import org.apache.fluss.spark.catalog.{AbstractSparkTable, 
SupportsFlussPartitionManagement}
 import org.apache.fluss.spark.read.{FlussAppendScanBuilder, 
FlussUpsertScanBuilder}
 import org.apache.fluss.spark.write.{FlussAppendWriteBuilder, 
FlussUpsertWriteBuilder}
@@ -42,7 +41,17 @@ class SparkTable(
   with SupportsWrite
   with SQLConfHelper {
 
+  private def populateSparkConf(flussConfig: FlussConfiguration): Unit = {
+    conf.getAllConfs
+      .filter(_._1.startsWith(SparkFlussConf.SPARK_FLUSS_CONF_PREFIX))
+      .foreach {
+        case (k, v) =>
+          
flussConfig.setString(k.substring(SparkFlussConf.SPARK_FLUSS_CONF_PREFIX.length),
 v)
+      }
+  }
+
   override def newWriteBuilder(logicalWriteInfo: LogicalWriteInfo): 
WriteBuilder = {
+    populateSparkConf(flussConfig)
     if (tableInfo.getPrimaryKeys.isEmpty) {
       new FlussAppendWriteBuilder(tablePath, logicalWriteInfo.schema(), 
flussConfig)
     } else {
@@ -51,17 +60,11 @@ class SparkTable(
   }
 
   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder 
= {
+    populateSparkConf(flussConfig)
     if (tableInfo.getPrimaryKeys.isEmpty) {
       new FlussAppendScanBuilder(tablePath, tableInfo, options, flussConfig)
     } else {
-      val newFlussConfig = if (conf.getConf(SparkFlussConf.READ_OPTIMIZED, 
false)) {
-        val newFlussConfig_ = new FlussConfiguration(flussConfig)
-        newFlussConfig_.setBoolean(SparkFlussConf.READ_OPTIMIZED_OPTION.key(), 
true)
-        newFlussConfig_
-      } else {
-        flussConfig
-      }
-      new FlussUpsertScanBuilder(tablePath, tableInfo, options, newFlussConfig)
+      new FlussUpsertScanBuilder(tablePath, tableInfo, options, flussConfig)
     }
   }
 }
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala
index 075e0a275..54570285a 100644
--- 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala
@@ -18,7 +18,6 @@
 package org.apache.fluss.spark.read
 
 import org.apache.fluss.client.table.scanner.ScanRecord
-import org.apache.fluss.client.table.scanner.log.ScanRecords
 import org.apache.fluss.config.Configuration
 import org.apache.fluss.metadata.{TableBucket, TablePath}
 
@@ -36,41 +35,41 @@ class FlussAppendPartitionReader(
   private val logScanner = 
table.newScan().project(projection).createLogScanner()
 
   // Iterator for current batch of records
-  private var currentRecords: java.util.Iterator[ScanRecord] = _
+  private var currentRecords: java.util.Iterator[ScanRecord] = 
java.util.Collections.emptyIterator()
+
+  // Fluss uses -2 as a sentinel meaning "latest offset", so clamp the start offset to at least 0.
+  private var currentOffset: Long = flussPartition.startOffset.max(0L)
 
   // initialize log scanner
   initialize()
 
-  override def next(): Boolean = {
-    if (closed) {
-      return false
-    }
-
-    // If we have records in current batch, return next one
-    if (currentRecords != null && currentRecords.hasNext) {
-      val scanRecord = currentRecords.next()
-      currentRow = convertToSparkRow(scanRecord)
-      return true
-    }
-
-    // Poll for more records
+  private def pollMoreRecords(): Unit = {
     val scanRecords = logScanner.poll(POLL_TIMEOUT)
+    if ((scanRecords == null || scanRecords.isEmpty) && currentOffset < 
flussPartition.stopOffset) {
+      throw new IllegalStateException(s"No more data from the Fluss server," +
+        s" but current offset $currentOffset has not reached the stop offset 
${flussPartition.stopOffset}")
+    }
+    currentRecords = scanRecords.records(tableBucket).iterator()
+  }
 
-    if (scanRecords == null || scanRecords.isEmpty) {
+  override def next(): Boolean = {
+    if (closed || currentOffset >= flussPartition.stopOffset) {
       return false
     }
 
-    // Get records for our bucket
-    val bucketRecords = scanRecords.records(tableBucket)
-    if (bucketRecords.isEmpty) {
-      return false
+    if (!currentRecords.hasNext) {
+      pollMoreRecords()
     }
 
-    currentRecords = bucketRecords.iterator()
+    // If we have records in current batch, return next one
     if (currentRecords.hasNext) {
       val scanRecord = currentRecords.next()
       currentRow = convertToSparkRow(scanRecord)
+      currentOffset = scanRecord.logOffset() + 1
       true
+    } else if (currentOffset < flussPartition.stopOffset) {
+      throw new IllegalStateException(s"No more data from the Fluss server," +
+        s" but current offset $currentOffset has not reached the stop offset 
${flussPartition.stopOffset}")
     } else {
       false
     }
@@ -83,10 +82,16 @@ class FlussAppendPartitionReader(
   }
 
   private def initialize(): Unit = {
+    if (flussPartition.startOffset >= flussPartition.stopOffset) {
+      throw new IllegalArgumentException(s"Invalid offset range 
$flussPartition")
+    }
+    logInfo(s"Preparing to read table $tablePath partition $partitionId bucket 
$bucketId" +
+      s" with start offset ${flussPartition.startOffset} and stop offset 
${flussPartition.stopOffset}")
     if (partitionId != null) {
-      logScanner.subscribeFromBeginning(partitionId, bucketId)
+      logScanner.subscribe(partitionId, bucketId, flussPartition.startOffset)
     } else {
-      logScanner.subscribeFromBeginning(bucketId)
+      logScanner.subscribe(bucketId, flussPartition.startOffset)
     }
+    pollMoreRecords()
   }
 }
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala
index 1bcffa6fd..d2b9be546 100644
--- 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala
@@ -19,7 +19,7 @@ package org.apache.fluss.spark.read
 
 import org.apache.fluss.client.{Connection, ConnectionFactory}
 import org.apache.fluss.client.admin.Admin
-import org.apache.fluss.client.initializer.{BucketOffsetsRetrieverImpl, 
OffsetsInitializer}
+import org.apache.fluss.client.initializer.{BucketOffsetsRetrieverImpl, 
OffsetsInitializer, SnapshotOffsetsInitializer}
 import org.apache.fluss.client.metadata.KvSnapshots
 import org.apache.fluss.client.table.scanner.log.LogScanner
 import org.apache.fluss.config.Configuration
@@ -47,6 +47,10 @@ abstract class FlussBatch(
 
   lazy val partitionInfos: util.List[PartitionInfo] = 
admin.listPartitionInfos(tablePath).get()
 
+  def startOffsetsInitializer: OffsetsInitializer
+
+  def stoppingOffsetsInitializer: OffsetsInitializer
+
   protected def projection: Array[Int] = {
     val columnNameToIndex = 
tableInfo.getSchema.getColumnNames.asScala.zipWithIndex.toMap
     readSchema.fields.map {
@@ -76,26 +80,77 @@ class FlussAppendBatch(
     flussConfig: Configuration)
   extends FlussBatch(tablePath, tableInfo, readSchema, flussConfig) {
 
+  override val startOffsetsInitializer: OffsetsInitializer = {
+    FlussOffsetInitializers.startOffsetsInitializer(options, flussConfig)
+  }
+
+  override val stoppingOffsetsInitializer: OffsetsInitializer = {
+    FlussOffsetInitializers.stoppingOffsetsInitializer(true, options, 
flussConfig)
+  }
+
   override def planInputPartitions(): Array[InputPartition] = {
-    def createPartitions(partitionId: Option[Long]): Array[InputPartition] = {
-      (0 until tableInfo.getNumBuckets).map {
+    val bucketOffsetsRetrieverImpl = new BucketOffsetsRetrieverImpl(admin, 
tablePath)
+    val buckets = (0 until tableInfo.getNumBuckets).toSeq
+
+    def createPartitions(
+        partitionId: Option[Long],
+        startBucketOffsets: Map[Integer, Long],
+        stoppingBucketOffsets: Map[Integer, Long]): Array[InputPartition] = {
+      buckets.map {
         bucketId =>
-          val tableBucket = partitionId match {
+          val (startBucketOffset, stoppingBucketOffset) =
+            (startBucketOffsets(bucketId), stoppingBucketOffsets(bucketId))
+          partitionId match {
             case Some(partitionId) =>
-              new TableBucket(tableInfo.getTableId, partitionId, bucketId)
+              val tableBucket = new TableBucket(tableInfo.getTableId, 
partitionId, bucketId)
+              FlussAppendInputPartition(tableBucket, startBucketOffset, 
stoppingBucketOffset)
+                .asInstanceOf[InputPartition]
             case None =>
-              new TableBucket(tableInfo.getTableId, bucketId)
+              val tableBucket = new TableBucket(tableInfo.getTableId, bucketId)
+              FlussAppendInputPartition(tableBucket, startBucketOffset, 
stoppingBucketOffset)
+                .asInstanceOf[InputPartition]
           }
-          FlussAppendInputPartition(tableBucket).asInstanceOf[InputPartition]
       }.toArray
     }
 
     if (tableInfo.isPartitioned) {
-      partitionInfos.asScala.flatMap {
-        partitionInfo => createPartitions(Some(partitionInfo.getPartitionId))
-      }.toArray
+      partitionInfos.asScala
+        .map {
+          partitionInfo =>
+            val startBucketOffsets = startOffsetsInitializer.getBucketOffsets(
+              partitionInfo.getPartitionName,
+              buckets.map(Integer.valueOf).asJava,
+              bucketOffsetsRetrieverImpl)
+            val stoppingBucketOffsets = 
stoppingOffsetsInitializer.getBucketOffsets(
+              partitionInfo.getPartitionName,
+              buckets.map(Integer.valueOf).asJava,
+              bucketOffsetsRetrieverImpl)
+            (
+              partitionInfo.getPartitionId,
+              startBucketOffsets.asScala.map(e => (e._1, Long2long(e._2))),
+              stoppingBucketOffsets.asScala.map(e => (e._1, Long2long(e._2))))
+        }
+        .flatMap {
+          case (partitionId, startBucketOffsets, stoppingBucketOffsets) =>
+            createPartitions(
+              Some(partitionId),
+              startBucketOffsets.toMap,
+              stoppingBucketOffsets.toMap)
+        }
+        .toArray
     } else {
-      createPartitions(None)
+      val startBucketOffsets = startOffsetsInitializer.getBucketOffsets(
+        null,
+        buckets.map(Integer.valueOf).asJava,
+        bucketOffsetsRetrieverImpl)
+      val stoppingBucketOffsets = stoppingOffsetsInitializer.getBucketOffsets(
+        null,
+        buckets.map(Integer.valueOf).asJava,
+        bucketOffsetsRetrieverImpl)
+      createPartitions(
+        None,
+        startBucketOffsets.asScala.map(e => (e._1, Long2long(e._2))).toMap,
+        stoppingBucketOffsets.asScala.map(e => (e._1, Long2long(e._2))).toMap)
     }
   }
 
@@ -114,7 +169,18 @@ class FlussUpsertBatch(
     flussConfig: Configuration)
   extends FlussBatch(tablePath, tableInfo, readSchema, flussConfig) {
 
-  private val latestOffsetsInitializer = OffsetsInitializer.latest()
+  override val startOffsetsInitializer: OffsetsInitializer = {
+    val offsetsInitializer = 
FlussOffsetInitializers.startOffsetsInitializer(options, flussConfig)
+    if (!offsetsInitializer.isInstanceOf[SnapshotOffsetsInitializer]) {
+      throw new UnsupportedOperationException("Upsert scan only supports the 
FULL startup mode.")
+    }
+    offsetsInitializer
+  }
+
+  override val stoppingOffsetsInitializer: OffsetsInitializer = {
+    FlussOffsetInitializers.stoppingOffsetsInitializer(true, options, 
flussConfig)
+  }
+
   private val bucketOffsetsRetriever = new BucketOffsetsRetrieverImpl(admin, 
tablePath)
 
   override def planInputPartitions(): Array[InputPartition] = {
@@ -123,7 +189,10 @@ class FlussUpsertBatch(
       val partitionId = kvSnapshots.getPartitionId
       val bucketIds = kvSnapshots.getBucketIds
       val bucketIdToLogOffset =
-        latestOffsetsInitializer.getBucketOffsets(partitionName, bucketIds, 
bucketOffsetsRetriever)
+        stoppingOffsetsInitializer.getBucketOffsets(
+          partitionName,
+          bucketIds,
+          bucketOffsetsRetriever)
       bucketIds.asScala
         .map {
           bucketId =>
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala
index 75bb056ce..b573828cf 100644
--- 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala
@@ -36,7 +36,8 @@ trait FlussInputPartition extends InputPartition {
  * @param tableBucket
  *   the table bucket to read from
  */
-case class FlussAppendInputPartition(tableBucket: TableBucket) extends 
FlussInputPartition
+case class FlussAppendInputPartition(tableBucket: TableBucket, startOffset: 
Long, stopOffset: Long)
+  extends FlussInputPartition
 
 /**
  * Represents an input partition for reading data from a primary key table 
bucket. This partition
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussOffsetInitializers.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussOffsetInitializers.scala
new file mode 100644
index 000000000..d1e2bc4ff
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussOffsetInitializers.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.client.initializer.OffsetsInitializer
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.spark.SparkFlussConf
+
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+object FlussOffsetInitializers {
+  def startOffsetsInitializer(
+      options: CaseInsensitiveStringMap,
+      flussConfig: Configuration): OffsetsInitializer = {
+    val startupMode = options
+      .getOrDefault(
+        SparkFlussConf.SCAN_START_UP_MODE.key(),
+        flussConfig.get(SparkFlussConf.SCAN_START_UP_MODE))
+      .toUpperCase
+
+    SparkFlussConf.StartUpMode.withName(startupMode) match {
+      case SparkFlussConf.StartUpMode.EARLIEST => OffsetsInitializer.earliest()
+      case SparkFlussConf.StartUpMode.FULL => OffsetsInitializer.full()
+      case SparkFlussConf.StartUpMode.LATEST => OffsetsInitializer.latest()
+      case _ =>
+        throw new IllegalArgumentException(
+          s"Unsupported scan start up mode: 
${options.get(SparkFlussConf.SCAN_START_UP_MODE.key())}")
+    }
+  }
+
+  def stoppingOffsetsInitializer(
+      isBatch: Boolean,
+      options: CaseInsensitiveStringMap,
+      flussConfig: Configuration): OffsetsInitializer = {
+    if (isBatch) {
+      OffsetsInitializer.latest()
+    } else {
+      throw new UnsupportedOperationException("Stream read is not supported 
yet.")
+    }
+  }
+}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReader.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReader.scala
index 10203898f..07b6a47f4 100644
--- 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReader.scala
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReader.scala
@@ -23,18 +23,22 @@ import org.apache.fluss.client.table.scanner.ScanRecord
 import org.apache.fluss.config.Configuration
 import org.apache.fluss.metadata.{TableInfo, TablePath}
 import org.apache.fluss.row.{InternalRow => FlussInternalRow}
+import org.apache.fluss.spark.SparkFlussConf
 import org.apache.fluss.spark.row.DataConverter
 import org.apache.fluss.types.RowType
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.read.PartitionReader
 
 import java.time.Duration
 
 abstract class FlussPartitionReader(tablePath: TablePath, flussConfig: 
Configuration)
-  extends PartitionReader[InternalRow] {
+  extends PartitionReader[InternalRow]
+  with Logging {
 
-  protected val POLL_TIMEOUT: Duration = Duration.ofMillis(100)
+  protected val POLL_TIMEOUT: Duration =
+    
Duration.ofMillis(flussConfig.get(SparkFlussConf.SCAN_POLL_TIMEOUT).toMillis)
   protected lazy val conn: Connection = 
ConnectionFactory.createConnection(flussConfig)
   protected lazy val table: Table = conn.getTable(tablePath)
   protected lazy val tableInfo: TableInfo = table.getTableInfo
diff --git 
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkPrimaryKeyTableReadTest.scala
 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkPrimaryKeyTableReadTest.scala
index c1cb02236..359614cbc 100644
--- 
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkPrimaryKeyTableReadTest.scala
+++ 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkPrimaryKeyTableReadTest.scala
@@ -38,6 +38,14 @@ class SparkPrimaryKeyTableReadTest extends 
FlussSparkTestBase {
     new Configuration()
   }
 
+  override protected def beforeEach(): Unit = {
+    super.beforeEach()
+    sql(
+      s"set 
${SparkFlussConf.SPARK_FLUSS_CONF_PREFIX}${SparkFlussConf.SCAN_START_UP_MODE.key()}=full")
+    sql(
+      s"set 
${SparkFlussConf.SPARK_FLUSS_CONF_PREFIX}${SparkFlussConf.READ_OPTIMIZED_OPTION.key()}=false")
+  }
+
   test("Spark Read: primary key table") {
     withTable("t") {
       val tablePath = createTablePath("t")
@@ -101,7 +109,8 @@ class SparkPrimaryKeyTableReadTest extends 
FlussSparkTestBase {
           Row(800L, 23L, "addr3") ::
           Nil
       )
-      withSQLConf(SparkFlussConf.READ_OPTIMIZED.key -> "true") {
+      withSQLConf(
+        
s"${SparkFlussConf.SPARK_FLUSS_CONF_PREFIX}${SparkFlussConf.READ_OPTIMIZED_OPTION.key()}"
 -> "true") {
         checkAnswer(
           sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY orderId"),
           Row(600L, 21L, 601, "addr1") ::
@@ -123,6 +132,13 @@ class SparkPrimaryKeyTableReadTest extends 
FlussSparkTestBase {
           Row(800L, 23L, "addr3") ::
           Nil
       )
+
+      // Upsert scan only supports the FULL startup mode.
+      withSQLConf(
+        
s"${SparkFlussConf.SPARK_FLUSS_CONF_PREFIX}${SparkFlussConf.SCAN_START_UP_MODE.key()}"
 -> "latest") {
+        intercept[UnsupportedOperationException](
+          sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY orderId").show())
+      }
     }
   }
 

Reply via email to