wuchong commented on code in PR #2277:
URL: https://github.com/apache/fluss/pull/2277#discussion_r2655578221


##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/SparkAsFlussRow.scala:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.row
+
+import org.apache.fluss.row.{BinaryString, Decimal, InternalRow => 
FlussInternalRow, TimestampLtz, TimestampNtz}
+
+import org.apache.spark.sql.catalyst.{InternalRow => SparkInternalRow}
+import org.apache.spark.sql.catalyst.util.SparkDateTimeUtils
+import org.apache.spark.sql.types.StructType
+
+/** Wraps a Spark [[SparkInternalRow]] as a Fluss [[FlussInternalRow]]. */
+class SparkAsFlussRow(schema: StructType) extends FlussInternalRow with 
Serializable {
+
+  val fieldCount: Int = schema.length
+
+  var row: SparkInternalRow = _
+
+  def replace(row: SparkInternalRow): SparkAsFlussRow = {
+    this.row = row
+    this
+  }
+
+  /**
+   * Returns the number of fields in this row.
+   *
+   * <p>The number does not include {@link ChangeType}. It is kept separately.
+   */
+  override def getFieldCount: Int = fieldCount
+
+  /** Returns true if the element is null at the given position. */
+  override def isNullAt(pos: Int): Boolean = row.isNullAt(pos)
+
+  /** Returns the boolean value at the given position. */
+  override def getBoolean(pos: Int): Boolean = row.getBoolean(pos)
+
+  /** Returns the byte value at the given position. */
+  override def getByte(pos: Int): Byte = row.getByte(pos)
+
+  /** Returns the short value at the given position. */
+  override def getShort(pos: Int): Short = row.getShort(pos)
+
+  /** Returns the integer value at the given position. */
+  override def getInt(pos: Int): Int = row.getInt(pos)
+
+  /** Returns the long value at the given position. */
+  override def getLong(pos: Int): Long = row.getLong(pos)
+
+  /** Returns the float value at the given position. */
+  override def getFloat(pos: Int): Float = row.getFloat(pos)
+
+  /** Returns the double value at the given position. */
+  override def getDouble(pos: Int): Double = row.getDouble(pos)
+
+  /** Returns the string value at the given position with fixed length. */
+  override def getChar(pos: Int, length: Int): BinaryString =
+    BinaryString.fromString(row.getUTF8String(pos).toString)
+
+  /** Returns the string value at the given position. */
+  override def getString(pos: Int): BinaryString = 
BinaryString.fromString(row.getString(pos))
+
+  /**
+   * Returns the decimal value at the given position.
+   *
+   * <p>The precision and scale are required to determine whether the decimal 
value was stored in a
+   * compact representation (see {@link Decimal}).
+   */
+  override def getDecimal(pos: Int, precision: Int, scale: Int): Decimal = {
+    val sparkDecimal = row.getDecimal(pos, precision, scale)
+    if (sparkDecimal.precision <= 
org.apache.spark.sql.types.Decimal.MAX_LONG_DIGITS)
+      Decimal.fromUnscaledLong(
+        sparkDecimal.toUnscaledLong,
+        sparkDecimal.precision,
+        sparkDecimal.scale)
+    else
+      Decimal.fromBigDecimal(
+        sparkDecimal.toJavaBigDecimal,
+        sparkDecimal.precision,
+        sparkDecimal.scale)
+  }
+
+  /**
+   * Returns the timestamp value at the given position.
+   *
+   * <p>The precision is required to determine whether the timestamp value was 
stored in a compact
+   * representation (see {@link TimestampNtz}).
+   */
+  override def getTimestampNtz(pos: Int, precision: Int): TimestampNtz =
+    
TimestampNtz.fromMillis(SparkDateTimeUtils.microsToMillis(row.getLong(pos)))

Review Comment:
   ditto



##########
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkWriteTest.scala:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark
+
+import org.apache.fluss.metadata.{Schema, TableDescriptor}
+import org.apache.fluss.row.{BinaryString, GenericRow, InternalRow}
+import org.apache.fluss.spark.util.TestUtils.FLUSS_ROWTYPE
+import org.apache.fluss.types.DataTypes
+
+import org.assertj.core.api.Assertions.assertThat
+
+import java.sql.Timestamp
+import java.time.Duration
+
+import scala.collection.JavaConverters._
+
+class SparkWriteTest extends FlussSparkTestBase {
+
+  import SparkWriteTest._
+
+  test("Fluss Write: all data types") {
+    val tablePath = createTablePath("test_all_data_types")
+    val tableDescriptor: TableDescriptor = TableDescriptor.builder
+      .schema(Schema.newBuilder().fromRowType(FLUSS_ROWTYPE).build)
+      .build()
+    createFlussTable(tablePath, tableDescriptor)
+
+    spark.sql(s"""
+                 |INSERT INTO $DEFAULT_DATABASE.test_all_data_types
+                 |VALUES (
+                 |  true, 1, 10, 100, 1000L, 12.3F, 45.6D,
+                 |  1234567.89, 12345678900987654321.12,
+                 |  "test",
+                 |  TO_TIMESTAMP('2025-12-31 10:00:00', 'yyyy-MM-dd kk:mm:ss'),
+                 |  array(11.11F, 22.22F), struct(123L, "apache fluss")
+                 |)
+                 |""".stripMargin)
+
+    val table = loadFlussTable(tablePath)
+    val rows = getRowsWithChangeType(table).map(_._2)
+    assertThat(rows.length).isEqualTo(1)
+
+    val row = rows.head
+    assertThat(row.getFieldCount).isEqualTo(13)
+    assertThat(row.getBoolean(0)).isEqualTo(true)
+    assertThat(row.getByte(1)).isEqualTo(1.toByte)
+    assertThat(row.getShort(2)).isEqualTo(10.toShort)
+    assertThat(row.getInt(3)).isEqualTo(100)
+    assertThat(row.getLong(4)).isEqualTo(1000L)
+    assertThat(row.getFloat(5)).isEqualTo(12.3f)
+    assertThat(row.getDouble(6)).isEqualTo(45.6)
+    assertThat(row.getDecimal(7, 10, 
2).toBigDecimal).isEqualTo(BigDecimal("1234567.89").bigDecimal)
+    assertThat(row.getDecimal(8, 38, 2).toBigDecimal)
+      .isEqualTo(BigDecimal("12345678900987654321.12").bigDecimal)
+    assertThat(row.getString(9).toString).isEqualTo("test")
+    assertThat(row.getTimestampLtz(10, 6).toInstant)
+      .isEqualTo(Timestamp.valueOf("2025-12-31 10:00:00.0").toInstant)
+    assertThat(row.getArray(11).toFloatArray).containsExactly(Array(11.11f, 
22.22f): _*)
+    val nestedRow = row.getRow(12, 2)
+    assertThat(nestedRow.getFieldCount).isEqualTo(2)
+    assertThat(nestedRow.getLong(0)).isEqualTo(123L)
+    assertThat(nestedRow.getString(1).toString).isEqualTo("apache fluss")
+  }
+
+  test("Fluss Write: log table") {
+    val tablePath = createTablePath(logTableName)
+    createFlussTable(tablePath, logTableDescriptor)
+
+    sql(s"""
+           |INSERT INTO $DEFAULT_DATABASE.$logTableName VALUES
+           |(600L, 21L, 601, "addr1"), (700L, 22L, 602, "addr2"),
+           |(800L, 23L, 603, "addr3"), (900L, 24L, 604, "addr4"),
+           |(1000L, 25L, 605, "addr5")
+           |""".stripMargin)
+
+    val table = loadFlussTable(tablePath)
+    val flussRows = getRowsWithChangeType(table).map(_._2)
+
+    val expectRows = Array(
+      GenericRowBuilder(4)
+        .setField(0, 600L)
+        .setField(1, 21L)
+        .setField(2, 601)
+        .setField(3, BinaryString.fromString("addr1"))
+        .builder(),

Review Comment:
   We can replace the builder with `GenericRow.of(800L, 23L, 603, 
fromString("addr3"))`, which is more concise.



##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/write/FlussDataWriter.scala:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.write
+
+import org.apache.fluss.client.{Connection, ConnectionFactory}
+import org.apache.fluss.client.table.Table
+import org.apache.fluss.client.table.writer.{AppendWriter, TableWriter, 
UpsertWriter}
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.metadata.TablePath
+import org.apache.fluss.spark.row.SparkAsFlussRow
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A fluss implementation of Spark [[WriterCommitMessage]]. Fluss, As a 
service, accepts data and
+ * commit inside of it, so client does nothing.
+ */
+case class FlussWriterCommitMessage() extends WriterCommitMessage
+
+/** An abstract class to Spark [[DataWriter]]. */
+abstract class FlussDataWriter(
+    tablePath: TablePath,
+    dataSchema: StructType,
+    flussConfig: Configuration)
+  extends DataWriter[InternalRow]
+  with Logging {
+
+  private lazy val conn: Connection = 
ConnectionFactory.createConnection(flussConfig)
+
+  lazy val table: Table = conn.getTable(tablePath)
+
+  val writer: TableWriter
+
+  protected val flussRow = new SparkAsFlussRow(dataSchema)
+
+  override def commit(): WriterCommitMessage = {
+    writer.flush()
+    FlussWriterCommitMessage()
+  }
+
+  override def abort(): Unit = this.close()
+
+  override def close(): Unit = {
+    if (table != null) {
+      table.close()
+    }
+    if (conn != null) {
+      conn.close()
+    }
+  }
+}
+
+/** Spark-Fluss Append Data Writer. */
+case class FlussAppendDataWriter(
+    tablePath: TablePath,
+    dataSchema: StructType,
+    flussConfig: Configuration)
+  extends FlussDataWriter(tablePath, dataSchema, flussConfig) {
+
+  override val writer: AppendWriter = table.newAppend().createWriter()
+
+  override def write(record: InternalRow): Unit = {
+    writer.append(flussRow.replace(record)).whenComplete {
+      (_, exception) =>
+        {
+          if (exception != null) {
+//            logError("Exception occurs while append row to fluss.", 
exception);
+            throw new RuntimeException("Failed to append record", exception)

Review Comment:
   We must not throw exceptions directly in the completion callback, as they 
won’t propagate to the Spark writer and may be silently ignored.
   
   Instead, we should capture any exception in a volatile field (e.g., 
`asyncWriterException`) within the Spark writer. Then, we can expose a 
`checkAsyncException()` method that throws the captured exception if it’s 
non-null.
   
   This check should be invoked:
   
   - At the beginning of `DataWriter#write`, to catch failures from prior async 
operations before processing new records, and
   - And after `writer.flush()` in `DataWriter#commit`, to ensure any failure 
during flush or finalization is surfaced during commit.
   
   This pattern ensures async errors are properly reported through Spark’s 
writer lifecycle. You can take `FlinkSinkWriter` as an example.



##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/SparkAsFlussArray.scala:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.row
+
+import org.apache.fluss.row.{BinaryString, Decimal, InternalArray => 
FlussInternalArray, InternalRow => FlussInternalRow, TimestampLtz, TimestampNtz}
+
+import org.apache.spark.sql.catalyst.util.{ArrayData => SparkArrayData, 
SparkDateTimeUtils}
+import org.apache.spark.sql.types.{ArrayType => SparkArrayType, DataType => 
SparkDataType, StructType}
+
+/** Wraps a Spark [[SparkArrayData]] as a Fluss [[FlussInternalArray]]. */
+class SparkAsFlussArray(arrayData: SparkArrayData, elementType: SparkDataType)

Review Comment:
   I saw `SparkAsFlussRow` extends `Serializable` interface, do we need to make 
`SparkAsFlussArray` also extend `Serializable`?



##########
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkWriteTest.scala:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark
+
+import org.apache.fluss.metadata.{Schema, TableDescriptor}
+import org.apache.fluss.row.{BinaryString, GenericRow, InternalRow}
+import org.apache.fluss.spark.util.TestUtils.FLUSS_ROWTYPE
+import org.apache.fluss.types.DataTypes
+
+import org.assertj.core.api.Assertions.assertThat
+
+import java.sql.Timestamp
+import java.time.Duration
+
+import scala.collection.JavaConverters._
+
+class SparkWriteTest extends FlussSparkTestBase {
+
+  import SparkWriteTest._
+
+  test("Fluss Write: all data types") {
+    val tablePath = createTablePath("test_all_data_types")
+    val tableDescriptor: TableDescriptor = TableDescriptor.builder
+      .schema(Schema.newBuilder().fromRowType(FLUSS_ROWTYPE).build)
+      .build()
+    createFlussTable(tablePath, tableDescriptor)
+
+    spark.sql(s"""
+                 |INSERT INTO $DEFAULT_DATABASE.test_all_data_types
+                 |VALUES (
+                 |  true, 1, 10, 100, 1000L, 12.3F, 45.6D,
+                 |  1234567.89, 12345678900987654321.12,
+                 |  "test",
+                 |  TO_TIMESTAMP('2025-12-31 10:00:00', 'yyyy-MM-dd kk:mm:ss'),
+                 |  array(11.11F, 22.22F), struct(123L, "apache fluss")
+                 |)
+                 |""".stripMargin)
+
+    val table = loadFlussTable(tablePath)
+    val rows = getRowsWithChangeType(table).map(_._2)
+    assertThat(rows.length).isEqualTo(1)
+
+    val row = rows.head
+    assertThat(row.getFieldCount).isEqualTo(13)
+    assertThat(row.getBoolean(0)).isEqualTo(true)
+    assertThat(row.getByte(1)).isEqualTo(1.toByte)
+    assertThat(row.getShort(2)).isEqualTo(10.toShort)
+    assertThat(row.getInt(3)).isEqualTo(100)
+    assertThat(row.getLong(4)).isEqualTo(1000L)
+    assertThat(row.getFloat(5)).isEqualTo(12.3f)
+    assertThat(row.getDouble(6)).isEqualTo(45.6)
+    assertThat(row.getDecimal(7, 10, 
2).toBigDecimal).isEqualTo(BigDecimal("1234567.89").bigDecimal)
+    assertThat(row.getDecimal(8, 38, 2).toBigDecimal)
+      .isEqualTo(BigDecimal("12345678900987654321.12").bigDecimal)
+    assertThat(row.getString(9).toString).isEqualTo("test")
+    assertThat(row.getTimestampLtz(10, 6).toInstant)
+      .isEqualTo(Timestamp.valueOf("2025-12-31 10:00:00.0").toInstant)
+    assertThat(row.getArray(11).toFloatArray).containsExactly(Array(11.11f, 
22.22f): _*)
+    val nestedRow = row.getRow(12, 2)
+    assertThat(nestedRow.getFieldCount).isEqualTo(2)
+    assertThat(nestedRow.getLong(0)).isEqualTo(123L)
+    assertThat(nestedRow.getString(1).toString).isEqualTo("apache fluss")
+  }
+
+  test("Fluss Write: log table") {
+    val tablePath = createTablePath(logTableName)
+    createFlussTable(tablePath, logTableDescriptor)
+
+    sql(s"""
+           |INSERT INTO $DEFAULT_DATABASE.$logTableName VALUES
+           |(600L, 21L, 601, "addr1"), (700L, 22L, 602, "addr2"),
+           |(800L, 23L, 603, "addr3"), (900L, 24L, 604, "addr4"),
+           |(1000L, 25L, 605, "addr5")
+           |""".stripMargin)
+
+    val table = loadFlussTable(tablePath)
+    val flussRows = getRowsWithChangeType(table).map(_._2)
+
+    val expectRows = Array(
+      GenericRowBuilder(4)
+        .setField(0, 600L)
+        .setField(1, 21L)
+        .setField(2, 601)
+        .setField(3, BinaryString.fromString("addr1"))
+        .builder(),
+      GenericRowBuilder(4)
+        .setField(0, 700L)
+        .setField(1, 22L)
+        .setField(2, 602)
+        .setField(3, BinaryString.fromString("addr2"))
+        .builder(),
+      GenericRowBuilder(4)
+        .setField(0, 800L)
+        .setField(1, 23L)
+        .setField(2, 603)
+        .setField(3, BinaryString.fromString("addr3"))
+        .builder(),
+      GenericRowBuilder(4)
+        .setField(0, 900L)
+        .setField(1, 24L)
+        .setField(2, 604)
+        .setField(3, BinaryString.fromString("addr4"))
+        .builder(),
+      GenericRowBuilder(4)
+        .setField(0, 1000L)
+        .setField(1, 25L)
+        .setField(2, 605)
+        .setField(3, BinaryString.fromString("addr5"))
+        .builder()
+    )
+    assertThat(flussRows.length).isEqualTo(5)
+    assertThat(flussRows).containsAll(expectRows.toIterable.asJava)
+  }
+
+  test("Fluss Write: upsert table") {
+    val tablePath = createTablePath(pkTableName)
+    createFlussTable(tablePath, pkTableDescriptor)
+    val table = loadFlussTable(tablePath)
+    val logScanner = table.newScan.createLogScanner
+    (0 until table.getTableInfo.getNumBuckets).foreach(i => 
logScanner.subscribeFromBeginning(i))
+
+    // insert data
+    sql(s"""
+           |INSERT INTO $DEFAULT_DATABASE.$pkTableName VALUES
+           |(600L, 21L, 601, "addr1"), (700L, 22L, 602, "addr2"),
+           |(800L, 23L, 603, "addr3"), (900L, 24L, 604, "addr4"),
+           |(1000L, 25L, 605, "addr5")
+           |""".stripMargin)
+
+    val flussRows1 = getRowsWithChangeType(table, Some(logScanner))
+    val expectRows1 = Array(
+      (
+        "+I",
+        GenericRowBuilder(4)
+          .setField(0, 600L)
+          .setField(1, 21L)
+          .setField(2, 601)
+          .setField(3, BinaryString.fromString("addr1"))
+          .builder()),
+      (
+        "+I",
+        GenericRowBuilder(4)
+          .setField(0, 700L)
+          .setField(1, 22L)
+          .setField(2, 602)
+          .setField(3, BinaryString.fromString("addr2"))
+          .builder()),
+      (
+        "+I",
+        GenericRowBuilder(4)
+          .setField(0, 800L)
+          .setField(1, 23L)
+          .setField(2, 603)
+          .setField(3, BinaryString.fromString("addr3"))
+          .builder()),
+      (
+        "+I",
+        GenericRowBuilder(4)
+          .setField(0, 900L)
+          .setField(1, 24L)
+          .setField(2, 604)
+          .setField(3, BinaryString.fromString("addr4"))
+          .builder()),
+      (
+        "+I",
+        GenericRowBuilder(4)
+          .setField(0, 1000L)
+          .setField(1, 25L)
+          .setField(2, 605)
+          .setField(3, BinaryString.fromString("addr5"))
+          .builder())
+    )
+    assertThat(flussRows1.length).isEqualTo(5)
+    assertThat(flussRows1).containsAll(expectRows1.toIterable.asJava)
+
+    // update data
+    sql(s"""
+           |INSERT INTO $DEFAULT_DATABASE.$pkTableName VALUES
+           |(800L, 230L, 603, "addr3"), (900L, 240L, 604, "addr4")
+           |""".stripMargin)
+
+    val flussRows2 = getRowsWithChangeType(table, Some(logScanner))
+    val expectRows2 = Array(
+      (
+        "-U",
+        GenericRowBuilder(4)
+          .setField(0, 800L)
+          .setField(1, 23L)
+          .setField(2, 603)
+          .setField(3, BinaryString.fromString("addr3"))
+          .builder()),
+      (
+        "-U",
+        GenericRowBuilder(4)
+          .setField(0, 900L)
+          .setField(1, 24L)
+          .setField(2, 604)
+          .setField(3, BinaryString.fromString("addr4"))
+          .builder()),
+      (
+        "+U",
+        GenericRowBuilder(4)
+          .setField(0, 800L)
+          .setField(1, 230L)
+          .setField(2, 603)
+          .setField(3, BinaryString.fromString("addr3"))
+          .builder()),

Review Comment:
   nit: move this `+U` near after the corresponding `-U` message



##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/write/FlussDataWriter.scala:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.write
+
+import org.apache.fluss.client.{Connection, ConnectionFactory}
+import org.apache.fluss.client.table.Table
+import org.apache.fluss.client.table.writer.{AppendWriter, TableWriter, 
UpsertWriter}
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.metadata.TablePath
+import org.apache.fluss.spark.row.SparkAsFlussRow
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.write.{DataWriter, WriterCommitMessage}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A fluss implementation of Spark [[WriterCommitMessage]]. Fluss, As a 
service, accepts data and
+ * commit inside of it, so client does nothing.
+ */
+case class FlussWriterCommitMessage() extends WriterCommitMessage
+
+/** An abstract class to Spark [[DataWriter]]. */
+abstract class FlussDataWriter(
+    tablePath: TablePath,
+    dataSchema: StructType,
+    flussConfig: Configuration)
+  extends DataWriter[InternalRow]
+  with Logging {
+
+  private lazy val conn: Connection = 
ConnectionFactory.createConnection(flussConfig)
+
+  lazy val table: Table = conn.getTable(tablePath)
+
+  val writer: TableWriter
+
+  protected val flussRow = new SparkAsFlussRow(dataSchema)
+
+  override def commit(): WriterCommitMessage = {
+    writer.flush()
+    FlussWriterCommitMessage()
+  }
+
+  override def abort(): Unit = this.close()
+
+  override def close(): Unit = {
+    if (table != null) {
+      table.close()
+    }
+    if (conn != null) {
+      conn.close()
+    }
+  }
+}
+
+/** Spark-Fluss Append Data Writer. */
+case class FlussAppendDataWriter(
+    tablePath: TablePath,
+    dataSchema: StructType,
+    flussConfig: Configuration)
+  extends FlussDataWriter(tablePath, dataSchema, flussConfig) {
+
+  override val writer: AppendWriter = table.newAppend().createWriter()
+
+  override def write(record: InternalRow): Unit = {
+    writer.append(flussRow.replace(record)).whenComplete {
+      (_, exception) =>
+        {
+          if (exception != null) {
+//            logError("Exception occurs while append row to fluss.", 
exception);
+            throw new RuntimeException("Failed to append record", exception)
+          }
+        }
+    }
+  }
+}
+
+/** Spark-Fluss Upsert Data Writer. */
+case class FlussUpsertDataWriter(
+    tablePath: TablePath,
+    dataSchema: StructType,
+    flussConfig: Configuration)
+  extends FlussDataWriter(tablePath, dataSchema, flussConfig) {
+
+  override val writer: UpsertWriter = table.newUpsert().createWriter()
+
+  override def write(record: InternalRow): Unit = {
+    writer.upsert(flussRow.replace(record)).whenComplete {
+      (_, exception) =>
+        {
+          if (exception != null) {
+            logError("Exception occurs while upsert row to fluss.", exception);
+            throw new RuntimeException("Failed to upsert record", exception)

Review Comment:
   ditto. and we can move the logging to the `checkAsyncException()` method.



##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/SparkAsFlussArray.scala:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.row
+
+import org.apache.fluss.row.{BinaryString, Decimal, InternalArray => 
FlussInternalArray, InternalRow => FlussInternalRow, TimestampLtz, TimestampNtz}
+
+import org.apache.spark.sql.catalyst.util.{ArrayData => SparkArrayData, 
SparkDateTimeUtils}
+import org.apache.spark.sql.types.{ArrayType => SparkArrayType, DataType => 
SparkDataType, StructType}
+
+/** Wraps a Spark [[SparkArrayData]] as a Fluss [[FlussInternalArray]]. */
+class SparkAsFlussArray(arrayData: SparkArrayData, elementType: SparkDataType)
+  extends FlussInternalArray {
+
+  /** Returns the number of elements in this array. */
+  override def size(): Int = arrayData.numElements()
+
+  override def toBooleanArray: Array[Boolean] = arrayData.toBooleanArray()
+
+  override def toByteArray: Array[Byte] = arrayData.toByteArray()
+
+  override def toShortArray: Array[Short] = arrayData.toShortArray()
+
+  override def toIntArray: Array[Int] = arrayData.toIntArray()
+
+  override def toLongArray: Array[Long] = arrayData.toLongArray()
+
+  override def toFloatArray: Array[Float] = arrayData.toFloatArray()
+
+  override def toDoubleArray: Array[Double] = arrayData.toDoubleArray()
+
+  /** Returns true if the element is null at the given position. */
+  override def isNullAt(pos: Int): Boolean = arrayData.isNullAt(pos)
+
+  /** Returns the boolean value at the given position. */
+  override def getBoolean(pos: Int): Boolean = arrayData.getBoolean(pos)
+
+  /** Returns the byte value at the given position. */
+  override def getByte(pos: Int): Byte = arrayData.getByte(pos)
+
+  /** Returns the short value at the given position. */
+  override def getShort(pos: Int): Short = arrayData.getShort(pos)
+
+  /** Returns the integer value at the given position. */
+  override def getInt(pos: Int): Int = arrayData.getInt(pos)
+
+  /** Returns the long value at the given position. */
+  override def getLong(pos: Int): Long = arrayData.getLong(pos)
+
+  /** Returns the float value at the given position. */
+  override def getFloat(pos: Int): Float = arrayData.getFloat(pos)
+
+  /** Returns the double value at the given position. */
+  override def getDouble(pos: Int): Double = arrayData.getDouble(pos)
+
+  /** Returns the string value at the given position with fixed length. */
+  override def getChar(pos: Int, length: Int): BinaryString =
+    BinaryString.fromBytes(arrayData.getUTF8String(pos).getBytes)
+
+  /** Returns the string value at the given position. */
+  override def getString(pos: Int): BinaryString =
+    BinaryString.fromBytes(arrayData.getUTF8String(pos).getBytes)
+
+  /**
+   * Returns the decimal value at the given position.
+   *
+   * <p>The precision and scale are required to determine whether the decimal 
value was stored in a
+   * compact representation (see {@link Decimal}).
+   */
+  override def getDecimal(pos: Int, precision: Int, scale: Int): Decimal = {
+    val sparkDecimal = arrayData.getDecimal(pos, precision, scale)
+    if (sparkDecimal.precision <= 
org.apache.spark.sql.types.Decimal.MAX_LONG_DIGITS)
+      Decimal.fromUnscaledLong(
+        sparkDecimal.toUnscaledLong,
+        sparkDecimal.precision,
+        sparkDecimal.scale)
+    else
+      Decimal.fromBigDecimal(
+        sparkDecimal.toJavaBigDecimal,
+        sparkDecimal.precision,
+        sparkDecimal.scale)
+  }
+
+  /**
+   * Returns the timestamp value at the given position.
+   *
+   * <p>The precision is required to determine whether the timestamp value was 
stored in a compact
+   * representation (see {@link TimestampNtz}).
+   */
+  override def getTimestampNtz(pos: Int, precision: Int): TimestampNtz =
+    
TimestampNtz.fromMillis(SparkDateTimeUtils.microsToMillis(arrayData.getLong(pos)))

Review Comment:
   I think we can introduce a method `TimestampNtz.fromMicros` like 
`TimestampLtz.fromEpochMicros` to suppoert convert from micro to 
`TimestampNtz`. Converting from micro to millis will lose the nano precisions. 



##########
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkWriteTest.scala:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark
+
+import org.apache.fluss.metadata.{Schema, TableDescriptor}
+import org.apache.fluss.row.{BinaryString, GenericRow, InternalRow}
+import org.apache.fluss.spark.util.TestUtils.FLUSS_ROWTYPE
+import org.apache.fluss.types.DataTypes
+
+import org.assertj.core.api.Assertions.assertThat
+
+import java.sql.Timestamp
+import java.time.Duration
+
+import scala.collection.JavaConverters._
+
+class SparkWriteTest extends FlussSparkTestBase {
+
+  import SparkWriteTest._
+
+  test("Fluss Write: all data types") {
+    val tablePath = createTablePath("test_all_data_types")
+    val tableDescriptor: TableDescriptor = TableDescriptor.builder
+      .schema(Schema.newBuilder().fromRowType(FLUSS_ROWTYPE).build)
+      .build()
+    createFlussTable(tablePath, tableDescriptor)
+
+    spark.sql(s"""
+                 |INSERT INTO $DEFAULT_DATABASE.test_all_data_types
+                 |VALUES (
+                 |  true, 1, 10, 100, 1000L, 12.3F, 45.6D,
+                 |  1234567.89, 12345678900987654321.12,
+                 |  "test",
+                 |  TO_TIMESTAMP('2025-12-31 10:00:00', 'yyyy-MM-dd kk:mm:ss'),
+                 |  array(11.11F, 22.22F), struct(123L, "apache fluss")
+                 |)
+                 |""".stripMargin)
+
+    val table = loadFlussTable(tablePath)
+    val rows = getRowsWithChangeType(table).map(_._2)
+    assertThat(rows.length).isEqualTo(1)
+
+    val row = rows.head
+    assertThat(row.getFieldCount).isEqualTo(13)
+    assertThat(row.getBoolean(0)).isEqualTo(true)
+    assertThat(row.getByte(1)).isEqualTo(1.toByte)
+    assertThat(row.getShort(2)).isEqualTo(10.toShort)
+    assertThat(row.getInt(3)).isEqualTo(100)
+    assertThat(row.getLong(4)).isEqualTo(1000L)
+    assertThat(row.getFloat(5)).isEqualTo(12.3f)
+    assertThat(row.getDouble(6)).isEqualTo(45.6)
+    assertThat(row.getDecimal(7, 10, 
2).toBigDecimal).isEqualTo(BigDecimal("1234567.89").bigDecimal)
+    assertThat(row.getDecimal(8, 38, 2).toBigDecimal)
+      .isEqualTo(BigDecimal("12345678900987654321.12").bigDecimal)
+    assertThat(row.getString(9).toString).isEqualTo("test")
+    assertThat(row.getTimestampLtz(10, 6).toInstant)
+      .isEqualTo(Timestamp.valueOf("2025-12-31 10:00:00.0").toInstant)
+    assertThat(row.getArray(11).toFloatArray).containsExactly(Array(11.11f, 
22.22f): _*)
+    val nestedRow = row.getRow(12, 2)
+    assertThat(nestedRow.getFieldCount).isEqualTo(2)
+    assertThat(nestedRow.getLong(0)).isEqualTo(123L)
+    assertThat(nestedRow.getString(1).toString).isEqualTo("apache fluss")
+  }
+
+  test("Fluss Write: log table") {
+    val tablePath = createTablePath(logTableName)
+    createFlussTable(tablePath, logTableDescriptor)
+
+    sql(s"""
+           |INSERT INTO $DEFAULT_DATABASE.$logTableName VALUES
+           |(600L, 21L, 601, "addr1"), (700L, 22L, 602, "addr2"),
+           |(800L, 23L, 603, "addr3"), (900L, 24L, 604, "addr4"),
+           |(1000L, 25L, 605, "addr5")
+           |""".stripMargin)
+
+    val table = loadFlussTable(tablePath)
+    val flussRows = getRowsWithChangeType(table).map(_._2)
+
+    val expectRows = Array(
+      GenericRowBuilder(4)
+        .setField(0, 600L)
+        .setField(1, 21L)
+        .setField(2, 601)
+        .setField(3, BinaryString.fromString("addr1"))
+        .builder(),
+      GenericRowBuilder(4)
+        .setField(0, 700L)
+        .setField(1, 22L)
+        .setField(2, 602)
+        .setField(3, BinaryString.fromString("addr2"))
+        .builder(),
+      GenericRowBuilder(4)
+        .setField(0, 800L)
+        .setField(1, 23L)
+        .setField(2, 603)
+        .setField(3, BinaryString.fromString("addr3"))
+        .builder(),
+      GenericRowBuilder(4)
+        .setField(0, 900L)
+        .setField(1, 24L)
+        .setField(2, 604)
+        .setField(3, BinaryString.fromString("addr4"))
+        .builder(),
+      GenericRowBuilder(4)
+        .setField(0, 1000L)
+        .setField(1, 25L)
+        .setField(2, 605)
+        .setField(3, BinaryString.fromString("addr5"))
+        .builder()
+    )
+    assertThat(flussRows.length).isEqualTo(5)
+    assertThat(flussRows).containsAll(expectRows.toIterable.asJava)
+  }
+
+  test("Fluss Write: upsert table") {
+    val tablePath = createTablePath(pkTableName)
+    createFlussTable(tablePath, pkTableDescriptor)
+    val table = loadFlussTable(tablePath)
+    val logScanner = table.newScan.createLogScanner
+    (0 until table.getTableInfo.getNumBuckets).foreach(i => 
logScanner.subscribeFromBeginning(i))
+
+    // insert data
+    sql(s"""
+           |INSERT INTO $DEFAULT_DATABASE.$pkTableName VALUES
+           |(600L, 21L, 601, "addr1"), (700L, 22L, 602, "addr2"),
+           |(800L, 23L, 603, "addr3"), (900L, 24L, 604, "addr4"),
+           |(1000L, 25L, 605, "addr5")
+           |""".stripMargin)
+
+    val flussRows1 = getRowsWithChangeType(table, Some(logScanner))
+    val expectRows1 = Array(
+      (
+        "+I",
+        GenericRowBuilder(4)
+          .setField(0, 600L)
+          .setField(1, 21L)
+          .setField(2, 601)
+          .setField(3, BinaryString.fromString("addr1"))
+          .builder()),
+      (
+        "+I",
+        GenericRowBuilder(4)
+          .setField(0, 700L)
+          .setField(1, 22L)
+          .setField(2, 602)
+          .setField(3, BinaryString.fromString("addr2"))
+          .builder()),
+      (
+        "+I",
+        GenericRowBuilder(4)
+          .setField(0, 800L)
+          .setField(1, 23L)
+          .setField(2, 603)
+          .setField(3, BinaryString.fromString("addr3"))
+          .builder()),
+      (
+        "+I",
+        GenericRowBuilder(4)
+          .setField(0, 900L)
+          .setField(1, 24L)
+          .setField(2, 604)
+          .setField(3, BinaryString.fromString("addr4"))
+          .builder()),
+      (
+        "+I",
+        GenericRowBuilder(4)
+          .setField(0, 1000L)
+          .setField(1, 25L)
+          .setField(2, 605)
+          .setField(3, BinaryString.fromString("addr5"))
+          .builder())
+    )
+    assertThat(flussRows1.length).isEqualTo(5)
+    assertThat(flussRows1).containsAll(expectRows1.toIterable.asJava)
+
+    // update data
+    sql(s"""
+           |INSERT INTO $DEFAULT_DATABASE.$pkTableName VALUES
+           |(800L, 230L, 603, "addr3"), (900L, 240L, 604, "addr4")
+           |""".stripMargin)
+
+    val flussRows2 = getRowsWithChangeType(table, Some(logScanner))
+    val expectRows2 = Array(
+      (
+        "-U",
+        GenericRowBuilder(4)
+          .setField(0, 800L)
+          .setField(1, 23L)
+          .setField(2, 603)
+          .setField(3, BinaryString.fromString("addr3"))
+          .builder()),
+      (
+        "-U",
+        GenericRowBuilder(4)
+          .setField(0, 900L)
+          .setField(1, 24L)
+          .setField(2, 604)
+          .setField(3, BinaryString.fromString("addr4"))
+          .builder()),
+      (
+        "+U",
+        GenericRowBuilder(4)
+          .setField(0, 800L)
+          .setField(1, 230L)
+          .setField(2, 603)
+          .setField(3, BinaryString.fromString("addr3"))
+          .builder()),
+      (
+        "+U",
+        GenericRowBuilder(4)
+          .setField(0, 900L)
+          .setField(1, 240L)
+          .setField(2, 604)
+          .setField(3, BinaryString.fromString("addr4"))
+          .builder())
+    )
+    assertThat(flussRows2.length).isEqualTo(4)
+    assertThat(flussRows2).containsAll(expectRows2.toIterable.asJava)

Review Comment:
   As we set the bucket number to `1`, so the changelog is in order globally, 
and we can assert `.containsExactlyElementsOf` here to also check the changelog 
order. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to