This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 5d7630a37 [spark] Add basic streaming read support for sparksql with latest mode (#2548)
5d7630a37 is described below
commit 5d7630a37b9ad27ec625e9c1e8c8c9a29ba5dfe5
Author: Yang Zhang <[email protected]>
AuthorDate: Sun Feb 8 12:14:47 2026 +0800
[spark] Add basic streaming read support for sparksql with latest mode (#2548)
---
.../fluss/spark/catalog/AbstractSparkTable.scala | 1 +
.../spark/read/FlussAppendPartitionReader.scala | 3 +-
.../fluss/spark/read/FlussInputPartition.scala | 16 +-
.../fluss/spark/read/FlussMicroBatchStream.scala | 346 +++++++++++++++++++++
.../fluss/spark/read/FlussOffsetInitializers.scala | 4 +-
.../org/apache/fluss/spark/read/FlussScan.scala | 21 ++
.../apache/fluss/spark/read/FlussScanBuilder.scala | 2 +-
.../spark/read/FlussUpsertPartitionReader.scala | 2 +-
.../apache/fluss/spark/SparkStreamingTest.scala | 280 ++++++++++++++++-
9 files changed, 665 insertions(+), 10 deletions(-)
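
For context, a minimal usage sketch of the feature this commit adds, based on the "scan.startup.mode" option exercised in SparkStreamingTest below. The catalog-qualified table name and checkpoint path are illustrative placeholders, and the Fluss catalog is assumed to be configured separately:

    // Hypothetical example: stream rows appended to a Fluss table after the query starts ("latest" mode).
    val query = spark.readStream
      .options(Map("scan.startup.mode" -> "latest"))
      .table("fluss_catalog.fluss.t")                        // illustrative table identifier
      .writeStream
      .option("checkpointLocation", "/tmp/fluss-stream-ckpt") // illustrative checkpoint path
      .format("console")
      .start()
    query.awaitTermination()
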
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
index e64f42a51..f9856a919 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
@@ -46,6 +46,7 @@ abstract class AbstractSparkTable(val admin: Admin, val tableInfo: TableInfo) ex
Set(
TableCapability.BATCH_READ,
TableCapability.BATCH_WRITE,
+ TableCapability.MICRO_BATCH_READ,
TableCapability.STREAMING_WRITE
).asJava
}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala
index 54570285a..520d6ae71 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala
@@ -85,8 +85,7 @@ class FlussAppendPartitionReader(
if (flussPartition.startOffset >= flussPartition.stopOffset) {
throw new IllegalArgumentException(s"Invalid offset range $flussPartition")
}
- logInfo(s"Prepare read table $tablePath partition $partitionId bucket $bucketId" +
- s" with start offset ${flussPartition.startOffset} stop offset ${flussPartition.stopOffset}")
+ logInfo(s"Prepare read table $tablePath $flussPartition")
if (partitionId != null) {
logScanner.subscribe(partitionId, bucketId, flussPartition.startOffset)
} else {
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala
index b573828cf..397b62430 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala
@@ -37,7 +37,13 @@ trait FlussInputPartition extends InputPartition {
* the table bucket to read from
*/
case class FlussAppendInputPartition(tableBucket: TableBucket, startOffset: Long, stopOffset: Long)
- extends FlussInputPartition
+ extends FlussInputPartition {
+ override def toString: String = {
+ s"FlussAppendInputPartition{tableId=${tableBucket.getTableId}, bucketId=${tableBucket.getBucket}," +
+ s" partitionId=${tableBucket.getPartitionId}" +
+ s" logStartOffset=$startOffset, logStopOffset=$stopOffset"
+ }
+}
/**
* Represents an input partition for reading data from a primary key table bucket. This partition
@@ -57,4 +63,10 @@ case class FlussUpsertInputPartition(
snapshotId: Long,
logStartingOffset: Long,
logStoppingOffset: Long)
- extends FlussInputPartition
+ extends FlussInputPartition {
+ override def toString: String = {
+ s"FlussUpsertInputPartition{tableId=${tableBucket.getTableId},
bucketId=${tableBucket.getBucket}," +
+ s" partitionId=${tableBucket.getPartitionId}, snapshotId=$snapshotId," +
+ s" logStartOffset=$logStartingOffset, logStopOffset=$logStoppingOffset"
+ }
+}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussMicroBatchStream.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussMicroBatchStream.scala
new file mode 100644
index 000000000..e3d77a9d7
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussMicroBatchStream.scala
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.client.{Connection, ConnectionFactory}
+import org.apache.fluss.client.admin.Admin
+import org.apache.fluss.client.initializer.{BucketOffsetsRetrieverImpl, OffsetsInitializer}
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.metadata.{PartitionInfo, TableBucket, TableInfo, TablePath}
+import org.apache.fluss.utils.json.TableBucketOffsets
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}
+import org.apache.spark.sql.connector.read.streaming._
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import java.util
+import java.util.Optional
+
+import scala.collection.JavaConverters._
+
+case class FlussSourceOffset(tableBucketOffsets: TableBucketOffsets) extends Offset {
+ override val json: String = new String(tableBucketOffsets.toJsonBytes, "utf-8")
+}
+
+abstract class FlussMicroBatchStream(
+ val tablePath: TablePath,
+ tableInfo: TableInfo,
+ readSchema: StructType,
+ options: CaseInsensitiveStringMap,
+ flussConfig: Configuration,
+ checkpointLocation: String)
+ extends SupportsTriggerAvailableNow
+ with ReportsSourceMetrics
+ with MicroBatchStream
+ with Logging
+ with AutoCloseable {
+
+ lazy val conn: Connection = ConnectionFactory.createConnection(flussConfig)
+
+ lazy val admin: Admin = conn.getAdmin
+
+ lazy val bucketOffsetsRetriever: BucketOffsetsRetrieverImpl =
+ new BucketOffsetsRetrieverImpl(admin, tableInfo.getTablePath)
+
+ lazy val partitionInfos: util.List[PartitionInfo] = admin.listPartitionInfos(tablePath).get()
+
+ private var allDataForTriggerAvailableNow: Option[TableBucketOffsets] = None
+
+ val startOffsetsInitializer: OffsetsInitializer =
+ FlussOffsetInitializers.startOffsetsInitializer(options, flussConfig)
+
+ val stoppingOffsetsInitializer: OffsetsInitializer =
+ FlussOffsetInitializers.stoppingOffsetsInitializer(false, options, flussConfig)
+
+ protected def projection: Array[Int] = {
+ val columnNameToIndex = tableInfo.getSchema.getColumnNames.asScala.zipWithIndex.toMap
+ readSchema.fields.map {
+ field =>
+ columnNameToIndex.getOrElse(
+ field.name,
+ throw new IllegalArgumentException(s"Invalid field name:
${field.name}"))
+ }
+ }
+
+ override def close(): Unit = {
+ if (admin != null) {
+ admin.close()
+ }
+ if (conn != null) {
+ conn.close()
+ }
+ }
+
+ override def latestOffset(): Offset = {
+ throw new UnsupportedOperationException(
+ "latestOffset(Offset, ReadLimit) should be called instead of this
method")
+ }
+
+ override def getDefaultReadLimit: ReadLimit = {
+ ReadLimit.allAvailable()
+ }
+
+ override def initialOffset(): Offset = {
+ val initialTableBucketOffsets = getOrCreateInitialPartitionOffsets()
+ FlussSourceOffset(initialTableBucketOffsets)
+ }
+
+ override def latestOffset(start: Offset, readLimit: ReadLimit): Offset = {
+ if (!readLimit.isInstanceOf[ReadAllAvailable]) {
+ throw new UnsupportedOperationException(s"Only ReadAllAvailable is
supported, but $readLimit")
+ }
+
+ val latestTableBucketOffsets = if (allDataForTriggerAvailableNow.isDefined) {
+ allDataForTriggerAvailableNow.get
+ } else {
+ fetchLatestOffsets().get
+ }
+ FlussSourceOffset(latestTableBucketOffsets)
+ }
+
+ override def prepareForTriggerAvailableNow(): Unit = {
+ allDataForTriggerAvailableNow = fetchLatestOffsets()
+ }
+
+ private def fetchLatestOffsets(): Option[TableBucketOffsets] = {
+ val buckets = (0 until tableInfo.getNumBuckets).toSeq
+ val offsetsInitializer = OffsetsInitializer.latest()
+ if (tableInfo.isPartitioned) {
+ val partitionOffsets = partitionInfos.asScala.map(
+ partitionInfo =>
+ FlussMicroBatchStream.getLatestOffsets(
+ tableInfo,
+ offsetsInitializer,
+ bucketOffsetsRetriever,
+ buckets,
+ Some(partitionInfo)))
+ val mergedOffsets = partitionOffsets
+ .map(_.getOffsets)
+ .reduce((l, r) => (l.asScala ++ r.asScala).asJava)
+ Some(new TableBucketOffsets(tableInfo.getTableId, mergedOffsets))
+ } else {
+ Some(
+ FlussMicroBatchStream
+ .getLatestOffsets(tableInfo, offsetsInitializer, bucketOffsetsRetriever, buckets, None))
+ }
+ }
+
+ // No need to notify fluss server
+ override def commit(end: Offset): Unit = {}
+
+ override def stop(): Unit = close()
+
+ override def deserializeOffset(json: String): Offset = {
+ FlussSourceOffset(TableBucketOffsets.fromJsonBytes(json.getBytes("utf-8")))
+ }
+
+ override def metrics(latestConsumedOffset: Optional[Offset]): util.Map[String, String] = {
+ // TODO add metrics
+ Map.empty[String, String].asJava
+ }
+
+ private def getOrCreateInitialPartitionOffsets(): TableBucketOffsets = {
+ if (tableInfo.isPartitioned) {
+ initPartitionedSplits()
+ } else {
+ initNonPartitionedSplits()
+ }
+ }
+
+ private def initPartitionedSplits(): TableBucketOffsets = {
+ val partitionOffsets = partitionInfos.asScala.map {
+ partitionInfo =>
+ if (tableInfo.hasPrimaryKey) {
+ getSnapshotAndLogSplits(Some(partitionInfo))
+ } else {
+ getLogSplit(Some(partitionInfo))
+ }
+ }
+
+ val mergedOffsets = partitionOffsets
+ .map(_.getOffsets)
+ .reduce((l, r) => (l.asScala ++ r.asScala).asJava)
+
+ new TableBucketOffsets(tableInfo.getTableId, mergedOffsets)
+ }
+
+ private def initNonPartitionedSplits(): TableBucketOffsets = {
+ if (tableInfo.hasPrimaryKey) {
+ getSnapshotAndLogSplits(None)
+ } else {
+ getLogSplit(None)
+ }
+ }
+
+ private def getSnapshotAndLogSplits(partitionInfo: Option[PartitionInfo]): TableBucketOffsets = {
+ // TODO read snapshot when more startup mode supported
+ getLogSplit(partitionInfo)
+ }
+
+ private def getLogSplit(partitionInfo: Option[PartitionInfo]): TableBucketOffsets = {
+ val buckets = (0 until tableInfo.getNumBuckets).toSeq
+ FlussMicroBatchStream.getLatestOffsets(
+ tableInfo,
+ startOffsetsInitializer,
+ bucketOffsetsRetriever,
+ buckets,
+ partitionInfo)
+ }
+}
+
+object FlussMicroBatchStream {
+ def getLatestOffsets(
+ tableInfo: TableInfo,
+ offsetsInitializer: OffsetsInitializer,
+ bucketOffsetsRetrieverImpl: BucketOffsetsRetrieverImpl,
+ buckets: Seq[Int],
+ partitionInfo: Option[PartitionInfo]): TableBucketOffsets = {
+ val latestOffsets = partitionInfo match {
+ case Some(partitionInfo) =>
+ offsetsInitializer
+ .getBucketOffsets(
+ partitionInfo.getPartitionName,
+ buckets.map(Integer.valueOf).asJava,
+ bucketOffsetsRetrieverImpl)
+ .asScala
+ .map {
+ case (bucket, offset) =>
+ val tableBucket =
+ new TableBucket(tableInfo.getTableId, partitionInfo.getPartitionId, bucket)
+ tableBucket -> offset
+ }
+
+ case None =>
+ offsetsInitializer
+ .getBucketOffsets(null, buckets.map(Integer.valueOf).asJava, bucketOffsetsRetrieverImpl)
+ .asScala
+ .map {
+ case (bucket, offset) =>
+ val tableBucket = new TableBucket(tableInfo.getTableId, bucket)
+ tableBucket -> offset
+ }
+ }
+ new TableBucketOffsets(
+ tableInfo.getTableId,
+ latestOffsets.map(e => (e._1, long2Long(e._2))).asJava)
+ }
+}
+
+/** Batch for reading log table (append-only table). */
+class FlussAppendMicroBatchStream(
+ tablePath: TablePath,
+ tableInfo: TableInfo,
+ readSchema: StructType,
+ options: CaseInsensitiveStringMap,
+ flussConfig: Configuration,
+ checkpointLocation: String)
+ extends FlussMicroBatchStream(
+ tablePath,
+ tableInfo,
+ readSchema,
+ options,
+ flussConfig,
+ checkpointLocation) {
+
+ override def createReaderFactory(): PartitionReaderFactory = {
+ new FlussAppendPartitionReaderFactory(tablePath, projection, options, flussConfig)
+ }
+
+ override def planInputPartitions(start: Offset, end: Offset): Array[InputPartition] = {
+ // TODO process new partition and deleted partition
+ val startOffsets = start.asInstanceOf[FlussSourceOffset].tableBucketOffsets
+ val stopOffsets = end.asInstanceOf[FlussSourceOffset].tableBucketOffsets
+
+ if (
+ startOffsets.getOffsets
+ .keySet()
+ .asScala
+ .diff(stopOffsets.getOffsets.keySet().asScala)
+ .nonEmpty
+ ) {
+ throw new IllegalArgumentException(
+ "start and end offset must have the same table bucket info")
+ }
+
+ val inputPartitions = startOffsets.getOffsets
+ .keySet()
+ .asScala
+ .map {
+ tableBucket =>
+ val startOffset = startOffsets.getOffsets.get(tableBucket)
+ val stopOffset = stopOffsets.getOffsets.get(tableBucket)
+ FlussAppendInputPartition(tableBucket, startOffset, stopOffset)
+ }
+ .filter(e => e.startOffset < e.stopOffset)
+ .toArray
+ inputPartitions.map(_.asInstanceOf[InputPartition])
+ }
+}
+
+class FlussUpsertMicroBatchStream(
+ tablePath: TablePath,
+ tableInfo: TableInfo,
+ readSchema: StructType,
+ options: CaseInsensitiveStringMap,
+ flussConfig: Configuration,
+ checkpointLocation: String)
+ extends FlussMicroBatchStream(
+ tablePath,
+ tableInfo,
+ readSchema,
+ options,
+ flussConfig,
+ checkpointLocation) {
+
+ override def planInputPartitions(start: Offset, end: Offset): Array[InputPartition] = {
+ // TODO process new partition and deleted partition
+ val startOffsets = start.asInstanceOf[FlussSourceOffset].tableBucketOffsets
+ val stopOffsets = end.asInstanceOf[FlussSourceOffset].tableBucketOffsets
+
+ if (
+ startOffsets.getOffsets
+ .keySet()
+ .asScala
+ .diff(stopOffsets.getOffsets.keySet().asScala)
+ .nonEmpty
+ ) {
+ throw new IllegalArgumentException(
+ "start and end offset must have the same table bucket info")
+ }
+
+ val inputPartitions = startOffsets.getOffsets
+ .keySet()
+ .asScala
+ .map {
+ tableBucket =>
+ val startOffset = startOffsets.getOffsets.get(tableBucket)
+ val stopOffset = stopOffsets.getOffsets.get(tableBucket)
+ // TODO read snapshot with startup mode.
+ FlussUpsertInputPartition(tableBucket, -1, startOffset, stopOffset)
+ }
+ .filter(e => e.logStartingOffset < e.logStoppingOffset)
+ .toArray
+ inputPartitions.map(_.asInstanceOf[InputPartition])
+ }
+
+ override def createReaderFactory(): PartitionReaderFactory = {
+ new FlussUpsertPartitionReaderFactory(tablePath, projection, options, flussConfig)
+ }
+}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussOffsetInitializers.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussOffsetInitializers.scala
index d1e2bc4ff..1f0a8806a 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussOffsetInitializers.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussOffsetInitializers.scala
@@ -17,7 +17,7 @@
package org.apache.fluss.spark.read
-import org.apache.fluss.client.initializer.OffsetsInitializer
+import org.apache.fluss.client.initializer.{NoStoppingOffsetsInitializer, OffsetsInitializer}
import org.apache.fluss.config.Configuration
import org.apache.fluss.spark.SparkFlussConf
@@ -50,7 +50,7 @@ object FlussOffsetInitializers {
if (isBatch) {
OffsetsInitializer.latest()
} else {
- throw new UnsupportedOperationException("Stream read is not supported
yet.")
+ new NoStoppingOffsetsInitializer()
}
}
}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala
index 77ad081ff..a54396127 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala
@@ -22,6 +22,7 @@ import org.apache.fluss.metadata.{TableInfo, TablePath}
import org.apache.fluss.spark.SparkConversions
import org.apache.spark.sql.connector.read.{Batch, Scan}
+import org.apache.spark.sql.connector.read.streaming.MicroBatchStream
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -48,6 +49,16 @@ case class FlussAppendScan(
override def toBatch: Batch = {
new FlussAppendBatch(tablePath, tableInfo, readSchema, options, flussConfig)
}
+
+ override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
+ new FlussAppendMicroBatchStream(
+ tablePath,
+ tableInfo,
+ readSchema,
+ options,
+ flussConfig,
+ checkpointLocation)
+ }
}
/** Fluss Upsert Scan. */
@@ -62,4 +73,14 @@ case class FlussUpsertScan(
override def toBatch: Batch = {
new FlussUpsertBatch(tablePath, tableInfo, readSchema, options, flussConfig)
}
+
+ override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
+ new FlussUpsertMicroBatchStream(
+ tablePath,
+ tableInfo,
+ readSchema,
+ options,
+ flussConfig,
+ checkpointLocation)
+ }
}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala
index cd4d17ec0..cd3e6768f 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
/** An interface that extends from Spark [[ScanBuilder]]. */
trait FlussScanBuilder extends ScanBuilder with SupportsPushDownRequiredColumns {
- protected var requiredSchema: Option[StructType] = _
+ protected var requiredSchema: Option[StructType] = None
override def pruneColumns(requiredSchema: StructType): Unit = {
this.requiredSchema = Some(requiredSchema)
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala
index c48a82dbd..e8e56a2a5 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala
@@ -30,7 +30,6 @@ import org.apache.fluss.spark.utils.LogChangesIterator
import org.apache.fluss.utils.CloseableIterator
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
import java.util.Comparator
@@ -210,6 +209,7 @@ class FlussUpsertPartitionReader(
private def initialize(): Unit = {
val currentTs = System.currentTimeMillis()
+ logInfo(s"Prepare read table $tablePath $flussPartition")
val sortMergeReader = createSortMergeReader()
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkStreamingTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkStreamingTest.scala
index f7358139b..d9816d6c1 100644
--- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkStreamingTest.scala
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkStreamingTest.scala
@@ -17,11 +17,26 @@
package org.apache.fluss.spark
-import org.apache.spark.sql.execution.streaming.MemoryStream
-import org.apache.spark.sql.streaming.StreamTest
+import org.apache.fluss.client.initializer.{BucketOffsetsRetrieverImpl, OffsetsInitializer}
+import org.apache.fluss.client.table.Table
+import org.apache.fluss.spark.read.{FlussMicroBatchStream, FlussSourceOffset}
+import org.apache.fluss.spark.write.{FlussAppendDataWriter, FlussUpsertDataWriter}
+import org.apache.fluss.utils.json.TableBucketOffsets
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream}
+import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
+import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamExecution}
+import org.apache.spark.sql.streaming.{StreamTest, Trigger}
+import org.apache.spark.sql.streaming.util.StreamManualClock
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.apache.spark.unsafe.types.UTF8String
import java.io.File
+import scala.collection.JavaConverters._
+
class SparkStreamingTest extends FlussSparkTestBase with StreamTest {
import testImplicits._
@@ -132,4 +147,265 @@ class SparkStreamingTest extends FlussSparkTestBase with StreamTest {
runTestWithStream("t", input1, input2, expect1, expect2)
}
}
+
+ test("read: log table") {
+ val tableName = "t"
+ withTable(tableName) {
+ sql("CREATE TABLE t (id int, data string)")
+ sql("INSERT INTO t VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+
+ val schema = StructType(Seq(StructField("id", IntegerType),
StructField("data", StringType)))
+
+ // Test with ProcessAllAvailable
+ testStream(spark.readStream.options(Map("scan.startup.mode" -> "latest")).table(tableName))(
+ ProcessAllAvailable(),
+ CheckLastBatch(),
+ StopStream,
+ StartStream(),
+ AddFlussData(tableName, schema, Seq(Row(4, "data4"), Row(5, "data5"))),
+ ProcessAllAvailable(),
+ CheckLastBatch(Row(4, "data4"), Row(5, "data5")),
+ CheckAnswer(Row(4, "data4"), Row(5, "data5"))
+ )
+
+ // Test with timed trigger
+ val clock = new StreamManualClock
+ testStream(spark.readStream.options(Map("scan.startup.mode" -> "latest")).table(tableName))(
+ StartStream(trigger = Trigger.ProcessingTime(500), clock),
+ AdvanceManualClock(500),
+ CheckNewAnswer(),
+ AddFlussData(tableName, schema, Seq(Row(4, "data4"), Row(5, "data5"))),
+ AdvanceManualClock(500),
+ CheckLastBatch(Row(4, "data4"), Row(5, "data5")),
+ CheckAnswer(Row(4, "data4"), Row(5, "data5")),
+ AddFlussData(tableName, schema, Seq(Row(6, "data6"), Row(7, "data7"))),
+ AdvanceManualClock(500),
+ CheckLastBatch(Row(6, "data6"), Row(7, "data7")),
+ CheckAnswer(Row(4, "data4"), Row(5, "data5"), Row(6, "data6"), Row(7,
"data7"))
+ )
+ }
+ }
+
+ test("read: log partition table") {
+ val tableName = "t"
+ withTable(tableName) {
+ sql("CREATE TABLE t (id int, data string, pt string) PARTITIONED BY
(pt)")
+ sql("INSERT INTO t VALUES (1, 'a', '11'), (2, 'b', '11'), (3, 'c',
'22')")
+
+ val schema = StructType(
+ Seq(
+ StructField("id", IntegerType),
+ StructField("data", StringType),
+ StructField("pt", StringType)))
+
+ // Test with ProcessAllAvailable
+ testStream(spark.readStream.options(Map("scan.startup.mode" -> "latest")).table(tableName))(
+ ProcessAllAvailable(),
+ CheckLastBatch(),
+ StopStream,
+ StartStream(),
+ AddFlussData(tableName, schema, Seq(Row(4, "data4", "22"), Row(5,
"data5", "11"))),
+ ProcessAllAvailable(),
+ CheckLastBatch(Row(4, "data4", "22"), Row(5, "data5", "11")),
+ CheckAnswer(Row(4, "data4", "22"), Row(5, "data5", "11"))
+ )
+
+ // Test with timed trigger
+ val clock = new StreamManualClock
+ testStream(spark.readStream.options(Map("scan.startup.mode" -> "latest")).table(tableName))(
+ StartStream(trigger = Trigger.ProcessingTime(500), clock),
+ AdvanceManualClock(500),
+ CheckNewAnswer(),
+ AddFlussData(tableName, schema, Seq(Row(4, "data4", "22"), Row(5,
"data5", "11"))),
+ AdvanceManualClock(500),
+ CheckLastBatch(Row(4, "data4", "22"), Row(5, "data5", "11")),
+ CheckAnswer(Row(4, "data4", "22"), Row(5, "data5", "11")),
+ AddFlussData(tableName, schema, Seq(Row(6, "data6", "22"), Row(7,
"data7", "11"))),
+ AdvanceManualClock(500),
+ CheckLastBatch(Row(6, "data6", "22"), Row(7, "data7", "11")),
+ CheckAnswer(
+ Row(4, "data4", "22"),
+ Row(5, "data5", "11"),
+ Row(6, "data6", "22"),
+ Row(7, "data7", "11"))
+ )
+ }
+ }
+
+ test("read: primary key table") {
+ val tableName = "t"
+ withTable(tableName) {
+ sql(
+ "CREATE TABLE t (pk1 int, pk2 string, id int, data string)
TBLPROPERTIES('primary.key' = 'pk1, pk2', 'bucket.num' = 1)")
+ sql("INSERT INTO t VALUES (1, 'a', 11, 'aa'), (2, 'b', 22, 'bb'), (3,
'c', 33, 'cc')")
+
+ val schema = StructType(
+ Seq(
+ StructField("pk1", IntegerType),
+ StructField("pk2", StringType),
+ StructField("id", IntegerType),
+ StructField("data", StringType)))
+
+ // Test with ProcessAllAvailable
+ testStream(spark.readStream.options(Map("scan.startup.mode" -> "latest")).table(tableName))(
+ ProcessAllAvailable(),
+ CheckLastBatch(),
+ StopStream,
+ StartStream(),
+ AddFlussData(tableName, schema, Seq(Row(4, "data4", 44, "dd"), Row(5,
"data5", 55, "ee"))),
+ ProcessAllAvailable(),
+ CheckLastBatch(Row(4, "data4", 44, "dd"), Row(5, "data5", 55, "ee")),
+ CheckAnswer(Row(4, "data4", 44, "dd"), Row(5, "data5", 55, "ee"))
+ )
+
+ // Test with timed trigger
+ val clock = new StreamManualClock
+ testStream(spark.readStream.options(Map("scan.startup.mode" -> "latest")).table(tableName))(
+ StartStream(trigger = Trigger.ProcessingTime(500), clock),
+ AdvanceManualClock(500),
+ CheckNewAnswer(),
+ AddFlussData(tableName, schema, Seq(Row(4, "data4", 44, "dd"), Row(5,
"data5", 55, "ee"))),
+ AdvanceManualClock(500),
+ CheckLastBatch(Row(4, "data4", 44, "dd"), Row(5, "data5", 55, "ee")),
+ CheckAnswer(Row(4, "data4", 44, "dd"), Row(5, "data5", 55, "ee")),
+ AddFlussData(tableName, schema, Seq(Row(6, "data6", 66, "ff"), Row(7,
"data7", 77, "gg"))),
+ AdvanceManualClock(500),
+ CheckLastBatch(Row(6, "data6", 66, "ff"), Row(7, "data7", 77, "gg")),
+ CheckAnswer(
+ Row(4, "data4", 44, "dd"),
+ Row(5, "data5", 55, "ee"),
+ Row(6, "data6", 66, "ff"),
+ Row(7, "data7", 77, "gg"))
+ )
+ }
+ }
+
+ test("read: primary key partition table") {
+ val tableName = "t"
+ withTable(tableName) {
+ sql(
+ "CREATE TABLE t (pk1 int, pk2 string, id int, data string, dt string)
PARTITIONED BY(dt) TBLPROPERTIES('primary.key' = 'pk1, pk2, dt', 'bucket.num'
= 1)")
+ sql(
+ "INSERT INTO t VALUES (1, 'a', 11, 'aa', 'a'), (2, 'b', 22, 'bb',
'b'), (3, 'c', 33, 'cc', 'b')")
+
+ val schema = StructType(
+ Seq(
+ StructField("pk1", IntegerType),
+ StructField("pk2", StringType),
+ StructField("id", IntegerType),
+ StructField("data", StringType),
+ StructField("dt", StringType)
+ ))
+
+ // Test with ProcessAllAvailable
+ testStream(spark.readStream.options(Map("scan.startup.mode" -> "latest")).table(tableName))(
+ ProcessAllAvailable(),
+ CheckLastBatch(),
+ StopStream,
+ StartStream(),
+ AddFlussData(
+ tableName,
+ schema,
+ Seq(Row(4, "data4", 44, "dd", "a"), Row(5, "data5", 55, "ee", "b"))),
+ ProcessAllAvailable(),
+ CheckAnswer(Row(4, "data4", 44, "dd", "a"), Row(5, "data5", 55, "ee",
"b"))
+ )
+
+ // Test with timed trigger
+ val clock = new StreamManualClock
+ testStream(spark.readStream.options(Map("scan.startup.mode" -> "latest")).table(tableName))(
+ StartStream(trigger = Trigger.ProcessingTime(500), clock),
+ AdvanceManualClock(500),
+ CheckNewAnswer(),
+ AddFlussData(
+ tableName,
+ schema,
+ Seq(Row(4, "data4", 44, "dd", "a"), Row(5, "data5", 55, "ee", "b"))),
+ AdvanceManualClock(500),
+ CheckLastBatch(Row(4, "data4", 44, "dd", "a"), Row(5, "data5", 55,
"ee", "b")),
+ CheckAnswer(Row(4, "data4", 44, "dd", "a"), Row(5, "data5", 55, "ee",
"b")),
+ AddFlussData(
+ tableName,
+ schema,
+ Seq(Row(6, "data6", 66, "ff", "b"), Row(7, "data7", 77, "gg", "a"))),
+ AdvanceManualClock(500),
+ CheckLastBatch(Row(6, "data6", 66, "ff", "b"), Row(7, "data7", 77,
"gg", "a")),
+ CheckAnswer(
+ Row(4, "data4", 44, "dd", "a"),
+ Row(5, "data5", 55, "ee", "b"),
+ Row(6, "data6", 66, "ff", "b"),
+ Row(7, "data7", 77, "gg", "a"))
+ )
+ }
+ }
+
+ private def writeToLogTable(table: Table, schema: StructType, dataArr: Seq[Row]): Unit = {
+ val writer = if (table.getTableInfo.hasPrimaryKey) {
+ FlussUpsertDataWriter(table.getTableInfo.getTablePath, schema, conn.getConfiguration)
+ } else {
+ FlussAppendDataWriter(table.getTableInfo.getTablePath, schema, conn.getConfiguration)
+ }
+ val rows = dataArr.map {
+ row =>
+ val internalRow = InternalRow.fromSeq(row.toSeq.map {
+ case v: String => UTF8String.fromString(v)
+ case v => v
+ })
+ internalRow
+ }
+ rows.foreach(internalRow => writer.write(internalRow))
+ writer.commit()
+ }
+
+ case class AddFlussData[T](tableName: String, schema: StructType, dataArr: Seq[Row])
+ extends AddData {
+ override def addData(query: Option[StreamExecution]): (SparkDataStream, Offset) = {
+ require(
+ query.nonEmpty,
+ "Cannot add data when there is no query for finding the active fluss
stream source")
+ val sources: Seq[FlussMicroBatchStream] = {
+ query.get.logicalPlan.collect {
+ case r: StreamingDataSourceV2Relation if r.stream.isInstanceOf[FlussMicroBatchStream] =>
+ r.stream
+ }
+ }.distinct.map(_.asInstanceOf[FlussMicroBatchStream])
+ if (!sources.exists(s => s.tablePath.equals(createTablePath(tableName)))) {
+ throw new IllegalArgumentException(
+ s"Could not find fluss stream source for table $tableName")
+ }
+
+ val flussTable = loadFlussTable(createTablePath(tableName))
+ writeToLogTable(flussTable, schema, dataArr)
+
+ val flussSource = sources.filter(s => s.tablePath.equals(createTablePath(tableName))).head
+ val buckets = (0 until flussTable.getTableInfo.getNumBuckets).toSeq
+
+ val offsetsInitializer = OffsetsInitializer.latest()
+ val tableBucketOffsets = if (flussTable.getTableInfo.isPartitioned) {
+ val partitionInfos = admin.listPartitionInfos(flussTable.getTableInfo.getTablePath).get()
+ val partitionOffsets = partitionInfos.asScala.map(
+ partitionInfo =>
+ FlussMicroBatchStream.getLatestOffsets(
+ flussTable.getTableInfo,
+ offsetsInitializer,
+ new BucketOffsetsRetrieverImpl(admin, flussTable.getTableInfo.getTablePath),
+ buckets,
+ Some(partitionInfo)))
+ val mergedOffsets = partitionOffsets
+ .map(_.getOffsets)
+ .reduce((l, r) => (l.asScala ++ r.asScala).asJava)
+ new TableBucketOffsets(flussTable.getTableInfo.getTableId, mergedOffsets)
+ } else {
+ FlussMicroBatchStream.getLatestOffsets(
+ flussTable.getTableInfo,
+ offsetsInitializer,
+ new BucketOffsetsRetrieverImpl(admin, flussTable.getTableInfo.getTablePath),
+ buckets,
+ None)
+ }
+
+ val offset = FlussSourceOffset(tableBucketOffsets)
+ (flussSource, offset)
+ }
+ }
}
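
Note: since FlussMicroBatchStream implements SupportsTriggerAvailableNow, a bounded catch-up run should also be possible. A minimal sketch, assuming the same illustrative table identifier, an already configured Fluss catalog, and a Spark version (3.3+) that provides Trigger.AvailableNow():

    import org.apache.spark.sql.streaming.Trigger

    // Hypothetical example: consume everything currently available in the Fluss log, then stop.
    spark.readStream
      .options(Map("scan.startup.mode" -> "latest"))
      .table("fluss_catalog.fluss.t")                          // illustrative table identifier
      .writeStream
      .trigger(Trigger.AvailableNow())
      .option("checkpointLocation", "/tmp/fluss-available-now-ckpt") // illustrative checkpoint path
      .format("console")
      .start()
      .awaitTermination()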