This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 5d7630a37 [spark] Add basic streaming read support for sparksql with latest mode (#2548)
5d7630a37 is described below

commit 5d7630a37b9ad27ec625e9c1e8c8c9a29ba5dfe5
Author: Yang Zhang <[email protected]>
AuthorDate: Sun Feb 8 12:14:47 2026 +0800

    [spark] Add basic streaming read support for sparksql with latest mode (#2548)
---
 .../fluss/spark/catalog/AbstractSparkTable.scala   |   1 +
 .../spark/read/FlussAppendPartitionReader.scala    |   3 +-
 .../fluss/spark/read/FlussInputPartition.scala     |  16 +-
 .../fluss/spark/read/FlussMicroBatchStream.scala   | 346 +++++++++++++++++++++
 .../fluss/spark/read/FlussOffsetInitializers.scala |   4 +-
 .../org/apache/fluss/spark/read/FlussScan.scala    |  21 ++
 .../apache/fluss/spark/read/FlussScanBuilder.scala |   2 +-
 .../spark/read/FlussUpsertPartitionReader.scala    |   2 +-
 .../apache/fluss/spark/SparkStreamingTest.scala    | 280 ++++++++++++++++-
 9 files changed, 665 insertions(+), 10 deletions(-)
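
For context, a minimal usage sketch of the new streaming read, mirroring the tests added in SparkStreamingTest.scala below (the table identifier and checkpoint path are illustrative placeholders, not part of this commit):

    // Stream a Fluss table starting from the latest offsets and print each micro-batch.
    val query = spark.readStream
      .options(Map("scan.startup.mode" -> "latest"))
      .table("t")
      .writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/fluss-stream-checkpoint")
      .start()

    query.awaitTermination()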

diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
index e64f42a51..f9856a919 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
@@ -46,6 +46,7 @@ abstract class AbstractSparkTable(val admin: Admin, val tableInfo: TableInfo) ex
     Set(
       TableCapability.BATCH_READ,
       TableCapability.BATCH_WRITE,
+      TableCapability.MICRO_BATCH_READ,
       TableCapability.STREAMING_WRITE
     ).asJava
   }
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala
index 54570285a..520d6ae71 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala
@@ -85,8 +85,7 @@ class FlussAppendPartitionReader(
     if (flussPartition.startOffset >= flussPartition.stopOffset) {
       throw new IllegalArgumentException(s"Invalid offset range $flussPartition")
     }
-    logInfo(s"Prepare read table $tablePath partition $partitionId bucket 
$bucketId" +
-      s" with start offset ${flussPartition.startOffset} stop offset 
${flussPartition.stopOffset}")
+    logInfo(s"Prepare read table $tablePath $flussPartition")
     if (partitionId != null) {
       logScanner.subscribe(partitionId, bucketId, flussPartition.startOffset)
     } else {
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala
index b573828cf..397b62430 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala
@@ -37,7 +37,13 @@ trait FlussInputPartition extends InputPartition {
  *   the table bucket to read from
  */
 case class FlussAppendInputPartition(tableBucket: TableBucket, startOffset: Long, stopOffset: Long)
-  extends FlussInputPartition
+  extends FlussInputPartition {
+  override def toString: String = {
+    s"FlussAppendInputPartition{tableId=${tableBucket.getTableId}, 
bucketId=${tableBucket.getBucket}," +
+      s" partitionId=${tableBucket.getPartitionId}" +
+      s" logStartOffset=$startOffset, logStopOffset=$stopOffset"
+  }
+}
 
 /**
 * Represents an input partition for reading data from a primary key table bucket. This partition
@@ -57,4 +63,10 @@ case class FlussUpsertInputPartition(
     snapshotId: Long,
     logStartingOffset: Long,
     logStoppingOffset: Long)
-  extends FlussInputPartition
+  extends FlussInputPartition {
+  override def toString: String = {
+    s"FlussUpsertInputPartition{tableId=${tableBucket.getTableId}, 
bucketId=${tableBucket.getBucket}," +
+      s" partitionId=${tableBucket.getPartitionId}, snapshotId=$snapshotId," +
+      s" logStartOffset=$logStartingOffset, logStopOffset=$logStoppingOffset"
+  }
+}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussMicroBatchStream.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussMicroBatchStream.scala
new file mode 100644
index 000000000..e3d77a9d7
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussMicroBatchStream.scala
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.client.{Connection, ConnectionFactory}
+import org.apache.fluss.client.admin.Admin
+import org.apache.fluss.client.initializer.{BucketOffsetsRetrieverImpl, OffsetsInitializer}
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.metadata.{PartitionInfo, TableBucket, TableInfo, TablePath}
+import org.apache.fluss.utils.json.TableBucketOffsets
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}
+import org.apache.spark.sql.connector.read.streaming._
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import java.util
+import java.util.Optional
+
+import scala.collection.JavaConverters._
+
+case class FlussSourceOffset(tableBucketOffsets: TableBucketOffsets) extends Offset {
+  override val json: String = new String(tableBucketOffsets.toJsonBytes, "utf-8")
+}
+
+abstract class FlussMicroBatchStream(
+    val tablePath: TablePath,
+    tableInfo: TableInfo,
+    readSchema: StructType,
+    options: CaseInsensitiveStringMap,
+    flussConfig: Configuration,
+    checkpointLocation: String)
+  extends SupportsTriggerAvailableNow
+  with ReportsSourceMetrics
+  with MicroBatchStream
+  with Logging
+  with AutoCloseable {
+
+  lazy val conn: Connection = ConnectionFactory.createConnection(flussConfig)
+
+  lazy val admin: Admin = conn.getAdmin
+
+  lazy val bucketOffsetsRetriever: BucketOffsetsRetrieverImpl =
+    new BucketOffsetsRetrieverImpl(admin, tableInfo.getTablePath)
+
+  lazy val partitionInfos: util.List[PartitionInfo] = admin.listPartitionInfos(tablePath).get()
+
+  private var allDataForTriggerAvailableNow: Option[TableBucketOffsets] = None
+
+  val startOffsetsInitializer: OffsetsInitializer =
+    FlussOffsetInitializers.startOffsetsInitializer(options, flussConfig)
+
+  val stoppingOffsetsInitializer: OffsetsInitializer =
+    FlussOffsetInitializers.stoppingOffsetsInitializer(false, options, flussConfig)
+
+  protected def projection: Array[Int] = {
+    val columnNameToIndex = tableInfo.getSchema.getColumnNames.asScala.zipWithIndex.toMap
+    readSchema.fields.map {
+      field =>
+        columnNameToIndex.getOrElse(
+          field.name,
+          throw new IllegalArgumentException(s"Invalid field name: 
${field.name}"))
+    }
+  }
+
+  override def close(): Unit = {
+    if (admin != null) {
+      admin.close()
+    }
+    if (conn != null) {
+      conn.close()
+    }
+  }
+
+  override def latestOffset(): Offset = {
+    throw new UnsupportedOperationException(
+      "latestOffset(Offset, ReadLimit) should be called instead of this 
method")
+  }
+
+  override def getDefaultReadLimit: ReadLimit = {
+    ReadLimit.allAvailable()
+  }
+
+  override def initialOffset(): Offset = {
+    val initialTableBucketOffsets = getOrCreateInitialPartitionOffsets()
+    FlussSourceOffset(initialTableBucketOffsets)
+  }
+
+  override def latestOffset(start: Offset, readLimit: ReadLimit): Offset = {
+    if (!readLimit.isInstanceOf[ReadAllAvailable]) {
+      throw new UnsupportedOperationException(s"Only ReadAllAvailable is 
supported, but $readLimit")
+    }
+
+    val latestTableBucketOffsets = if (allDataForTriggerAvailableNow.isDefined) {
+      allDataForTriggerAvailableNow.get
+    } else {
+      fetchLatestOffsets().get
+    }
+    FlussSourceOffset(latestTableBucketOffsets)
+  }
+
+  override def prepareForTriggerAvailableNow(): Unit = {
+    allDataForTriggerAvailableNow = fetchLatestOffsets()
+  }
+
+  private def fetchLatestOffsets(): Option[TableBucketOffsets] = {
+    val buckets = (0 until tableInfo.getNumBuckets).toSeq
+    val offsetsInitializer = OffsetsInitializer.latest()
+    if (tableInfo.isPartitioned) {
+      val partitionOffsets = partitionInfos.asScala.map(
+        partitionInfo =>
+          FlussMicroBatchStream.getLatestOffsets(
+            tableInfo,
+            offsetsInitializer,
+            bucketOffsetsRetriever,
+            buckets,
+            Some(partitionInfo)))
+      val mergedOffsets = partitionOffsets
+        .map(_.getOffsets)
+        .reduce((l, r) => (l.asScala ++ r.asScala).asJava)
+      Some(new TableBucketOffsets(tableInfo.getTableId, mergedOffsets))
+    } else {
+      Some(
+        FlussMicroBatchStream
+          .getLatestOffsets(tableInfo, offsetsInitializer, bucketOffsetsRetriever, buckets, None))
+    }
+  }
+
+  // No need to notify fluss server
+  override def commit(end: Offset): Unit = {}
+
+  override def stop(): Unit = close()
+
+  override def deserializeOffset(json: String): Offset = {
+    FlussSourceOffset(TableBucketOffsets.fromJsonBytes(json.getBytes("utf-8")))
+  }
+
+  override def metrics(latestConsumedOffset: Optional[Offset]): util.Map[String, String] = {
+    // TODO add metrics
+    Map.empty[String, String].asJava
+  }
+
+  private def getOrCreateInitialPartitionOffsets(): TableBucketOffsets = {
+    if (tableInfo.isPartitioned) {
+      initPartitionedSplits()
+    } else {
+      initNonPartitionedSplits()
+    }
+  }
+
+  private def initPartitionedSplits(): TableBucketOffsets = {
+    val partitionOffsets = partitionInfos.asScala.map {
+      partitionInfo =>
+        if (tableInfo.hasPrimaryKey) {
+          getSnapshotAndLogSplits(Some(partitionInfo))
+        } else {
+          getLogSplit(Some(partitionInfo))
+        }
+    }
+
+    val mergedOffsets = partitionOffsets
+      .map(_.getOffsets)
+      .reduce((l, r) => (l.asScala ++ r.asScala).asJava)
+
+    new TableBucketOffsets(tableInfo.getTableId, mergedOffsets)
+  }
+
+  private def initNonPartitionedSplits(): TableBucketOffsets = {
+    if (tableInfo.hasPrimaryKey) {
+      getSnapshotAndLogSplits(None)
+    } else {
+      getLogSplit(None)
+    }
+  }
+
+  private def getSnapshotAndLogSplits(partitionInfo: Option[PartitionInfo]): TableBucketOffsets = {
+    // TODO read snapshot when more startup mode supported
+    getLogSplit(partitionInfo)
+  }
+
+  private def getLogSplit(partitionInfo: Option[PartitionInfo]): TableBucketOffsets = {
+    val buckets = (0 until tableInfo.getNumBuckets).toSeq
+    FlussMicroBatchStream.getLatestOffsets(
+      tableInfo,
+      startOffsetsInitializer,
+      bucketOffsetsRetriever,
+      buckets,
+      partitionInfo)
+  }
+}
+
+object FlussMicroBatchStream {
+  def getLatestOffsets(
+      tableInfo: TableInfo,
+      offsetsInitializer: OffsetsInitializer,
+      bucketOffsetsRetrieverImpl: BucketOffsetsRetrieverImpl,
+      buckets: Seq[Int],
+      partitionInfo: Option[PartitionInfo]): TableBucketOffsets = {
+    val latestOffsets = partitionInfo match {
+      case Some(partitionInfo) =>
+        offsetsInitializer
+          .getBucketOffsets(
+            partitionInfo.getPartitionName,
+            buckets.map(Integer.valueOf).asJava,
+            bucketOffsetsRetrieverImpl)
+          .asScala
+          .map {
+            case (bucket, offset) =>
+              val tableBucket =
+                new TableBucket(tableInfo.getTableId, partitionInfo.getPartitionId, bucket)
+              tableBucket -> offset
+          }
+
+      case None =>
+        offsetsInitializer
+          .getBucketOffsets(null, buckets.map(Integer.valueOf).asJava, bucketOffsetsRetrieverImpl)
+          .asScala
+          .map {
+            case (bucket, offset) =>
+              val tableBucket = new TableBucket(tableInfo.getTableId, bucket)
+              tableBucket -> offset
+          }
+    }
+    new TableBucketOffsets(
+      tableInfo.getTableId,
+      latestOffsets.map(e => (e._1, long2Long(e._2))).asJava)
+  }
+}
+
+/** Batch for reading log table (append-only table). */
+class FlussAppendMicroBatchStream(
+    tablePath: TablePath,
+    tableInfo: TableInfo,
+    readSchema: StructType,
+    options: CaseInsensitiveStringMap,
+    flussConfig: Configuration,
+    checkpointLocation: String)
+  extends FlussMicroBatchStream(
+    tablePath,
+    tableInfo,
+    readSchema,
+    options,
+    flussConfig,
+    checkpointLocation) {
+
+  override def createReaderFactory(): PartitionReaderFactory = {
+    new FlussAppendPartitionReaderFactory(tablePath, projection, options, flussConfig)
+  }
+
+  override def planInputPartitions(start: Offset, end: Offset): Array[InputPartition] = {
+    // TODO process new partition and deleted partition
+    val startOffsets = start.asInstanceOf[FlussSourceOffset].tableBucketOffsets
+    val stopOffsets = end.asInstanceOf[FlussSourceOffset].tableBucketOffsets
+
+    if (
+      startOffsets.getOffsets
+        .keySet()
+        .asScala
+        .diff(stopOffsets.getOffsets.keySet().asScala)
+        .nonEmpty
+    ) {
+      throw new IllegalArgumentException(
+        "start and end offset must have the same table bucket info")
+    }
+
+    val inputPartitions = startOffsets.getOffsets
+      .keySet()
+      .asScala
+      .map {
+        tableBucket =>
+          val startOffset = startOffsets.getOffsets.get(tableBucket)
+          val stopOffset = stopOffsets.getOffsets.get(tableBucket)
+          FlussAppendInputPartition(tableBucket, startOffset, stopOffset)
+      }
+      .filter(e => e.startOffset < e.stopOffset)
+      .toArray
+    inputPartitions.map(_.asInstanceOf[InputPartition])
+  }
+}
+
+class FlussUpsertMicroBatchStream(
+    tablePath: TablePath,
+    tableInfo: TableInfo,
+    readSchema: StructType,
+    options: CaseInsensitiveStringMap,
+    flussConfig: Configuration,
+    checkpointLocation: String)
+  extends FlussMicroBatchStream(
+    tablePath,
+    tableInfo,
+    readSchema,
+    options,
+    flussConfig,
+    checkpointLocation) {
+
+  override def planInputPartitions(start: Offset, end: Offset): Array[InputPartition] = {
+    // TODO process new partition and deleted partition
+    val startOffsets = start.asInstanceOf[FlussSourceOffset].tableBucketOffsets
+    val stopOffsets = end.asInstanceOf[FlussSourceOffset].tableBucketOffsets
+
+    if (
+      startOffsets.getOffsets
+        .keySet()
+        .asScala
+        .diff(stopOffsets.getOffsets.keySet().asScala)
+        .nonEmpty
+    ) {
+      throw new IllegalArgumentException(
+        "start and end offset must have the same table bucket info")
+    }
+
+    val inputPartitions = startOffsets.getOffsets
+      .keySet()
+      .asScala
+      .map {
+        tableBucket =>
+          val startOffset = startOffsets.getOffsets.get(tableBucket)
+          val stopOffset = stopOffsets.getOffsets.get(tableBucket)
+          // TODO read snapshot with startup mode.
+          FlussUpsertInputPartition(tableBucket, -1, startOffset, stopOffset)
+      }
+      .filter(e => e.logStartingOffset < e.logStoppingOffset)
+      .toArray
+    inputPartitions.map(_.asInstanceOf[InputPartition])
+  }
+
+  override def createReaderFactory(): PartitionReaderFactory = {
+    new FlussUpsertPartitionReaderFactory(tablePath, projection, options, flussConfig)
+  }
+}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussOffsetInitializers.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussOffsetInitializers.scala
index d1e2bc4ff..1f0a8806a 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussOffsetInitializers.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussOffsetInitializers.scala
@@ -17,7 +17,7 @@
 
 package org.apache.fluss.spark.read
 
-import org.apache.fluss.client.initializer.OffsetsInitializer
+import org.apache.fluss.client.initializer.{NoStoppingOffsetsInitializer, OffsetsInitializer}
 import org.apache.fluss.config.Configuration
 import org.apache.fluss.spark.SparkFlussConf
 
@@ -50,7 +50,7 @@ object FlussOffsetInitializers {
     if (isBatch) {
       OffsetsInitializer.latest()
     } else {
-      throw new UnsupportedOperationException("Stream read is not supported 
yet.")
+      new NoStoppingOffsetsInitializer()
     }
   }
 }
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala
index 77ad081ff..a54396127 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala
@@ -22,6 +22,7 @@ import org.apache.fluss.metadata.{TableInfo, TablePath}
 import org.apache.fluss.spark.SparkConversions
 
 import org.apache.spark.sql.connector.read.{Batch, Scan}
+import org.apache.spark.sql.connector.read.streaming.MicroBatchStream
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -48,6 +49,16 @@ case class FlussAppendScan(
   override def toBatch: Batch = {
     new FlussAppendBatch(tablePath, tableInfo, readSchema, options, flussConfig)
   }
+
+  override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
+    new FlussAppendMicroBatchStream(
+      tablePath,
+      tableInfo,
+      readSchema,
+      options,
+      flussConfig,
+      checkpointLocation)
+  }
 }
 
 /** Fluss Upsert Scan. */
@@ -62,4 +73,14 @@ case class FlussUpsertScan(
   override def toBatch: Batch = {
     new FlussUpsertBatch(tablePath, tableInfo, readSchema, options, flussConfig)
   }
+
+  override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
+    new FlussUpsertMicroBatchStream(
+      tablePath,
+      tableInfo,
+      readSchema,
+      options,
+      flussConfig,
+      checkpointLocation)
+  }
 }
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala
index cd4d17ec0..cd3e6768f 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
 /** An interface that extends from Spark [[ScanBuilder]]. */
 trait FlussScanBuilder extends ScanBuilder with SupportsPushDownRequiredColumns {
 
-  protected var requiredSchema: Option[StructType] = _
+  protected var requiredSchema: Option[StructType] = None
 
   override def pruneColumns(requiredSchema: StructType): Unit = {
     this.requiredSchema = Some(requiredSchema)
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala
index c48a82dbd..e8e56a2a5 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala
@@ -30,7 +30,6 @@ import org.apache.fluss.spark.utils.LogChangesIterator
 import org.apache.fluss.utils.CloseableIterator
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 import java.util.Comparator
 
@@ -210,6 +209,7 @@ class FlussUpsertPartitionReader(
 
   private def initialize(): Unit = {
     val currentTs = System.currentTimeMillis()
+    logInfo(s"Prepare read table $tablePath $flussPartition")
 
     val sortMergeReader = createSortMergeReader()
 
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkStreamingTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkStreamingTest.scala
index f7358139b..d9816d6c1 100644
--- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkStreamingTest.scala
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkStreamingTest.scala
@@ -17,11 +17,26 @@
 
 package org.apache.fluss.spark
 
-import org.apache.spark.sql.execution.streaming.MemoryStream
-import org.apache.spark.sql.streaming.StreamTest
+import org.apache.fluss.client.initializer.{BucketOffsetsRetrieverImpl, OffsetsInitializer}
+import org.apache.fluss.client.table.Table
+import org.apache.fluss.spark.read.{FlussMicroBatchStream, FlussSourceOffset}
+import org.apache.fluss.spark.write.{FlussAppendDataWriter, FlussUpsertDataWriter}
+import org.apache.fluss.utils.json.TableBucketOffsets
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream}
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
+import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamExecution}
+import org.apache.spark.sql.streaming.{StreamTest, Trigger}
+import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.apache.spark.unsafe.types.UTF8String
 
 import java.io.File
 
+import scala.collection.JavaConverters._
+
 class SparkStreamingTest extends FlussSparkTestBase with StreamTest {
   import testImplicits._
 
@@ -132,4 +147,265 @@ class SparkStreamingTest extends FlussSparkTestBase with StreamTest {
       runTestWithStream("t", input1, input2, expect1, expect2)
     }
   }
+
+  test("read: log table") {
+    val tableName = "t"
+    withTable(tableName) {
+      sql("CREATE TABLE t (id int, data string)")
+      sql("INSERT INTO t VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+
+      val schema = StructType(Seq(StructField("id", IntegerType), 
StructField("data", StringType)))
+
+      // Test with ProcessAllAvailable
+      testStream(spark.readStream.options(Map("scan.startup.mode" -> 
"latest")).table(tableName))(
+        ProcessAllAvailable(),
+        CheckLastBatch(),
+        StopStream,
+        StartStream(),
+        AddFlussData(tableName, schema, Seq(Row(4, "data4"), Row(5, "data5"))),
+        ProcessAllAvailable(),
+        CheckLastBatch(Row(4, "data4"), Row(5, "data5")),
+        CheckAnswer(Row(4, "data4"), Row(5, "data5"))
+      )
+
+      // Test with timed trigger
+      val clock = new StreamManualClock
+      testStream(spark.readStream.options(Map("scan.startup.mode" -> 
"latest")).table(tableName))(
+        StartStream(trigger = Trigger.ProcessingTime(500), clock),
+        AdvanceManualClock(500),
+        CheckNewAnswer(),
+        AddFlussData(tableName, schema, Seq(Row(4, "data4"), Row(5, "data5"))),
+        AdvanceManualClock(500),
+        CheckLastBatch(Row(4, "data4"), Row(5, "data5")),
+        CheckAnswer(Row(4, "data4"), Row(5, "data5")),
+        AddFlussData(tableName, schema, Seq(Row(6, "data6"), Row(7, "data7"))),
+        AdvanceManualClock(500),
+        CheckLastBatch(Row(6, "data6"), Row(7, "data7")),
+        CheckAnswer(Row(4, "data4"), Row(5, "data5"), Row(6, "data6"), Row(7, 
"data7"))
+      )
+    }
+  }
+
+  test("read: log partition table") {
+    val tableName = "t"
+    withTable(tableName) {
+      sql("CREATE TABLE t (id int, data string, pt string) PARTITIONED BY 
(pt)")
+      sql("INSERT INTO t VALUES (1, 'a', '11'), (2, 'b', '11'), (3, 'c', 
'22')")
+
+      val schema = StructType(
+        Seq(
+          StructField("id", IntegerType),
+          StructField("data", StringType),
+          StructField("pt", StringType)))
+
+      // Test with ProcessAllAvailable
+      testStream(spark.readStream.options(Map("scan.startup.mode" -> 
"latest")).table(tableName))(
+        ProcessAllAvailable(),
+        CheckLastBatch(),
+        StopStream,
+        StartStream(),
+        AddFlussData(tableName, schema, Seq(Row(4, "data4", "22"), Row(5, 
"data5", "11"))),
+        ProcessAllAvailable(),
+        CheckLastBatch(Row(4, "data4", "22"), Row(5, "data5", "11")),
+        CheckAnswer(Row(4, "data4", "22"), Row(5, "data5", "11"))
+      )
+
+      // Test with timed trigger
+      val clock = new StreamManualClock
+      testStream(spark.readStream.options(Map("scan.startup.mode" -> 
"latest")).table(tableName))(
+        StartStream(trigger = Trigger.ProcessingTime(500), clock),
+        AdvanceManualClock(500),
+        CheckNewAnswer(),
+        AddFlussData(tableName, schema, Seq(Row(4, "data4", "22"), Row(5, 
"data5", "11"))),
+        AdvanceManualClock(500),
+        CheckLastBatch(Row(4, "data4", "22"), Row(5, "data5", "11")),
+        CheckAnswer(Row(4, "data4", "22"), Row(5, "data5", "11")),
+        AddFlussData(tableName, schema, Seq(Row(6, "data6", "22"), Row(7, 
"data7", "11"))),
+        AdvanceManualClock(500),
+        CheckLastBatch(Row(6, "data6", "22"), Row(7, "data7", "11")),
+        CheckAnswer(
+          Row(4, "data4", "22"),
+          Row(5, "data5", "11"),
+          Row(6, "data6", "22"),
+          Row(7, "data7", "11"))
+      )
+    }
+  }
+
+  test("read: primary key table") {
+    val tableName = "t"
+    withTable(tableName) {
+      sql(
+        "CREATE TABLE t (pk1 int, pk2 string, id int, data string) 
TBLPROPERTIES('primary.key' = 'pk1, pk2',  'bucket.num' = 1)")
+      sql("INSERT INTO t VALUES (1, 'a', 11, 'aa'), (2, 'b', 22, 'bb'), (3, 
'c', 33, 'cc')")
+
+      val schema = StructType(
+        Seq(
+          StructField("pk1", IntegerType),
+          StructField("pk2", StringType),
+          StructField("id", IntegerType),
+          StructField("data", StringType)))
+
+      // Test with ProcessAllAvailable
+      testStream(spark.readStream.options(Map("scan.startup.mode" -> 
"latest")).table(tableName))(
+        ProcessAllAvailable(),
+        CheckLastBatch(),
+        StopStream,
+        StartStream(),
+        AddFlussData(tableName, schema, Seq(Row(4, "data4", 44, "dd"), Row(5, 
"data5", 55, "ee"))),
+        ProcessAllAvailable(),
+        CheckLastBatch(Row(4, "data4", 44, "dd"), Row(5, "data5", 55, "ee")),
+        CheckAnswer(Row(4, "data4", 44, "dd"), Row(5, "data5", 55, "ee"))
+      )
+
+      // Test with timed trigger
+      val clock = new StreamManualClock
+      testStream(spark.readStream.options(Map("scan.startup.mode" -> 
"latest")).table(tableName))(
+        StartStream(trigger = Trigger.ProcessingTime(500), clock),
+        AdvanceManualClock(500),
+        CheckNewAnswer(),
+        AddFlussData(tableName, schema, Seq(Row(4, "data4", 44, "dd"), Row(5, 
"data5", 55, "ee"))),
+        AdvanceManualClock(500),
+        CheckLastBatch(Row(4, "data4", 44, "dd"), Row(5, "data5", 55, "ee")),
+        CheckAnswer(Row(4, "data4", 44, "dd"), Row(5, "data5", 55, "ee")),
+        AddFlussData(tableName, schema, Seq(Row(6, "data6", 66, "ff"), Row(7, 
"data7", 77, "gg"))),
+        AdvanceManualClock(500),
+        CheckLastBatch(Row(6, "data6", 66, "ff"), Row(7, "data7", 77, "gg")),
+        CheckAnswer(
+          Row(4, "data4", 44, "dd"),
+          Row(5, "data5", 55, "ee"),
+          Row(6, "data6", 66, "ff"),
+          Row(7, "data7", 77, "gg"))
+      )
+    }
+  }
+
+  test("read: primary key partition table") {
+    val tableName = "t"
+    withTable(tableName) {
+      sql(
+        "CREATE TABLE t (pk1 int, pk2 string, id int, data string, dt string) 
PARTITIONED BY(dt) TBLPROPERTIES('primary.key' = 'pk1, pk2, dt',  'bucket.num' 
= 1)")
+      sql(
+        "INSERT INTO t VALUES (1, 'a', 11, 'aa', 'a'), (2, 'b', 22, 'bb', 
'b'), (3, 'c', 33, 'cc', 'b')")
+
+      val schema = StructType(
+        Seq(
+          StructField("pk1", IntegerType),
+          StructField("pk2", StringType),
+          StructField("id", IntegerType),
+          StructField("data", StringType),
+          StructField("dt", StringType)
+        ))
+
+      // Test with ProcessAllAvailable
+      testStream(spark.readStream.options(Map("scan.startup.mode" -> 
"latest")).table(tableName))(
+        ProcessAllAvailable(),
+        CheckLastBatch(),
+        StopStream,
+        StartStream(),
+        AddFlussData(
+          tableName,
+          schema,
+          Seq(Row(4, "data4", 44, "dd", "a"), Row(5, "data5", 55, "ee", "b"))),
+        ProcessAllAvailable(),
+        CheckAnswer(Row(4, "data4", 44, "dd", "a"), Row(5, "data5", 55, "ee", 
"b"))
+      )
+
+      // Test with timed trigger
+      val clock = new StreamManualClock
+      testStream(spark.readStream.options(Map("scan.startup.mode" -> 
"latest")).table(tableName))(
+        StartStream(trigger = Trigger.ProcessingTime(500), clock),
+        AdvanceManualClock(500),
+        CheckNewAnswer(),
+        AddFlussData(
+          tableName,
+          schema,
+          Seq(Row(4, "data4", 44, "dd", "a"), Row(5, "data5", 55, "ee", "b"))),
+        AdvanceManualClock(500),
+        CheckLastBatch(Row(4, "data4", 44, "dd", "a"), Row(5, "data5", 55, 
"ee", "b")),
+        CheckAnswer(Row(4, "data4", 44, "dd", "a"), Row(5, "data5", 55, "ee", 
"b")),
+        AddFlussData(
+          tableName,
+          schema,
+          Seq(Row(6, "data6", 66, "ff", "b"), Row(7, "data7", 77, "gg", "a"))),
+        AdvanceManualClock(500),
+        CheckLastBatch(Row(6, "data6", 66, "ff", "b"), Row(7, "data7", 77, 
"gg", "a")),
+        CheckAnswer(
+          Row(4, "data4", 44, "dd", "a"),
+          Row(5, "data5", 55, "ee", "b"),
+          Row(6, "data6", 66, "ff", "b"),
+          Row(7, "data7", 77, "gg", "a"))
+      )
+    }
+  }
+
+  private def writeToLogTable(table: Table, schema: StructType, dataArr: Seq[Row]): Unit = {
+    val writer = if (table.getTableInfo.hasPrimaryKey) {
+      FlussUpsertDataWriter(table.getTableInfo.getTablePath, schema, conn.getConfiguration)
+    } else {
+      FlussAppendDataWriter(table.getTableInfo.getTablePath, schema, conn.getConfiguration)
+    }
+    val rows = dataArr.map {
+      row =>
+        val internalRow = InternalRow.fromSeq(row.toSeq.map {
+          case v: String => UTF8String.fromString(v)
+          case v => v
+        })
+        internalRow
+    }
+    rows.foreach(internalRow => writer.write(internalRow))
+    writer.commit()
+  }
+
+  case class AddFlussData[T](tableName: String, schema: StructType, dataArr: Seq[Row])
+    extends AddData {
+    override def addData(query: Option[StreamExecution]): (SparkDataStream, Offset) = {
+      require(
+        query.nonEmpty,
+        "Cannot add data when there is no query for finding the active fluss 
stream source")
+      val sources: Seq[FlussMicroBatchStream] = {
+        query.get.logicalPlan.collect {
+          case r: StreamingDataSourceV2Relation if r.stream.isInstanceOf[FlussMicroBatchStream] =>
+            r.stream
+        }
+      }.distinct.map(_.asInstanceOf[FlussMicroBatchStream])
+      if (!sources.exists(s => s.tablePath.equals(createTablePath(tableName)))) {
+        throw new IllegalArgumentException(
+          s"Could not find fluss stream source for table $tableName")
+      }
+
+      val flussTable = loadFlussTable(createTablePath(tableName))
+      writeToLogTable(flussTable, schema, dataArr)
+
+      val flussSource = sources.filter(s => s.tablePath.equals(createTablePath(tableName))).head
+      val buckets = (0 until flussTable.getTableInfo.getNumBuckets).toSeq
+
+      val offsetsInitializer = OffsetsInitializer.latest()
+      val tableBucketOffsets = if (flussTable.getTableInfo.isPartitioned) {
+        val partitionInfos = admin.listPartitionInfos(flussTable.getTableInfo.getTablePath).get()
+        val partitionOffsets = partitionInfos.asScala.map(
+          partitionInfo =>
+            FlussMicroBatchStream.getLatestOffsets(
+              flussTable.getTableInfo,
+              offsetsInitializer,
+              new BucketOffsetsRetrieverImpl(admin, flussTable.getTableInfo.getTablePath),
+              buckets,
+              Some(partitionInfo)))
+        val mergedOffsets = partitionOffsets
+          .map(_.getOffsets)
+          .reduce((l, r) => (l.asScala ++ r.asScala).asJava)
+        new TableBucketOffsets(flussTable.getTableInfo.getTableId, mergedOffsets)
+      } else {
+        FlussMicroBatchStream.getLatestOffsets(
+          flussTable.getTableInfo,
+          offsetsInitializer,
+          new BucketOffsetsRetrieverImpl(admin, flussTable.getTableInfo.getTablePath),
+          buckets,
+          None)
+      }
+
+      val offset = FlussSourceOffset(tableBucketOffsets)
+      (flussSource, offset)
+    }
+  }
 }
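
Since FlussMicroBatchStream also implements SupportsTriggerAvailableNow (see prepareForTriggerAvailableNow above), a bounded catch-up read should be expressible roughly as below; this is a sketch only, and the table identifier and checkpoint path are illustrative. With scan.startup.mode=latest a fresh query starts from the current log end, so the available-now trigger is mainly useful when restarting from an existing checkpoint: the query drains up to the offsets observed at start-up and then stops.

    import org.apache.spark.sql.streaming.Trigger

    // Process whatever is available up to the offsets captured at start-up, then stop.
    spark.readStream
      .options(Map("scan.startup.mode" -> "latest"))
      .table("t")
      .writeStream
      .trigger(Trigger.AvailableNow())
      .format("console")
      .option("checkpointLocation", "/tmp/fluss-stream-checkpoint")
      .start()
      .awaitTermination()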
