This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 344e38ff5 [spark] Add spark streaming write support (#2357)
344e38ff5 is described below
commit 344e38ff5166d8851b9fe1bde1868769928bad30
Author: Yang Zhang <[email protected]>
AuthorDate: Sun Jan 18 11:07:26 2026 +0800
[spark] Add spark streaming write support (#2357)
---
.../fluss/spark/catalog/AbstractSparkTable.scala | 2 +-
.../fluss/spark/write/FlussStreamingWrite.scala | 88 ++++++++++++++
.../org/apache/fluss/spark/write/FlussWrite.scala | 5 +
.../apache/fluss/spark/FlussSparkTestBase.scala | 9 +-
.../apache/fluss/spark/SparkStreamingTest.scala | 135 +++++++++++++++++++++
5 files changed, 231 insertions(+), 8 deletions(-)
diff --git
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
index 33f73f600..a1f3e09d4 100644
---
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
+++
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
@@ -43,7 +43,7 @@ abstract class AbstractSparkTable(val admin: Admin, val
tableInfo: TableInfo) ex
override def schema(): StructType = _schema
override def capabilities(): util.Set[TableCapability] = {
- Set(TableCapability.BATCH_WRITE).asJava
+ Set(TableCapability.BATCH_WRITE, TableCapability.STREAMING_WRITE).asJava
}
override def partitioning(): Array[Transform] = {
diff --git
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/write/FlussStreamingWrite.scala
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/write/FlussStreamingWrite.scala
new file mode 100644
index 000000000..25013d3fc
--- /dev/null
+++
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/write/FlussStreamingWrite.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.write
+
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.metadata.TablePath
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.write.{DataWriter, PhysicalWriteInfo,
WriterCommitMessage}
+import
org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory,
StreamingWrite}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Common base for the Fluss implementations of Spark's [[StreamingWrite]].
+ *
+ * The commit coordinator is disabled and the epoch-level `commit` / `abort` callbacks do nothing.
+ */
+trait FlussStreamingWrite extends StreamingWrite with Serializable {
+
+  override def useCommitCoordinator(): Boolean = false
+
+  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = ()
+
+  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = ()
+}
+
+/**
+ * Streaming write for the Fluss append path.
+ *
+ * @param tablePath path of the target table, forwarded to the writer factory
+ * @param dataSchema Spark schema of the incoming rows, forwarded to the writer factory
+ * @param flussConfig Fluss configuration, forwarded to the writer factory
+ */
+class FlussAppendStreamingWrite(
+    val tablePath: TablePath,
+    val dataSchema: StructType,
+    val flussConfig: Configuration)
+  extends FlussStreamingWrite {
+
+  /** Builds the append writer factory; `info` is not consulted. */
+  override def createStreamingWriterFactory(
+      info: PhysicalWriteInfo): StreamingDataWriterFactory = {
+    FlussAppendStreamingWriterFactory(tablePath, dataSchema, flussConfig)
+  }
+}
+
+/** Writer factory of the append path: each `createWriter` call yields a `FlussAppendDataWriter`. */
+private case class FlussAppendStreamingWriterFactory(
+    tablePath: TablePath,
+    dataSchema: StructType,
+    flussConfig: Configuration)
+  extends StreamingDataWriterFactory
+  with Logging {
+
+  // partitionId, taskId and epochId are accepted for the interface but not used here.
+  override def createWriter(
+      partitionId: Int,
+      taskId: Long,
+      epochId: Long): DataWriter[InternalRow] =
+    FlussAppendDataWriter(tablePath, dataSchema, flussConfig)
+}
+
+/**
+ * Streaming write for the Fluss upsert path.
+ *
+ * @param tablePath path of the target table, forwarded to the writer factory
+ * @param dataSchema Spark schema of the incoming rows, forwarded to the writer factory
+ * @param flussConfig Fluss configuration, forwarded to the writer factory
+ */
+case class FlussUpsertStreamingWrite(
+    tablePath: TablePath,
+    dataSchema: StructType,
+    flussConfig: Configuration)
+  extends FlussStreamingWrite {
+
+  /** Builds the upsert writer factory; `info` is not consulted. */
+  override def createStreamingWriterFactory(
+      info: PhysicalWriteInfo): StreamingDataWriterFactory = {
+    FlussUpsertStreamingWriterFactory(tablePath, dataSchema, flussConfig)
+  }
+}
+
+/** Writer factory of the upsert path: each `createWriter` call yields a `FlussUpsertDataWriter`. */
+private case class FlussUpsertStreamingWriterFactory(
+    tablePath: TablePath,
+    dataSchema: StructType,
+    flussConfig: Configuration)
+  extends StreamingDataWriterFactory
+  with Logging {
+
+  // partitionId, taskId and epochId are accepted for the interface but not used here.
+  override def createWriter(
+      partitionId: Int,
+      taskId: Long,
+      epochId: Long): DataWriter[InternalRow] =
+    FlussUpsertDataWriter(tablePath, dataSchema, flussConfig)
+}
diff --git
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/write/FlussWrite.scala
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/write/FlussWrite.scala
index 57f81c619..36cce0818 100644
---
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/write/FlussWrite.scala
+++
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/write/FlussWrite.scala
@@ -21,6 +21,7 @@ import org.apache.fluss.config.Configuration
import org.apache.fluss.metadata.TablePath
import org.apache.spark.sql.connector.write.{BatchWrite, Write}
+import org.apache.spark.sql.connector.write.streaming.StreamingWrite
import org.apache.spark.sql.types.StructType
/** An interface that extends from Spark [[Write]]. */
@@ -35,6 +36,8 @@ case class FlussAppendWrite(
override def toBatch: BatchWrite = new FlussAppendBatchWrite(tablePath,
dataSchema, flussConfig)
+ override def toStreaming: StreamingWrite =
+ new FlussAppendStreamingWrite(tablePath, dataSchema, flussConfig)
}
/** Fluss Upsert Write. */
@@ -46,4 +49,6 @@ case class FlussUpsertWrite(
override def toBatch: BatchWrite = FlussUpsertBatchWrite(tablePath,
dataSchema, flussConfig)
+ override def toStreaming: StreamingWrite =
+ FlussUpsertStreamingWrite(tablePath, dataSchema, flussConfig)
}
diff --git
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
index 6123b0195..2de158b7d 100644
---
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
+++
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
@@ -90,14 +90,9 @@ class FlussSparkTestBase extends QueryTest with
SharedSparkSession {
}
val scanRecords = logScanner.poll(Duration.ofSeconds(1))
scanRecords
- .buckets()
+ .iterator()
.asScala
- .flatMap(
- tableBucket =>
- scanRecords
- .records(tableBucket)
- .asScala
- .map(r => (r.getChangeType.shortString, r.getRow)))
+ .map(record => (record.getChangeType.shortString(), record.getRow))
.toArray
}
}
diff --git
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkStreamingTest.scala
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkStreamingTest.scala
new file mode 100644
index 000000000..f7358139b
--- /dev/null
+++
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkStreamingTest.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark
+
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.streaming.StreamTest
+
+import java.io.File
+
+class SparkStreamingTest extends FlussSparkTestBase with StreamTest {
+  import testImplicits._
+
+  /**
+   * Runs the streaming write twice against the same checkpoint directory and verifies the table
+   * contents after each run.
+   *
+   * @param tableIdentifier name of the table written to
+   * @param input1 rows streamed by the first run
+   * @param input2 rows streamed by the second run
+   * @param expect1 (change type, id, data) records expected after the first run
+   * @param expect2 (change type, id, data) records expected after the second run
+   */
+  private def runTestWithStream(
+      tableIdentifier: String,
+      input1: Seq[(Long, String)],
+      input2: Seq[(Long, String)],
+      expect1: Seq[(String, Long, String)],
+      expect2: Seq[(String, Long, String)]): Unit = {
+    withTempDir {
+      checkpointDir =>
+        verifyStream(tableIdentifier, checkpointDir, Seq.empty, input1, expect1)
+
+        // The second run re-adds input1 to its fresh MemoryStream before streaming input2.
+        verifyStream(tableIdentifier, checkpointDir, Seq(input1), input2, expect2)
+    }
+  }
+
+  /**
+   * Streams `newInputs` into `tableIdentifier`, then compares the records returned by
+   * `getRowsWithChangeType` with `expectedOutputs`, position by position.
+   *
+   * @param tableIdentifier name of the table written to
+   * @param checkpointDir checkpoint location handed to the streaming query
+   * @param prevInputs batches added to the memory stream before the query is started
+   * @param newInputs rows added to the memory stream after the query is started
+   * @param expectedOutputs expected (change type, id, data) records, in order
+   */
+  private def verifyStream(
+      tableIdentifier: String,
+      checkpointDir: File,
+      prevInputs: Seq[Seq[(Long, String)]],
+      newInputs: Seq[(Long, String)],
+      expectedOutputs: Seq[(String, Long, String)]): Unit = {
+    val inputData = MemoryStream[(Long, String)]
+    val inputDF = inputData.toDF().toDF("id", "data")
+
+    prevInputs.foreach(inputsPerBatch => inputData.addData(inputsPerBatch: _*))
+
+    val query = inputDF.writeStream
+      .option("checkpointLocation", checkpointDir.getAbsolutePath)
+      .toTable(tableIdentifier)
+
+    // Stop the query even if processing throws, so a failure does not leave it running.
+    try {
+      inputData.addData(newInputs: _*)
+      query.processAllAvailable()
+    } finally {
+      query.stop()
+    }
+
+    // TODO: verify the result through a Spark read as well.
+    val table = loadFlussTable(createTablePath(tableIdentifier))
+    val rowsWithType = getRowsWithChangeType(table)
+
+    // Check every row instead of only the head, which would throw on an empty result.
+    rowsWithType.foreach { case (_, row) => assert(row.getFieldCount == 2) }
+
+    // zip truncates to the shorter side, so the lengths are compared explicitly; a count
+    // mismatch then falls through to the diagnostic message below.
+    val result = rowsWithType.length == expectedOutputs.length &&
+      rowsWithType.zip(expectedOutputs).forall {
+        case ((changeType, row), (expectType, expectId, expectData)) =>
+          changeType == expectType &&
+          row.getLong(0) == expectId &&
+          row.getString(1).toString == expectData
+      }
+    if (!result) {
+      fail(s"""
+              |checking $table data failed
+              |expect data:${expectedOutputs.mkString("\n", "\n", "\n")}
+              |fluss data:${rowsWithType.mkString("\n", "\n", "\n")}
+              |""".stripMargin)
+    }
+  }
+
+  test("write: write to log table") {
+    withTable("t") {
+      val tablePath = createTablePath("t")
+      spark.sql("CREATE TABLE t (id bigint, data string)")
+      val table = loadFlussTable(tablePath)
+      assert(!table.getTableInfo.hasPrimaryKey)
+      assert(!table.getTableInfo.hasBucketKey)
+
+      val rows = getRowsWithChangeType(table).map(_._2)
+      assert(rows.isEmpty)
+
+      val input1 = Seq((1L, "a"), (2L, "b"), (3L, "c"))
+      val input2 = Seq((4L, "d"), (5L, "e"), (6L, "f"))
+      val expect1 = input1.map(r => ("+A", r._1, r._2))
+      val expect2 = (input1 ++ input2).map(r => ("+A", r._1, r._2))
+      runTestWithStream("t", input1, input2, expect1, expect2)
+    }
+  }
+
+  test("write: write to primary key table") {
+    withTable("t") {
+      val tablePath = createTablePath("t")
+      spark.sql("""
+                  |CREATE TABLE t (id bigint, data string) TBLPROPERTIES("primary.key" = "id")
+                  |""".stripMargin)
+      val table = loadFlussTable(tablePath)
+      assert(table.getTableInfo.hasBucketKey)
+      assert(table.getTableInfo.hasPrimaryKey)
+      assert(table.getTableInfo.getPrimaryKeys.get(0).equalsIgnoreCase("id"))
+
+      val rows = getRowsWithChangeType(table).map(_._2)
+      assert(rows.isEmpty)
+
+      val input1 = Seq((1L, "a"), (2L, "b"), (3L, "c"))
+      // id 1 is written again with a new value, so an update pair is expected for it.
+      val input2 = Seq((1L, "d"), (5L, "e"), (6L, "f"))
+      val expect1 = input1.map(r => ("+I", r._1, r._2))
+      val expect2 = Seq(
+        ("+I", 1L, "a"),
+        ("+I", 2L, "b"),
+        ("+I", 3L, "c"),
+        ("-U", 1L, "a"),
+        ("+U", 1L, "d"),
+        ("+I", 5L, "e"),
+        ("+I", 6L, "f"))
+      runTestWithStream("t", input1, input2, expect1, expect2)
+    }
+  }
+}