This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 344e38ff5 [spark] Add spark streaming write support (#2357)
344e38ff5 is described below

commit 344e38ff5166d8851b9fe1bde1868769928bad30
Author: Yang Zhang <[email protected]>
AuthorDate: Sun Jan 18 11:07:26 2026 +0800

    [spark] Add spark streaming write support (#2357)
---
 .../fluss/spark/catalog/AbstractSparkTable.scala   |   2 +-
 .../fluss/spark/write/FlussStreamingWrite.scala    |  88 ++++++++++++++
 .../org/apache/fluss/spark/write/FlussWrite.scala  |   5 +
 .../apache/fluss/spark/FlussSparkTestBase.scala    |   9 +-
 .../apache/fluss/spark/SparkStreamingTest.scala    | 135 +++++++++++++++++++++
 5 files changed, 231 insertions(+), 8 deletions(-)

diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
index 33f73f600..a1f3e09d4 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
@@ -43,7 +43,7 @@ abstract class AbstractSparkTable(val admin: Admin, val tableInfo: TableInfo) ex
   override def schema(): StructType = _schema
 
   override def capabilities(): util.Set[TableCapability] = {
-    Set(TableCapability.BATCH_WRITE).asJava
+    Set(TableCapability.BATCH_WRITE, TableCapability.STREAMING_WRITE).asJava
   }
 
   override def partitioning(): Array[Transform] = {
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/write/FlussStreamingWrite.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/write/FlussStreamingWrite.scala
new file mode 100644
index 000000000..25013d3fc
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/write/FlussStreamingWrite.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.write
+
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.metadata.TablePath
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.write.{DataWriter, PhysicalWriteInfo, WriterCommitMessage}
+import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}
+import org.apache.spark.sql.types.StructType
+
+/** An interface that extends from Spark [[StreamingWrite]]. */
+trait FlussStreamingWrite extends StreamingWrite with Serializable {
+  override def useCommitCoordinator(): Boolean = false
+
+  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+
+  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+}
+
+/** Fluss Append Streaming Write. */
+class FlussAppendStreamingWrite(
+    val tablePath: TablePath,
+    val dataSchema: StructType,
+    val flussConfig: Configuration)
+  extends FlussStreamingWrite {
+
+  override def createStreamingWriterFactory(info: PhysicalWriteInfo): StreamingDataWriterFactory =
+    FlussAppendStreamingWriterFactory(tablePath, dataSchema, flussConfig)
+}
+
+private case class FlussAppendStreamingWriterFactory(
+    tablePath: TablePath,
+    dataSchema: StructType,
+    flussConfig: Configuration)
+  extends StreamingDataWriterFactory
+  with Logging {
+
+  override def createWriter(
+      partitionId: Int,
+      taskId: Long,
+      epochId: Long): DataWriter[InternalRow] = {
+    FlussAppendDataWriter(tablePath, dataSchema, flussConfig)
+  }
+}
+
+/** Fluss Upsert Streaming Write. */
+case class FlussUpsertStreamingWrite(
+    tablePath: TablePath,
+    dataSchema: StructType,
+    flussConfig: Configuration)
+  extends FlussStreamingWrite {
+
+  override def createStreamingWriterFactory(info: PhysicalWriteInfo): StreamingDataWriterFactory =
+    FlussUpsertStreamingWriterFactory(tablePath, dataSchema, flussConfig)
+}
+
+private case class FlussUpsertStreamingWriterFactory(
+    tablePath: TablePath,
+    dataSchema: StructType,
+    flussConfig: Configuration)
+  extends StreamingDataWriterFactory
+  with Logging {
+
+  override def createWriter(
+      partitionId: Int,
+      taskId: Long,
+      epochId: Long): DataWriter[InternalRow] = {
+    FlussUpsertDataWriter(tablePath, dataSchema, flussConfig)
+  }
+}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/write/FlussWrite.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/write/FlussWrite.scala
index 57f81c619..36cce0818 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/write/FlussWrite.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/write/FlussWrite.scala
@@ -21,6 +21,7 @@ import org.apache.fluss.config.Configuration
 import org.apache.fluss.metadata.TablePath
 
 import org.apache.spark.sql.connector.write.{BatchWrite, Write}
+import org.apache.spark.sql.connector.write.streaming.StreamingWrite
 import org.apache.spark.sql.types.StructType
 
 /** An interface that extends from Spark [[Write]]. */
@@ -35,6 +36,8 @@ case class FlussAppendWrite(
 
   override def toBatch: BatchWrite = new FlussAppendBatchWrite(tablePath, dataSchema, flussConfig)
 
+  override def toStreaming: StreamingWrite =
+    new FlussAppendStreamingWrite(tablePath, dataSchema, flussConfig)
 }
 
 /** Fluss Upsert Write. */
@@ -46,4 +49,6 @@ case class FlussUpsertWrite(
 
   override def toBatch: BatchWrite = FlussUpsertBatchWrite(tablePath, dataSchema, flussConfig)
 
+  override def toStreaming: StreamingWrite =
+    FlussUpsertStreamingWrite(tablePath, dataSchema, flussConfig)
 }
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
index 6123b0195..2de158b7d 100644
--- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
@@ -90,14 +90,9 @@ class FlussSparkTestBase extends QueryTest with SharedSparkSession {
     }
     val scanRecords = logScanner.poll(Duration.ofSeconds(1))
     scanRecords
-      .buckets()
+      .iterator()
       .asScala
-      .flatMap(
-        tableBucket =>
-          scanRecords
-            .records(tableBucket)
-            .asScala
-            .map(r => (r.getChangeType.shortString, r.getRow)))
+      .map(record => (record.getChangeType.shortString(), record.getRow))
       .toArray
   }
 }
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkStreamingTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkStreamingTest.scala
new file mode 100644
index 000000000..f7358139b
--- /dev/null
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkStreamingTest.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark
+
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.streaming.StreamTest
+
+import java.io.File
+
+class SparkStreamingTest extends FlussSparkTestBase with StreamTest {
+  import testImplicits._
+
+  /**
+   * Will run streaming write twice with same checkpoint dir and verify each write results
+   * separately.
+   */
+  private def runTestWithStream(
+      tableIdentifier: String,
+      input1: Seq[(Long, String)],
+      input2: Seq[(Long, String)],
+      expect1: Seq[(String, Long, String)],
+      expect2: Seq[(String, Long, String)]): Unit = {
+    withTempDir {
+      checkpointDir =>
+        verifyStream(tableIdentifier, checkpointDir, Seq.empty, input1, expect1)
+
+        verifyStream(tableIdentifier, checkpointDir, Seq(input1), input2, expect2)
+    }
+  }
+
+  private def verifyStream(
+      tableIdentifier: String,
+      checkpointDir: File,
+      prevInputs: Seq[Seq[(Long, String)]],
+      newInputs: Seq[(Long, String)],
+      expectedOutputs: Seq[(String, Long, String)]): Unit = {
+    val inputData = MemoryStream[(Long, String)]
+    val inputDF = inputData.toDF().toDF("id", "data")
+
+    prevInputs.foreach(inputsPerBatch => inputData.addData(inputsPerBatch: _*))
+
+    val query = inputDF.writeStream
+      .option("checkpointLocation", checkpointDir.getAbsolutePath)
+      .toTable(tableIdentifier)
+
+    inputData.addData(newInputs: _*)
+
+    query.processAllAvailable()
+    query.stop()
+
+    // TODO verified from spark read
+    val table = loadFlussTable(createTablePath(tableIdentifier))
+    val rowsWithType = getRowsWithChangeType(table)
+    assert(rowsWithType.length == expectedOutputs.length)
+
+    val row = rowsWithType.head._2
+    assert(row.getFieldCount == 2)
+
+    val result = rowsWithType.zip(expectedOutputs).forall {
+      case (flussRowWithType, expect) =>
+        flussRowWithType._1.equals(expect._1) && flussRowWithType._2.getLong(
+          0) == expect._2 && flussRowWithType._2.getString(1).toString == expect._3
+    }
+    if (!result) {
+      fail(s"""
+              |checking $table data failed
+              |expect data:${expectedOutputs.mkString("\n", "\n", "\n")}
+              |fluss data:${rowsWithType.mkString("\n", "\n", "\n")}
+              |""".stripMargin)
+    }
+  }
+
+  test("write: write to log table") {
+    withTable("t") {
+      val tablePath = createTablePath("t")
+      spark.sql(s"CREATE TABLE t (id bigint, data string)")
+      val table = loadFlussTable(tablePath)
+      assert(!table.getTableInfo.hasPrimaryKey)
+      assert(!table.getTableInfo.hasBucketKey)
+
+      val rows = getRowsWithChangeType(table).map(_._2)
+      assert(rows.isEmpty)
+
+      val input1 = Seq((1L, "a"), (2L, "b"), (3L, "c"))
+      val input2 = Seq((4L, "d"), (5L, "e"), (6L, "f"))
+      val expect1 = input1.map(r => ("+A", r._1, r._2))
+      val expect2 = (input1 ++ input2).map(r => ("+A", r._1, r._2))
+      runTestWithStream("t", input1, input2, expect1, expect2)
+    }
+  }
+
+  test("write: write to primary key table") {
+    withTable("t") {
+      val tablePath = createTablePath("t")
+      spark.sql(s"""
+                   |CREATE TABLE t (id bigint, data string) TBLPROPERTIES("primary.key" = "id")
+                   |""".stripMargin)
+      val table = loadFlussTable(tablePath)
+      assert(table.getTableInfo.hasBucketKey)
+      assert(table.getTableInfo.hasPrimaryKey)
+      assert(table.getTableInfo.getPrimaryKeys.get(0).equalsIgnoreCase("id"))
+
+      val rows = getRowsWithChangeType(table).map(_._2)
+      assert(rows.isEmpty)
+
+      val input1 = Seq((1L, "a"), (2L, "b"), (3L, "c"))
+      val input2 = Seq((1L, "d"), (5L, "e"), (6L, "f"))
+      val expect1 = input1.map(r => ("+I", r._1, r._2))
+      val expect2 = Seq(
+        ("+I", 1L, "a"),
+        ("+I", 2L, "b"),
+        ("+I", 3L, "c"),
+        ("-U", 1L, "a"),
+        ("+U", 1L, "d"),
+        ("+I", 5L, "e"),
+        ("+I", 6L, "f"))
+      runTestWithStream("t", input1, input2, expect1, expect2)
+    }
+  }
+}

Reply via email to