This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 4e49f2d4e [spark] support spark batch write (#2277)
4e49f2d4e is described below

commit 4e49f2d4ed0696d15f9ef0b74139918fd0ee5ac6
Author: Yann Byron <[email protected]>
AuthorDate: Mon Jan 5 01:16:52 2026 +0800

    [spark] support spark batch write (#2277)
---
 .../java/org/apache/fluss/row/TimestampLtz.java    |   6 +-
 .../java/org/apache/fluss/row/TimestampNtz.java    |  16 ++
 .../java/org/apache/fluss/utils/DateTimeUtils.java |   6 +
 .../org/apache/fluss/spark/SparkCatalog.scala      |   3 +-
 .../scala/org/apache/fluss/spark/SparkTable.scala  |  23 ++-
 .../fluss/spark/catalog/AbstractSparkTable.scala   |   7 +-
 .../fluss/spark/catalog/WithFlussAdmin.scala       |   9 +-
 .../apache/fluss/spark/row/SparkAsFlussArray.scala | 132 +++++++++++++++
 .../apache/fluss/spark/row/SparkAsFlussRow.scala   | 128 ++++++++++++++
 .../apache/fluss/spark/write/FlussBatchWrite.scala |  58 +++++++
 .../apache/fluss/spark/write/FlussDataWriter.scala | 138 ++++++++++++++++
 .../FlussWrite.scala}                              |  36 ++--
 .../fluss/spark/write/FlussWriteBuilder.scala      |  51 ++++++
 fluss-spark/fluss-spark-ut/pom.xml                 |  16 ++
 .../apache/fluss/spark/FlussSparkTestBase.scala    |  46 ++++++
 .../org/apache/fluss/spark/SparkWriteTest.scala    | 183 +++++++++++++++++++++
 .../fluss/spark/row/SparkAsFlussArrayTest.scala    | 132 +++++++++++++++
 .../fluss/spark/row/SparkAsFlussRowTest.scala      |  82 +++++++++
 .../org/apache/fluss/spark/util/TestUtils.scala    |  54 ++++++
 19 files changed, 1099 insertions(+), 27 deletions(-)

diff --git a/fluss-common/src/main/java/org/apache/fluss/row/TimestampLtz.java 
b/fluss-common/src/main/java/org/apache/fluss/row/TimestampLtz.java
index c5ce237e3..be63996c0 100644
--- a/fluss-common/src/main/java/org/apache/fluss/row/TimestampLtz.java
+++ b/fluss-common/src/main/java/org/apache/fluss/row/TimestampLtz.java
@@ -23,6 +23,8 @@ import org.apache.fluss.types.LocalZonedTimestampType;
 import java.io.Serializable;
 import java.time.Instant;
 
+import static org.apache.fluss.utils.DateTimeUtils.MICROS_PER_MILLIS;
+import static org.apache.fluss.utils.DateTimeUtils.NANOS_PER_MICROS;
 import static org.apache.fluss.utils.Preconditions.checkArgument;
 
 /**
@@ -39,10 +41,6 @@ public class TimestampLtz implements 
Comparable<TimestampLtz>, Serializable {
 
     private static final long serialVersionUID = 1L;
 
-    public static final long MICROS_PER_MILLIS = 1000L;
-
-    public static final long NANOS_PER_MICROS = 1000L;
-
     // this field holds the integral second and the milli-of-second.
     private final long millisecond;
 
diff --git a/fluss-common/src/main/java/org/apache/fluss/row/TimestampNtz.java 
b/fluss-common/src/main/java/org/apache/fluss/row/TimestampNtz.java
index cc2e4586d..56febfd89 100644
--- a/fluss-common/src/main/java/org/apache/fluss/row/TimestampNtz.java
+++ b/fluss-common/src/main/java/org/apache/fluss/row/TimestampNtz.java
@@ -25,6 +25,8 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 
+import static org.apache.fluss.utils.DateTimeUtils.MICROS_PER_MILLIS;
+import static org.apache.fluss.utils.DateTimeUtils.NANOS_PER_MICROS;
 import static org.apache.fluss.utils.Preconditions.checkArgument;
 
 /**
@@ -70,6 +72,20 @@ public class TimestampNtz implements 
Comparable<TimestampNtz>, Serializable {
         return nanoOfMillisecond;
     }
 
+    /**
+     * Creates an instance of {@link TimestampNtz} from microseconds.
+     *
+     * <p>The sub-millisecond remainder is kept as nanos-of-millisecond.
+     *
+     * @param microseconds the number of microseconds since {@code 1970-01-01 
00:00:00}; a negative
+     *     number is the number of microseconds before {@code 1970-01-01 
00:00:00}
+     */
+    public static TimestampNtz fromMicros(long microseconds) {
+        long mills = Math.floorDiv(microseconds, MICROS_PER_MILLIS);
+        long nanos = (microseconds - mills * MICROS_PER_MILLIS) * 
NANOS_PER_MICROS;
+        return TimestampNtz.fromMillis(mills, (int) nanos);
+    }
+
     /**
      * Creates an instance of {@link TimestampNtz} from milliseconds.
      *
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/utils/DateTimeUtils.java 
b/fluss-common/src/main/java/org/apache/fluss/utils/DateTimeUtils.java
index 9106993ea..3019df4e3 100644
--- a/fluss-common/src/main/java/org/apache/fluss/utils/DateTimeUtils.java
+++ b/fluss-common/src/main/java/org/apache/fluss/utils/DateTimeUtils.java
@@ -47,6 +47,12 @@ public class DateTimeUtils {
     /** The julian date of the epoch, 1970-01-01. */
     public static final int EPOCH_JULIAN = 2440588;
 
+    /** The number of microseconds in a millisecond. */
+    public static final long MICROS_PER_MILLIS = 1000L;
+
+    /** The number of nanoseconds in a microsecond. */
+    public static final long NANOS_PER_MICROS = 1000L;
+
     /** The number of milliseconds in a second. */
     private static final long MILLIS_PER_SECOND = 1000L;
 
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala
index b620d2700..c1521ebfe 100644
--- 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkCatalog.scala
@@ -49,7 +49,8 @@ class SparkCatalog extends TableCatalog with 
SupportsFlussNamespaces with WithFl
 
   override def loadTable(ident: Identifier): Table = {
     try {
-      SparkTable(admin.getTableInfo(toTablePath(ident)).get())
+      val tablePath = toTablePath(ident)
+      SparkTable(tablePath, admin.getTableInfo(tablePath).get(), flussConfig)
     } catch {
       case e: ExecutionException if 
e.getCause.isInstanceOf[TableNotExistException] =>
         throw new NoSuchTableException(ident)
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
index 1416ab415..6d31f6fbc 100644
--- 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
@@ -17,9 +17,24 @@
 
 package org.apache.fluss.spark
 
-import org.apache.fluss.metadata.TableInfo
+import org.apache.fluss.config.{Configuration => FlussConfiguration}
+import org.apache.fluss.metadata.{TableInfo, TablePath}
 import org.apache.fluss.spark.catalog.{AbstractSparkTable, 
SupportsFlussPartitionManagement}
+import org.apache.fluss.spark.write.{FlussAppendWriteBuilder, 
FlussUpsertWriteBuilder}
 
-case class SparkTable(table: TableInfo)
-  extends AbstractSparkTable(table)
-  with SupportsFlussPartitionManagement {}
+import org.apache.spark.sql.connector.catalog.SupportsWrite
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
+
+case class SparkTable(tablePath: TablePath, tableInfo: TableInfo, flussConfig: 
FlussConfiguration)
+  extends AbstractSparkTable(tableInfo)
+  with SupportsFlussPartitionManagement
+  with SupportsWrite {
+
+  override def newWriteBuilder(logicalWriteInfo: LogicalWriteInfo): 
WriteBuilder = {
+    if (tableInfo.getPrimaryKeys.isEmpty) {
+      new FlussAppendWriteBuilder(tablePath, logicalWriteInfo.schema(), 
flussConfig)
+    } else {
+      new FlussUpsertWriteBuilder(tablePath, logicalWriteInfo.schema(), 
flussConfig)
+    }
+  }
+}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
index 8694c7f59..b1558dbe9 100644
--- 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
@@ -17,7 +17,8 @@
 
 package org.apache.fluss.spark.catalog
 
-import org.apache.fluss.metadata.TableInfo
+import org.apache.fluss.config.{Configuration => FlussConfiguration}
+import org.apache.fluss.metadata.{TableInfo, TablePath}
 import org.apache.fluss.spark.SparkConversions
 
 import org.apache.spark.sql.connector.catalog.{Table, TableCapability}
@@ -39,5 +40,7 @@ abstract class AbstractSparkTable(tableInfo: TableInfo) 
extends Table {
 
   override def schema(): StructType = _schema
 
-  override def capabilities(): util.Set[TableCapability] = 
Set.empty[TableCapability].asJava
+  override def capabilities(): util.Set[TableCapability] = {
+    Set(TableCapability.BATCH_WRITE).asJava
+  }
 }
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/WithFlussAdmin.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/WithFlussAdmin.scala
index fa62b91a4..2241b6b96 100644
--- 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/WithFlussAdmin.scala
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/WithFlussAdmin.scala
@@ -33,6 +33,7 @@ trait WithFlussAdmin extends AutoCloseable {
 
   private var _connection: Connection = _
   private var _admin: Admin = _
+  private var _flussConfig: FlussConfiguration = _
 
   // TODO: init lake spark catalog
   protected var lakeCatalog: CatalogPlugin = _
@@ -43,10 +44,16 @@ trait WithFlussAdmin extends AutoCloseable {
       entry: util.Map.Entry[String, String] => flussConfigs.put(entry.getKey, 
entry.getValue)
     }
 
-    _connection = 
ConnectionFactory.createConnection(FlussConfiguration.fromMap(flussConfigs))
+    _flussConfig = FlussConfiguration.fromMap(flussConfigs)
+    _connection = ConnectionFactory.createConnection(_flussConfig)
     _admin = _connection.getAdmin
   }
 
+  protected def flussConfig: FlussConfiguration = {
+    Preconditions.checkNotNull(_flussConfig, "Fluss Configuration is not 
initialized.")
+    _flussConfig
+  }
+
   protected def admin: Admin = {
     Preconditions.checkNotNull(_admin, "Fluss Admin is not initialized.")
     _admin
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/SparkAsFlussArray.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/SparkAsFlussArray.scala
new file mode 100644
index 000000000..00e82ea67
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/SparkAsFlussArray.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.row
+
+import org.apache.fluss.row.{BinaryString, Decimal, InternalArray => 
FlussInternalArray, InternalRow => FlussInternalRow, TimestampLtz, TimestampNtz}
+
+import org.apache.spark.sql.catalyst.util.{ArrayData => SparkArrayData}
+import org.apache.spark.sql.types.{ArrayType => SparkArrayType, DataType => 
SparkDataType, StructType}
+
+/** Wraps a Spark [[SparkArrayData]] as a Fluss [[FlussInternalArray]]. */
+class SparkAsFlussArray(arrayData: SparkArrayData, elementType: SparkDataType)
+  extends FlussInternalArray
+  with Serializable {
+
+  /** Returns the number of elements in this array. */
+  override def size(): Int = arrayData.numElements()
+
+  override def toBooleanArray: Array[Boolean] = arrayData.toBooleanArray()
+
+  override def toByteArray: Array[Byte] = arrayData.toByteArray()
+
+  override def toShortArray: Array[Short] = arrayData.toShortArray()
+
+  override def toIntArray: Array[Int] = arrayData.toIntArray()
+
+  override def toLongArray: Array[Long] = arrayData.toLongArray()
+
+  override def toFloatArray: Array[Float] = arrayData.toFloatArray()
+
+  override def toDoubleArray: Array[Double] = arrayData.toDoubleArray()
+
+  /** Returns true if the element is null at the given position. */
+  override def isNullAt(pos: Int): Boolean = arrayData.isNullAt(pos)
+
+  /** Returns the boolean value at the given position. */
+  override def getBoolean(pos: Int): Boolean = arrayData.getBoolean(pos)
+
+  /** Returns the byte value at the given position. */
+  override def getByte(pos: Int): Byte = arrayData.getByte(pos)
+
+  /** Returns the short value at the given position. */
+  override def getShort(pos: Int): Short = arrayData.getShort(pos)
+
+  /** Returns the integer value at the given position. */
+  override def getInt(pos: Int): Int = arrayData.getInt(pos)
+
+  /** Returns the long value at the given position. */
+  override def getLong(pos: Int): Long = arrayData.getLong(pos)
+
+  /** Returns the float value at the given position. */
+  override def getFloat(pos: Int): Float = arrayData.getFloat(pos)
+
+  /** Returns the double value at the given position. */
+  override def getDouble(pos: Int): Double = arrayData.getDouble(pos)
+
+  /** Returns the string value at the given position with fixed length. */
+  override def getChar(pos: Int, length: Int): BinaryString =
+    BinaryString.fromBytes(arrayData.getUTF8String(pos).getBytes)
+
+  /** Returns the string value at the given position. */
+  override def getString(pos: Int): BinaryString =
+    BinaryString.fromBytes(arrayData.getUTF8String(pos).getBytes)
+
+  /**
+   * Returns the decimal value at the given position.
+   *
+   * <p>The precision and scale are required to determine whether the decimal 
value was stored in a
+   * compact representation (see [[Decimal]]).
+   */
+  override def getDecimal(pos: Int, precision: Int, scale: Int): Decimal = {
+    val sparkDecimal = arrayData.getDecimal(pos, precision, scale)
+    if (sparkDecimal.precision <= 
org.apache.spark.sql.types.Decimal.MAX_LONG_DIGITS)
+      Decimal.fromUnscaledLong(
+        sparkDecimal.toUnscaledLong,
+        sparkDecimal.precision,
+        sparkDecimal.scale)
+    else
+      Decimal.fromBigDecimal(
+        sparkDecimal.toJavaBigDecimal,
+        sparkDecimal.precision,
+        sparkDecimal.scale)
+  }
+
+  /**
+   * Returns the timestamp value at the given position.
+   *
+   * <p>The precision is required to determine whether the timestamp value was 
stored in a compact
+   * representation (see [[TimestampNtz]]).
+   */
+  override def getTimestampNtz(pos: Int, precision: Int): TimestampNtz =
+    TimestampNtz.fromMicros(arrayData.getLong(pos))
+
+  /**
+   * Returns the timestamp value at the given position.
+   *
+   * <p>The precision is required to determine whether the timestamp value was 
stored in a compact
+   * representation (see [[TimestampLtz]]).
+   */
+  override def getTimestampLtz(pos: Int, precision: Int): TimestampLtz =
+    TimestampLtz.fromEpochMicros(arrayData.getLong(pos))
+
+  /** Returns the binary value at the given position with fixed length. */
+  override def getBinary(pos: Int, length: Int): Array[Byte] = 
arrayData.getBinary(pos)
+
+  /** Returns the binary value at the given position. */
+  override def getBytes(pos: Int): Array[Byte] = arrayData.getBinary(pos)
+
+  /** Returns the array value at the given position. */
+  override def getArray(pos: Int) = new SparkAsFlussArray(
+    arrayData.getArray(pos),
+    elementType.asInstanceOf[SparkArrayType].elementType)
+
+  /** Returns the row value at the given position. */
+  override def getRow(pos: Int, numFields: Int): FlussInternalRow =
+    new SparkAsFlussRow(elementType.asInstanceOf[StructType])
+      .replace(arrayData.getStruct(pos, numFields))
+}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/SparkAsFlussRow.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/SparkAsFlussRow.scala
new file mode 100644
index 000000000..ad8440dc7
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/SparkAsFlussRow.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.row
+
+import org.apache.fluss.row.{BinaryString, Decimal, InternalRow => 
FlussInternalRow, TimestampLtz, TimestampNtz}
+
+import org.apache.spark.sql.catalyst.{InternalRow => SparkInternalRow}
+import org.apache.spark.sql.types.StructType
+
+/** Wraps a Spark [[SparkInternalRow]] as a Fluss [[FlussInternalRow]]. */
+class SparkAsFlussRow(schema: StructType) extends FlussInternalRow with 
Serializable {
+
+  val fieldCount: Int = schema.length
+
+  var row: SparkInternalRow = _
+
+  def replace(row: SparkInternalRow): SparkAsFlussRow = {
+    this.row = row
+    this
+  }
+
+  /**
+   * Returns the number of fields in this row.
+   *
+   * <p>The number does not include [[org.apache.fluss.record.ChangeType]]. It 
is kept separately.
+   */
+  override def getFieldCount: Int = fieldCount
+
+  /** Returns true if the element is null at the given position. */
+  override def isNullAt(pos: Int): Boolean = row.isNullAt(pos)
+
+  /** Returns the boolean value at the given position. */
+  override def getBoolean(pos: Int): Boolean = row.getBoolean(pos)
+
+  /** Returns the byte value at the given position. */
+  override def getByte(pos: Int): Byte = row.getByte(pos)
+
+  /** Returns the short value at the given position. */
+  override def getShort(pos: Int): Short = row.getShort(pos)
+
+  /** Returns the integer value at the given position. */
+  override def getInt(pos: Int): Int = row.getInt(pos)
+
+  /** Returns the long value at the given position. */
+  override def getLong(pos: Int): Long = row.getLong(pos)
+
+  /** Returns the float value at the given position. */
+  override def getFloat(pos: Int): Float = row.getFloat(pos)
+
+  /** Returns the double value at the given position. */
+  override def getDouble(pos: Int): Double = row.getDouble(pos)
+
+  /** Returns the string value at the given position with fixed length. */
+  override def getChar(pos: Int, length: Int): BinaryString =
+    BinaryString.fromString(row.getUTF8String(pos).toString)
+
+  /** Returns the string value at the given position. */
+  override def getString(pos: Int): BinaryString = 
BinaryString.fromString(row.getString(pos))
+
+  /**
+   * Returns the decimal value at the given position.
+   *
+   * <p>The precision and scale are required to determine whether the decimal 
value was stored in a
+   * compact representation (see [[Decimal]]).
+   */
+  override def getDecimal(pos: Int, precision: Int, scale: Int): Decimal = {
+    val sparkDecimal = row.getDecimal(pos, precision, scale)
+    if (sparkDecimal.precision <= 
org.apache.spark.sql.types.Decimal.MAX_LONG_DIGITS)
+      Decimal.fromUnscaledLong(
+        sparkDecimal.toUnscaledLong,
+        sparkDecimal.precision,
+        sparkDecimal.scale)
+    else
+      Decimal.fromBigDecimal(
+        sparkDecimal.toJavaBigDecimal,
+        sparkDecimal.precision,
+        sparkDecimal.scale)
+  }
+
+  /**
+   * Returns the timestamp value at the given position.
+   *
+   * <p>The precision is required to determine whether the timestamp value was 
stored in a compact
+   * representation (see [[TimestampNtz]]).
+   */
+  override def getTimestampNtz(pos: Int, precision: Int): TimestampNtz =
+    TimestampNtz.fromMicros(row.getLong(pos))
+
+  /**
+   * Returns the timestamp value at the given position.
+   *
+   * <p>The precision is required to determine whether the timestamp value was 
stored in a compact
+   * representation (see [[TimestampLtz]]).
+   */
+  override def getTimestampLtz(pos: Int, precision: Int): TimestampLtz =
+    TimestampLtz.fromEpochMicros(row.getLong(pos))
+
+  /** Returns the binary value at the given position with fixed length. */
+  override def getBinary(pos: Int, length: Int): Array[Byte] = 
row.getBinary(pos)
+
+  /** Returns the binary value at the given position. */
+  override def getBytes(pos: Int): Array[Byte] = row.getBinary(pos)
+
+  /**
+   * Returns the array value at the given position.
+   *
+   * The wrapper receives the element type of the field's array type, because
+   * [[SparkAsFlussArray]] casts that type when reading nested rows and arrays.
+   */
+  override def getArray(pos: Int): SparkAsFlussArray = {
+    val arrayType = schema.fields(pos).dataType
+      .asInstanceOf[org.apache.spark.sql.types.ArrayType]
+    new SparkAsFlussArray(row.getArray(pos), arrayType.elementType)
+  }
+
+  /** Returns the row value at the given position. */
+  override def getRow(pos: Int, numFields: Int): FlussInternalRow =
+    new SparkAsFlussRow(schema.fields(pos).dataType.asInstanceOf[StructType])
+      .replace(row.getStruct(pos, numFields))
+
+}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/write/FlussBatchWrite.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/write/FlussBatchWrite.scala
new file mode 100644
index 000000000..ad31ce0a9
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/write/FlussBatchWrite.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.write
+
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.metadata.TablePath
+
+import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, 
PhysicalWriteInfo, WriterCommitMessage}
+import org.apache.spark.sql.types.StructType
+
+/** An interface that extends from Spark [[BatchWrite]]. */
+trait FlussBatchWrite extends BatchWrite with Serializable {
+
+  override def commit(writerCommitMessages: Array[WriterCommitMessage]): Unit 
= {}
+
+  override def abort(writerCommitMessages: Array[WriterCommitMessage]): Unit = 
{}
+
+}
+
+/** Fluss Append Batch Write. */
+class FlussAppendBatchWrite(
+    val tablePath: TablePath,
+    val dataSchema: StructType,
+    val flussConfig: Configuration)
+  extends FlussBatchWrite {
+
+  override def createBatchWriterFactory(physicalWriteInfo: PhysicalWriteInfo): 
DataWriterFactory = {
+    (_: Int, _: Long) => FlussAppendDataWriter(tablePath, dataSchema, 
flussConfig)
+  }
+}
+
+/** Fluss Upsert Batch Write. */
+case class FlussUpsertBatchWrite(
+    tablePath: TablePath,
+    dataSchema: StructType,
+    flussConfig: Configuration)
+  extends FlussBatchWrite {
+
+  override def createBatchWriterFactory(physicalWriteInfo: PhysicalWriteInfo): 
DataWriterFactory = {
+    (_: Int, _: Long) => FlussUpsertDataWriter(tablePath, dataSchema, 
flussConfig)
+  }
+
+}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/write/FlussDataWriter.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/write/FlussDataWriter.scala
new file mode 100644
index 000000000..94fae6133
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/write/FlussDataWriter.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.write
+
+import org.apache.fluss.client.{Connection, ConnectionFactory}
+import org.apache.fluss.client.table.Table
+import org.apache.fluss.client.table.writer.{AppendResult, AppendWriter, 
TableWriter, UpsertResult, UpsertWriter}
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.metadata.TablePath
+import org.apache.fluss.spark.row.SparkAsFlussRow
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}
+import org.apache.spark.sql.types.StructType
+
+import java.io.IOException
+import java.util.concurrent.CompletableFuture
+
+/**
+ * A fluss implementation of Spark [[WriterCommitMessage]]. Fluss, as a 
service, accepts data and
+ * commit inside of it, so client does nothing.
+ */
+case class FlussWriterCommitMessage() extends WriterCommitMessage
+
+/** An abstract class to Spark [[DataWriter]]. */
+abstract class FlussDataWriter[T](
+    tablePath: TablePath,
+    dataSchema: StructType,
+    flussConfig: Configuration)
+  extends DataWriter[InternalRow]
+  with Logging {
+
+  private lazy val conn: Connection = 
ConnectionFactory.createConnection(flussConfig)
+
+  lazy val table: Table = conn.getTable(tablePath)
+
+  val writer: TableWriter
+
+  protected val flussRow = new SparkAsFlussRow(dataSchema)
+
+  // Starts as None: a `= _` default would be null, and `.isEmpty` on it in
+  // the write callback would throw, losing the failure being recorded.
+  @volatile
+  private var asyncWriterException: Option[Throwable] = None
+
+  def writeRow(record: SparkAsFlussRow): CompletableFuture[T]
+
+  override def write(record: InternalRow): Unit = {
+    checkAsyncException()
+
+    writeRow(flussRow.replace(record)).whenComplete {
+      (_, exception) =>
+        if (exception != null && asyncWriterException.isEmpty) {
+          asyncWriterException = Some(exception)
+        }
+    }
+  }
+
+  override def commit(): WriterCommitMessage = {
+    writer.flush()
+    checkAsyncException()
+
+    FlussWriterCommitMessage()
+  }
+
+  override def abort(): Unit = this.close()
+
+  /**
+   * Closes the Fluss table and connection, then rethrows any asynchronous
+   * write failure that has been recorded but not yet reported.
+   */
+  override def close(): Unit = {
+    // Close the connection even when closing the table fails.
+    try {
+      if (table != null) {
+        table.close()
+      }
+    } finally {
+      if (conn != null) {
+        conn.close()
+      }
+    }
+
+    // Surface a write failure not yet reported by write() or commit().
+    checkAsyncException()
+
+    logInfo("Finished closing Fluss data write.")
+  }
+
+  @throws[IOException]
+  private def checkAsyncException(): Unit = {
+    val throwable = asyncWriterException
+    throwable match {
+      case Some(exception) =>
+        asyncWriterException = None
+        logError("Exception occurs while write row to fluss.", exception)
+        throw new IOException(
+          "One or more Fluss Writer send requests have encountered exception",
+          exception)
+      case _ =>
+    }
+  }
+
+}
+
+/** Spark-Fluss Append Data Writer. */
+case class FlussAppendDataWriter(
+    tablePath: TablePath,
+    dataSchema: StructType,
+    flussConfig: Configuration)
+  extends FlussDataWriter[AppendResult](tablePath, dataSchema, flussConfig) {
+
+  override val writer: AppendWriter = table.newAppend().createWriter()
+
+  override def writeRow(record: SparkAsFlussRow): 
CompletableFuture[AppendResult] = {
+    writer.append(record)
+  }
+}
+
+/** Spark-Fluss Upsert Data Writer. */
+case class FlussUpsertDataWriter(
+    tablePath: TablePath,
+    dataSchema: StructType,
+    flussConfig: Configuration)
+  extends FlussDataWriter[UpsertResult](tablePath, dataSchema, flussConfig) {
+
+  override val writer: UpsertWriter = table.newUpsert().createWriter()
+
+  override def writeRow(record: SparkAsFlussRow): 
CompletableFuture[UpsertResult] = {
+    writer.upsert(record)
+  }
+}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/write/FlussWrite.scala
similarity index 51%
copy from 
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
copy to 
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/write/FlussWrite.scala
index 8694c7f59..57f81c619 100644
--- 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/write/FlussWrite.scala
@@ -15,29 +15,35 @@
  * limitations under the License.
  */
 
-package org.apache.fluss.spark.catalog
+package org.apache.fluss.spark.write
 
-import org.apache.fluss.metadata.TableInfo
-import org.apache.fluss.spark.SparkConversions
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.metadata.TablePath
 
-import org.apache.spark.sql.connector.catalog.{Table, TableCapability}
+import org.apache.spark.sql.connector.write.{BatchWrite, Write}
 import org.apache.spark.sql.types.StructType
 
-import java.util
+/** An interface that extends from Spark [[Write]]. */
+trait FlussWrite extends Write
 
-import scala.collection.JavaConverters._
+/** Fluss Append Write. */
+case class FlussAppendWrite(
+    tablePath: TablePath,
+    dataSchema: StructType,
+    flussConfig: Configuration)
+  extends FlussWrite {
 
-abstract class AbstractSparkTable(tableInfo: TableInfo) extends Table {
+  override def toBatch: BatchWrite = new FlussAppendBatchWrite(tablePath, 
dataSchema, flussConfig)
 
-  protected lazy val _schema: StructType =
-    SparkConversions.toSparkDataType(tableInfo.getSchema.getRowType)
-
-  protected lazy val _partitionSchema = new StructType(
-    _schema.fields.filter(tableInfo.getPartitionKeys.contains))
+}
 
-  override def name(): String = tableInfo.toString
+/** Fluss Upsert Write. */
+case class FlussUpsertWrite(
+    tablePath: TablePath,
+    dataSchema: StructType,
+    flussConfig: Configuration)
+  extends FlussWrite {
 
-  override def schema(): StructType = _schema
+  override def toBatch: BatchWrite = FlussUpsertBatchWrite(tablePath, 
dataSchema, flussConfig)
 
-  override def capabilities(): util.Set[TableCapability] = 
Set.empty[TableCapability].asJava
 }
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/write/FlussWriteBuilder.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/write/FlussWriteBuilder.scala
new file mode 100644
index 000000000..9dbe6f995
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/write/FlussWriteBuilder.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.write
+
+import org.apache.fluss.config.{Configuration => FlussConfiguration}
+import org.apache.fluss.metadata.TablePath
+
+import org.apache.spark.sql.connector.write.WriteBuilder
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Common parent of the Fluss write builders in this file. It extends Spark's [[WriteBuilder]]
+ * and declares no members of its own.
+ */
+trait FlussWriteBuilder extends WriteBuilder
+
+/**
+ * [[FlussWriteBuilder]] whose `build` yields a [[FlussAppendWrite]] carrying the table path,
+ * data schema and Fluss configuration this builder was constructed with.
+ */
+class FlussAppendWriteBuilder(
+    tablePath: TablePath,
+    dataSchema: StructType,
+    flussConfig: FlussConfiguration)
+  extends FlussWriteBuilder {
+
+  override def build: FlussWrite = FlussAppendWrite(tablePath, dataSchema, flussConfig)
+}
+
+/**
+ * [[FlussWriteBuilder]] whose `build` yields a [[FlussUpsertWrite]] carrying the table path,
+ * data schema and Fluss configuration this builder was constructed with.
+ */
+class FlussUpsertWriteBuilder(
+    tablePath: TablePath,
+    dataSchema: StructType,
+    flussConfig: FlussConfiguration)
+  extends FlussWriteBuilder {
+
+  override def build: FlussWrite = FlussUpsertWrite(tablePath, dataSchema, flussConfig)
+}
diff --git a/fluss-spark/fluss-spark-ut/pom.xml 
b/fluss-spark/fluss-spark-ut/pom.xml
index 7592c6ffd..ffa424d26 100644
--- a/fluss-spark/fluss-spark-ut/pom.xml
+++ b/fluss-spark/fluss-spark-ut/pom.xml
@@ -60,6 +60,22 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-common</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- to avoid this issue: https://github.com/apache/arrow/issues/40896 
-->
+        <dependency>
+            <groupId>org.eclipse.collections</groupId>
+            <artifactId>eclipse-collections</artifactId>
+            <version>11.1.0</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.fluss</groupId>
             <artifactId>fluss-test-utils</artifactId>
diff --git 
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
index 08b6d4600..2a5135ab8 100644
--- 
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
+++ 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
@@ -19,7 +19,11 @@ package org.apache.fluss.spark
 
 import org.apache.fluss.client.{Connection, ConnectionFactory}
 import org.apache.fluss.client.admin.Admin
+import org.apache.fluss.client.table.Table
+import org.apache.fluss.client.table.scanner.log.LogScanner
 import org.apache.fluss.config.{ConfigOptions, Configuration}
+import org.apache.fluss.metadata.{TableDescriptor, TablePath}
+import org.apache.fluss.row.InternalRow
 import org.apache.fluss.server.testutils.FlussClusterExtension
 
 import org.apache.spark.SparkConf
@@ -31,6 +35,8 @@ import org.scalatest.Tag
 
 import java.time.Duration
 
+import scala.collection.JavaConverters._
+
 class FlussSparkTestBase extends QueryTest with SharedSparkSession {
 
   import FlussSparkTestBase._
@@ -61,6 +67,46 @@ class FlussSparkTestBase extends QueryTest with 
SharedSparkSession {
     super.test(testName, testTags: _*)(testFun)(pos)
   }
 
+  /** Builds the [[TablePath]] of `tableName` inside the test database `DEFAULT_DATABASE`. */
+  def createTablePath(tableName: String): TablePath = TablePath.of(DEFAULT_DATABASE, tableName)
+
+  /**
+   * Creates the table through the shared `admin` client and blocks on the result of the
+   * request.
+   */
+  def createFlussTable(tablePath: TablePath, tableDescriptor: TableDescriptor): Unit = {
+    // NOTE(review): the `true` flag is presumably createTable's "ignore if exists" switch;
+    // confirm against the Admin API.
+    val creation = admin.createTable(tablePath, tableDescriptor, true)
+    creation.get()
+  }
+
+  /** Opens `tablePath` as a client [[Table]] through the shared test connection `conn`. */
+  def loadFlussTable(tablePath: TablePath): Table = conn.getTable(tablePath)
+
+  /**
+   * Polls the log once (1 second timeout) and returns every record of that single poll.
+   *
+   * @param table
+   *   table to read; only consulted when no scanner is supplied
+   * @param logScannerOption
+   *   scanner to poll. When absent, a new scanner subscribed from the beginning of every bucket
+   *   of `table` is created.
+   * @return
+   *   one `(change type short string, row)` pair per polled record, see
+   *   [[org.apache.fluss.record.ChangeType]] and [[InternalRow]]
+   */
+  def getRowsWithChangeType(
+      table: Table,
+      logScannerOption: Option[LogScanner] = None): Array[(String, InternalRow)] = {
+    val logScanner = logScannerOption.getOrElse {
+      val ls = table.newScan().createLogScanner()
+      (0 until table.getTableInfo.getNumBuckets).foreach(i => ls.subscribeFromBeginning(i))
+      ls
+    }
+    val scanRecords = logScanner.poll(Duration.ofSeconds(1))
+    // Go through an iterator instead of flat-mapping the converted bucket collection directly:
+    // when that collection is a Set, flatMap builds another Set, which silently drops records
+    // that compare equal and loses the per-bucket record order.
+    scanRecords
+      .buckets()
+      .asScala
+      .iterator
+      .flatMap(
+        tableBucket =>
+          scanRecords
+            .records(tableBucket)
+            .asScala
+            .map(r => (r.getChangeType.shortString, r.getRow)))
+      .toArray
+  }
 }
 
 @RegisterExtension
diff --git 
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkWriteTest.scala
 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkWriteTest.scala
new file mode 100644
index 000000000..179c05e80
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkWriteTest.scala
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark
+
+import org.apache.fluss.metadata.{Schema, TableDescriptor}
+import org.apache.fluss.row.{BinaryString, GenericRow, InternalRow}
+import org.apache.fluss.spark.util.TestUtils.{createGenericRow, FLUSS_ROWTYPE}
+import org.apache.fluss.types.DataTypes
+
+import org.assertj.core.api.Assertions.assertThat
+
+import java.sql.Timestamp
+import java.time.Duration
+
+import scala.collection.JavaConverters._
+
+/**
+ * End-to-end checks that rows written with Spark SQL `INSERT INTO` can be read back from Fluss
+ * through a log scanner.
+ */
+class SparkWriteTest extends FlussSparkTestBase {
+
+  import SparkWriteTest._
+
+  test("Fluss Write: all data types") {
+    // The table uses the shared 13-column row type from TestUtils.
+    val tablePath = createTablePath("test_all_data_types")
+    val tableDescriptor: TableDescriptor = TableDescriptor.builder
+      .schema(Schema.newBuilder().fromRowType(FLUSS_ROWTYPE).build)
+      .build()
+    createFlussTable(tablePath, tableDescriptor)
+
+    spark.sql(s"""
+                 |INSERT INTO $DEFAULT_DATABASE.test_all_data_types
+                 |VALUES (
+                 |  true, 1, 10, 100, 1000L, 12.3F, 45.6D,
+                 |  1234567.89, 12345678900987654321.12,
+                 |  "test",
+                 |  TO_TIMESTAMP('2025-12-31 10:00:00', 'yyyy-MM-dd kk:mm:ss'),
+                 |  array(11.11F, 22.22F), struct(123L, "apache fluss")
+                 |)
+                 |""".stripMargin)
+
+    val table = loadFlussTable(tablePath)
+    val rows = getRowsWithChangeType(table).map(_._2)
+    assertThat(rows.length).isEqualTo(1)
+
+    // Read every column back with the accessor that matches its type.
+    val row = rows.head
+    assertThat(row.getFieldCount).isEqualTo(13)
+    assertThat(row.getBoolean(0)).isEqualTo(true)
+    assertThat(row.getByte(1)).isEqualTo(1.toByte)
+    assertThat(row.getShort(2)).isEqualTo(10.toShort)
+    assertThat(row.getInt(3)).isEqualTo(100)
+    assertThat(row.getLong(4)).isEqualTo(1000L)
+    assertThat(row.getFloat(5)).isEqualTo(12.3f)
+    assertThat(row.getDouble(6)).isEqualTo(45.6)
+    assertThat(row.getDecimal(7, 10, 2).toBigDecimal).isEqualTo(BigDecimal("1234567.89").bigDecimal)
+    assertThat(row.getDecimal(8, 38, 2).toBigDecimal)
+      .isEqualTo(BigDecimal("12345678900987654321.12").bigDecimal)
+    assertThat(row.getString(9).toString).isEqualTo("test")
+    assertThat(row.getTimestampLtz(10, 6).toInstant)
+      .isEqualTo(Timestamp.valueOf("2025-12-31 10:00:00.0").toInstant)
+    assertThat(row.getArray(11).toFloatArray).containsExactly(Array(11.11f, 22.22f): _*)
+    val nestedRow = row.getRow(12, 2)
+    assertThat(nestedRow.getFieldCount).isEqualTo(2)
+    assertThat(nestedRow.getLong(0)).isEqualTo(123L)
+    assertThat(nestedRow.getString(1).toString).isEqualTo("apache fluss")
+  }
+
+  test("Fluss Write: log table") {
+    val tablePath = createTablePath(logTableName)
+    createFlussTable(tablePath, logTableDescriptor)
+
+    sql(s"""
+           |INSERT INTO $DEFAULT_DATABASE.$logTableName VALUES
+           |(600L, 21L, 601, "addr1"), (700L, 22L, 602, "addr2"),
+           |(800L, 23L, 603, "addr3"), (900L, 24L, 604, "addr4"),
+           |(1000L, 25L, 605, "addr5")
+           |""".stripMargin)
+
+    val table = loadFlussTable(tablePath)
+    // Only the rows matter here; the change type of each record is dropped.
+    val flussRows = getRowsWithChangeType(table).map(_._2)
+
+    val expectRows = Array(
+      createGenericRow(600L, 21L, 601, BinaryString.fromString("addr1")),
+      createGenericRow(700L, 22L, 602, BinaryString.fromString("addr2")),
+      createGenericRow(800L, 23L, 603, BinaryString.fromString("addr3")),
+      createGenericRow(900L, 24L, 604, BinaryString.fromString("addr4")),
+      createGenericRow(1000L, 25L, 605, BinaryString.fromString("addr5"))
+    )
+    assertThat(flussRows.length).isEqualTo(5)
+    assertThat(flussRows).containsAll(expectRows.toIterable.asJava)
+  }
+
+  test("Fluss Write: upsert table") {
+    val tablePath = createTablePath(pkTableName)
+    createFlussTable(tablePath, pkTableDescriptor)
+    val table = loadFlussTable(tablePath)
+    // One scanner serves both polls: the second poll is expected to return only the records
+    // produced by the second INSERT.
+    val logScanner = table.newScan.createLogScanner
+    try {
+      (0 until table.getTableInfo.getNumBuckets).foreach(i => logScanner.subscribeFromBeginning(i))
+
+      // insert data
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.$pkTableName VALUES
+             |(600L, 21L, 601, "addr1"), (700L, 22L, 602, "addr2"),
+             |(800L, 23L, 603, "addr3"), (900L, 24L, 604, "addr4"),
+             |(1000L, 25L, 605, "addr5")
+             |""".stripMargin)
+
+      val flussRows1 = getRowsWithChangeType(table, Some(logScanner))
+      val expectRows1 = Array(
+        ("+I", createGenericRow(600L, 21L, 601, BinaryString.fromString("addr1"))),
+        ("+I", createGenericRow(700L, 22L, 602, BinaryString.fromString("addr2"))),
+        ("+I", createGenericRow(800L, 23L, 603, BinaryString.fromString("addr3"))),
+        ("+I", createGenericRow(900L, 24L, 604, BinaryString.fromString("addr4"))),
+        ("+I", createGenericRow(1000L, 25L, 605, BinaryString.fromString("addr5")))
+      )
+      assertThat(flussRows1.length).isEqualTo(5)
+      assertThat(flussRows1).containsAll(expectRows1.toIterable.asJava)
+
+      // update data: writing existing keys again must show up as -U/+U pairs
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.$pkTableName
+             |VALUES (800L, 230L, 603, "addr3"), (900L, 240L, 604, "addr4")
+             |""".stripMargin)
+
+      val flussRows2 = getRowsWithChangeType(table, Some(logScanner))
+      val expectRows2 = Array(
+        ("-U", createGenericRow(800L, 23L, 603, BinaryString.fromString("addr3"))),
+        ("+U", createGenericRow(800L, 230L, 603, BinaryString.fromString("addr3"))),
+        ("-U", createGenericRow(900L, 24L, 604, BinaryString.fromString("addr4"))),
+        ("+U", createGenericRow(900L, 240L, 604, BinaryString.fromString("addr4")))
+      )
+      assertThat(flussRows2.length).isEqualTo(4)
+      assertThat(flussRows2).containsAll(expectRows2.toIterable.asJava)
+    } finally {
+      // Release the scanner even when one of the assertions above fails.
+      logScanner.close()
+    }
+  }
+}
+
+/** Table names, schemas and descriptors shared by the [[SparkWriteTest]] cases. */
+object SparkWriteTest {
+
+  // Primary-key table: keyed and bucketed (1 bucket) by `orderId`.
+  val pkTableName: String = "orders_test_pk"
+  val pkSchema: Schema = Schema
+    .newBuilder()
+    .column("orderId", DataTypes.BIGINT)
+    .column("itemId", DataTypes.BIGINT)
+    .column("amount", DataTypes.INT)
+    .column("address", DataTypes.STRING)
+    .primaryKey("orderId")
+    .build()
+  val pkTableDescriptor: TableDescriptor =
+    TableDescriptor.builder().schema(pkSchema).distributedBy(1, "orderId").build()
+
+  // Log table: same columns, no primary key.
+  val logTableName: String = "orders_test_log"
+  val logSchema: Schema = Schema
+    .newBuilder()
+    .column("orderId", DataTypes.BIGINT)
+    .column("itemId", DataTypes.BIGINT)
+    .column("amount", DataTypes.INT)
+    .column("address", DataTypes.STRING)
+    .build()
+  val logTableDescriptor: TableDescriptor =
+    TableDescriptor.builder().schema(logSchema).build()
+
+  /**
+   * Fluent helper that fills a [[GenericRow]] of `fieldCount` fields position by position.
+   * `builder()` hands back the (same, mutable) row as an [[InternalRow]].
+   */
+  case class GenericRowBuilder(fieldCount: Int) {
+    val row: GenericRow = new GenericRow(fieldCount)
+
+    def setField(pos: Int, value: Any): GenericRowBuilder = {
+      row.setField(pos, value)
+      this
+    }
+
+    def builder(): InternalRow = row
+  }
+}
diff --git 
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/SparkAsFlussArrayTest.scala
 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/SparkAsFlussArrayTest.scala
new file mode 100644
index 000000000..fb4ee6308
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/SparkAsFlussArrayTest.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.row
+
+import org.apache.fluss.spark.FlussSparkTestBase
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.GenericArrayData
+import org.apache.spark.sql.types.{BooleanType, ByteType, Decimal, 
DecimalType, DoubleType, FloatType, IntegerType, LongType, ShortType, 
StringType, StructField, StructType}
+import org.apache.spark.unsafe.types.UTF8String
+import org.assertj.core.api.Assertions.assertThat
+
+/**
+ * Unit tests for [[SparkAsFlussArray]]: each case wraps a Spark [[GenericArrayData]] and reads
+ * it back through the Fluss array accessors, both in bulk and element by element.
+ *
+ * The per-element loops start at index 0 so the first element is covered too.
+ */
+class SparkAsFlussArrayTest extends FlussSparkTestBase {
+
+  test("Fluss SparkAsFlussArray: Boolean Type") {
+    val data = Array(true, false, false, true)
+    val sparkArrayData = new GenericArrayData(data)
+    val flussArray = new SparkAsFlussArray(sparkArrayData, BooleanType)
+
+    assertThat(flussArray.size()).isEqualTo(4)
+    assertThat(flussArray.toBooleanArray).containsExactly(data: _*)
+    data.indices.foreach(i => assertThat(flussArray.getBoolean(i)).isEqualTo(data(i)))
+  }
+
+  test("Fluss SparkAsFlussArray: Byte Type") {
+    val data = Array(12.toByte, 34.toByte, 56.toByte, 78.toByte)
+    val sparkArrayData = new GenericArrayData(data)
+    val flussArray = new SparkAsFlussArray(sparkArrayData, ByteType)
+
+    assertThat(flussArray.size()).isEqualTo(4)
+    assertThat(flussArray.toByteArray).containsExactly(data: _*)
+    data.indices.foreach(i => assertThat(flussArray.getByte(i)).isEqualTo(data(i)))
+  }
+
+  test("Fluss SparkAsFlussArray: Short Type") {
+    val data = Array(12.toShort, 34.toShort, 56.toShort, 78.toShort)
+    val sparkArrayData = new GenericArrayData(data)
+    val flussArray = new SparkAsFlussArray(sparkArrayData, ShortType)
+
+    assertThat(flussArray.size()).isEqualTo(4)
+    assertThat(flussArray.toShortArray).containsExactly(data: _*)
+    data.indices.foreach(i => assertThat(flussArray.getShort(i)).isEqualTo(data(i)))
+  }
+
+  test("Fluss SparkAsFlussArray: Int Type") {
+    val data = Array(1234, 3456, 5678, 7890)
+    val sparkArrayData = new GenericArrayData(data)
+    val flussArray = new SparkAsFlussArray(sparkArrayData, IntegerType)
+
+    assertThat(flussArray.size()).isEqualTo(4)
+    assertThat(flussArray.toIntArray).containsExactly(data: _*)
+    data.indices.foreach(i => assertThat(flussArray.getInt(i)).isEqualTo(data(i)))
+  }
+
+  test("Fluss SparkAsFlussArray: Long Type") {
+    val data = Array(1234567890L, 3456789012L, 5678901234L, 7890123456L)
+    val sparkArrayData = new GenericArrayData(data)
+    val flussArray = new SparkAsFlussArray(sparkArrayData, LongType)
+
+    assertThat(flussArray.size()).isEqualTo(4)
+    assertThat(flussArray.toLongArray).containsExactly(data: _*)
+    data.indices.foreach(i => assertThat(flussArray.getLong(i)).isEqualTo(data(i)))
+  }
+
+  test("Fluss SparkAsFlussArray: Float Type") {
+    val data = Array(123456.78f, 345678.90f, 567890.12f, 789012.34f)
+    val sparkArrayData = new GenericArrayData(data)
+    val flussArray = new SparkAsFlussArray(sparkArrayData, FloatType)
+
+    assertThat(flussArray.size()).isEqualTo(4)
+    assertThat(flussArray.toFloatArray).containsExactly(data: _*)
+    data.indices.foreach(i => assertThat(flussArray.getFloat(i)).isEqualTo(data(i)))
+  }
+
+  test("Fluss SparkAsFlussArray: Double Type") {
+    val data = Array(1234567890.12, 3456789012.34, 5678901234.56, 7890123456.78)
+    val sparkArrayData = new GenericArrayData(data)
+    val flussArray = new SparkAsFlussArray(sparkArrayData, DoubleType)
+
+    assertThat(flussArray.size()).isEqualTo(4)
+    assertThat(flussArray.toDoubleArray).containsExactly(data: _*)
+    data.indices.foreach(i => assertThat(flussArray.getDouble(i)).isEqualTo(data(i)))
+  }
+
+  test("Fluss SparkAsFlussArray: Struct Type") {
+    val schema = StructType(
+      Array(
+        StructField("id", LongType),
+        StructField("salary", new DecimalType(10, 2)),
+        StructField("pt", StringType)
+      ))
+    // The middle element is null on purpose to exercise isNullAt.
+    val data = Array(
+      InternalRow.apply(1L, Decimal(BigDecimal("123.4")), UTF8String.fromString("apache")),
+      null,
+      InternalRow.apply(2L, Decimal(BigDecimal("567.8")), UTF8String.fromString("fluss"))
+    )
+    // Give every element its own wrapper. Mapping with the eta-expanded
+    // `new SparkAsFlussRow(schema).replace` creates a single wrapper that is re-pointed on each
+    // call, so all expected rows would alias the last element.
+    val flussRow = data.map(r => new SparkAsFlussRow(schema).replace(r))
+
+    val sparkArrayData = new GenericArrayData(data)
+    val flussArray = new SparkAsFlussArray(sparkArrayData, schema)
+
+    assertThat(flussArray.size()).isEqualTo(3)
+    data.indices.foreach {
+      i =>
+        val isNull = i == 1
+        assertThat(flussArray.isNullAt(i)).isEqualTo(isNull)
+        if (!isNull) {
+          assertThat(flussArray.getRow(i, 3).getLong(0)).isEqualTo(flussRow(i).getLong(0))
+          assertThat(flussArray.getRow(i, 3).getDecimal(1, 10, 2).toBigDecimal)
+            .isEqualTo(flussRow(i).getDecimal(1, 10, 2).toBigDecimal)
+          assertThat(flussArray.getRow(i, 3).getString(2).toString)
+            .isEqualTo(flussRow(i).getString(2).toString)
+        }
+    }
+  }
+}
diff --git 
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/SparkAsFlussRowTest.scala
 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/SparkAsFlussRowTest.scala
new file mode 100644
index 000000000..6a5240427
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/SparkAsFlussRowTest.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.row
+
+import org.apache.fluss.spark.FlussSparkTestBase
+import org.apache.fluss.spark.util.TestUtils.SCHEMA
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.GenericArrayData
+import org.apache.spark.sql.types.Decimal
+import org.apache.spark.unsafe.types.UTF8String
+import org.assertj.core.api.Assertions.assertThat
+
+import java.sql.Timestamp
+
+/**
+ * Verifies that [[SparkAsFlussRow]] exposes every field of a Spark [[InternalRow]] laid out
+ * like `TestUtils.SCHEMA` through the matching Fluss accessor.
+ */
+class SparkAsFlussRowTest extends FlussSparkTestBase {
+
+  private var row: SparkAsFlussRow = _
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    // The timestamp column is fed as microseconds: epoch millis * 1000.
+    val timestampMicros = Timestamp.valueOf("2025-12-31 10:00:00").getTime * 1000
+    val floatArray = new GenericArrayData(Array(11.11f, 22.22f))
+    val nestedSparkRow = InternalRow.apply(123L, UTF8String.fromString("apache fluss"))
+    val sparkRow = InternalRow.fromSeq(
+      Seq(
+        true,
+        1.toByte,
+        10.toShort,
+        100,
+        1000L,
+        12.3f,
+        45.6d,
+        Decimal(BigDecimal("1234567.89")),
+        Decimal(BigDecimal("12345678900987654321.12")),
+        UTF8String.fromString("test"),
+        timestampMicros,
+        floatArray,
+        nestedSparkRow
+      ))
+    row = new SparkAsFlussRow(SCHEMA).replace(sparkRow)
+  }
+
+  test("Fluss SparkAsFlussRow") {
+    assertThat(row.fieldCount).isEqualTo(13)
+
+    // primitive columns
+    assertThat(row.getBoolean(0)).isEqualTo(true)
+    assertThat(row.getByte(1)).isEqualTo(1.toByte)
+    assertThat(row.getShort(2)).isEqualTo(10.toShort)
+    assertThat(row.getInt(3)).isEqualTo(100)
+    assertThat(row.getLong(4)).isEqualTo(1000L)
+    assertThat(row.getFloat(5)).isEqualTo(12.3f)
+    assertThat(row.getDouble(6)).isEqualTo(45.6)
+
+    // decimal columns, one per precision
+    val expectedDecimal10 = BigDecimal("1234567.89").bigDecimal
+    val expectedDecimal38 = BigDecimal("12345678900987654321.12").bigDecimal
+    assertThat(row.getDecimal(7, 10, 2).toBigDecimal).isEqualTo(expectedDecimal10)
+    assertThat(row.getDecimal(8, 38, 2).toBigDecimal).isEqualTo(expectedDecimal38)
+
+    // string and timestamp columns
+    assertThat(row.getString(9).toString).isEqualTo("test")
+    val expectedInstant = Timestamp.valueOf("2025-12-31 10:00:00.0").toInstant
+    assertThat(row.getTimestampLtz(10, 6).toInstant).isEqualTo(expectedInstant)
+
+    // array column
+    assertThat(row.getArray(11).toFloatArray).containsExactly(Array(11.11f, 22.22f): _*)
+
+    // nested row column
+    val nestedRow = row.getRow(12, 2)
+    assertThat(nestedRow.getFieldCount).isEqualTo(2)
+    assertThat(nestedRow.getLong(0)).isEqualTo(123L)
+    assertThat(nestedRow.getString(1).toString).isEqualTo("apache fluss")
+  }
+}
diff --git 
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/util/TestUtils.scala
 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/util/TestUtils.scala
new file mode 100644
index 000000000..704ca2ed5
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/util/TestUtils.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.util
+
+import org.apache.fluss.row.GenericRow
+import org.apache.fluss.spark.SparkConversions
+import org.apache.fluss.types.RowType
+
+import org.apache.spark.sql.types._
+
+/** Fixtures shared by the Spark connector tests. */
+object TestUtils {
+
+  /** Struct type of the nested `c_row` column. */
+  private val nestedRowType: StructType =
+    StructType(Seq(StructField("id", LongType), StructField("name", StringType)))
+
+  /**
+   * Spark schema with one column per covered data type, in the order the tests address them by
+   * position. A date column is not part of it (it is commented out below).
+   */
+  val SCHEMA: StructType = StructType(
+    Seq[(String, DataType)](
+      "c_bool" -> BooleanType,
+      "c_byte" -> ByteType,
+      "c_short" -> ShortType,
+      "c_int" -> IntegerType,
+      "c_long" -> LongType,
+      "c_float" -> FloatType,
+      "c_double" -> DoubleType,
+      "c_decimal_1" -> DecimalType(10, 2),
+      "c_decimal_2" -> DecimalType(38, 2),
+      "c_string" -> StringType,
+      // "date" -> DateType,
+      "c_timestamp" -> TimestampType,
+      "c_array" -> ArrayType(FloatType, containsNull = false),
+      "c_row" -> nestedRowType
+    ).map { case (name, dataType) => StructField(name, dataType) }
+  )
+
+  /** Fluss row type obtained by converting [[SCHEMA]] with `SparkConversions`. */
+  val FLUSS_ROWTYPE: RowType = SparkConversions.toFlussDataType(SCHEMA)
+
+  /** Builds a [[GenericRow]] holding `values` in the given order. */
+  def createGenericRow(values: Any*): GenericRow =
+    GenericRow.of(values.map(_.asInstanceOf[AnyRef]): _*)
+}

Reply via email to