This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 29a071bf4 [spark] support batch read from fluss cluster (#2377)
29a071bf4 is described below

commit 29a071bf450434a9b5158732a8018309e0623e33
Author: Yann Byron <[email protected]>
AuthorDate: Thu Jan 22 00:38:48 2026 +0800

    [spark] support batch read from fluss cluster (#2377)
---
 .../java/org/apache/fluss/row/TimestampNtz.java    |   6 +
 .../org/apache/fluss/utils/InternalRowUtils.java   | 102 +++++
 .../org/apache/fluss/spark/SparkFlussConf.scala    |  30 ++
 .../scala/org/apache/fluss/spark/SparkTable.scala  |  24 +-
 .../fluss/spark/catalog/AbstractSparkTable.scala   |   6 +-
 .../spark/read/FlussAppendPartitionReader.scala    |  92 ++++
 .../org/apache/fluss/spark/read/FlussBatch.scala   | 177 ++++++++
 .../fluss/spark/read/FlussInputPartition.scala     |  59 +++
 .../fluss/spark/read/FlussPartitionReader.scala    |  71 +++
 .../spark/read/FlussPartitionReaderFactory.scala   |  63 +++
 .../org/apache/fluss/spark/read/FlussScan.scala    |  65 +++
 .../apache/fluss/spark/read/FlussScanBuilder.scala |  61 +++
 .../spark/read/FlussUpsertPartitionReader.scala    |  90 ++++
 .../org/apache/fluss/spark/row/DataConverter.scala |  86 ++++
 .../apache/fluss/spark/row/FlussAsSparkArray.scala | 121 ++++++
 .../apache/fluss/spark/row/FlussAsSparkRow.scala   | 114 +++++
 .../apache/fluss/spark/FlussSparkTestBase.scala    |   3 +
 .../org/apache/fluss/spark/SparkReadTest.scala     | 342 +++++++++++++++
 .../apache/fluss/spark/row/DataConverterTest.scala | 297 +++++++++++++
 .../fluss/spark/row/FlussAsSparkArrayTest.scala    | 428 ++++++++++++++++++
 .../fluss/spark/row/FlussAsSparkRowTest.scala      | 484 +++++++++++++++++++++
 21 files changed, 2718 insertions(+), 3 deletions(-)

diff --git a/fluss-common/src/main/java/org/apache/fluss/row/TimestampNtz.java 
b/fluss-common/src/main/java/org/apache/fluss/row/TimestampNtz.java
index 56febfd89..c58141da9 100644
--- a/fluss-common/src/main/java/org/apache/fluss/row/TimestampNtz.java
+++ b/fluss-common/src/main/java/org/apache/fluss/row/TimestampNtz.java
@@ -129,6 +129,12 @@ public class TimestampNtz implements 
Comparable<TimestampNtz>, Serializable {
         return new TimestampNtz(millisecond, nanoOfMillisecond);
     }
 
+    /**
+     * Converts this {@link TimestampNtz} object to epoch microseconds.
+     *
+     * <p>The nanosecond-of-millisecond part is reduced to whole microseconds by integer division,
+     * so any remainder below one microsecond is discarded.
+     *
+     * @return the microsecond value of this timestamp
+     * @throws ArithmeticException if the result overflows a {@code long}
+     */
+    public long toEpochMicros() {
+        long micros = Math.multiplyExact(millisecond, MICROS_PER_MILLIS);
+        // Add exactly as well, so an overflow of the final sum is reported instead of wrapping.
+        return Math.addExact(micros, nanoOfMillisecond / NANOS_PER_MICROS);
+    }
+
     /** Converts this {@link TimestampNtz} object to a {@link LocalDateTime}. 
*/
     public LocalDateTime toLocalDateTime() {
         int date = (int) (millisecond / MILLIS_PER_DAY);
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/utils/InternalRowUtils.java 
b/fluss-common/src/main/java/org/apache/fluss/utils/InternalRowUtils.java
index 438e17e9f..bfa2a299b 100644
--- a/fluss-common/src/main/java/org/apache/fluss/utils/InternalRowUtils.java
+++ b/fluss-common/src/main/java/org/apache/fluss/utils/InternalRowUtils.java
@@ -22,15 +22,117 @@ package org.apache.fluss.utils;
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
  * additional information regarding copyright ownership. */
 
+import org.apache.fluss.row.BinaryMap;
+import org.apache.fluss.row.BinaryRow;
 import org.apache.fluss.row.BinaryString;
 import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.GenericArray;
+import org.apache.fluss.row.GenericMap;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.row.InternalMap;
+import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.types.ArrayType;
+import org.apache.fluss.types.DataType;
 import org.apache.fluss.types.DataTypeRoot;
+import org.apache.fluss.types.MapType;
+import org.apache.fluss.types.RowType;
+
+import java.util.HashMap;
+import java.util.Map;
 
 /** Utility class for {@link org.apache.fluss.row.InternalRow} related 
operations. */
 public class InternalRowUtils {
 
+    /**
+     * Creates a copy of the given row.
+     *
+     * <p>A {@link BinaryRow} is copied through its own {@code copy()} method. Any other row is
+     * rebuilt field by field into a {@link GenericRow}, copying each field value.
+     *
+     * @param row the row to copy
+     * @param rowType the type describing the fields of {@code row}
+     * @return the copied row
+     */
+    public static InternalRow copyRow(InternalRow row, RowType rowType) {
+        if (row instanceof BinaryRow) {
+            return ((BinaryRow) row).copy();
+        }
+
+        InternalRow.FieldGetter[] getters = InternalRow.createFieldGetters(rowType);
+        int fieldCount = row.getFieldCount();
+        GenericRow copied = new GenericRow(fieldCount);
+        for (int pos = 0; pos < fieldCount; pos++) {
+            Object fieldValue = getters[pos].getFieldOrNull(row);
+            copied.setField(pos, copyValue(fieldValue, rowType.getTypeAt(pos)));
+        }
+        return copied;
+    }
+
+    /**
+     * Creates a copy of the given array as a {@link GenericArray}.
+     *
+     * <p>For non-nullable element types with a primitive array form, the copy is built directly
+     * from the matching {@code toXxxArray()} result. Every other case is copied element by
+     * element, keeping null elements as null.
+     *
+     * @param from the array to copy
+     * @param eleType the element type of {@code from}
+     * @return the copied array
+     */
+    public static InternalArray copyArray(InternalArray from, DataType eleType) {
+        if (!eleType.isNullable()) {
+            switch (eleType.getTypeRoot()) {
+                case BOOLEAN:
+                    return new GenericArray(from.toBooleanArray());
+                case TINYINT:
+                    return new GenericArray(from.toByteArray());
+                case SMALLINT:
+                    return new GenericArray(from.toShortArray());
+                case INTEGER:
+                case DATE:
+                case TIME_WITHOUT_TIME_ZONE:
+                    return new GenericArray(from.toIntArray());
+                case BIGINT:
+                    return new GenericArray(from.toLongArray());
+                case FLOAT:
+                    return new GenericArray(from.toFloatArray());
+                case DOUBLE:
+                    return new GenericArray(from.toDoubleArray());
+                default:
+                    // every other element type is copied element by element below
+                    break;
+            }
+        }
+
+        InternalArray.ElementGetter getter = InternalArray.createElementGetter(eleType);
+        int size = from.size();
+        Object[] copied = new Object[size];
+        for (int pos = 0; pos < size; pos++) {
+            // a null element simply keeps the slot's initial null
+            if (from.isNullAt(pos)) {
+                continue;
+            }
+            copied[pos] = copyValue(getter.getElementOrNull(from, pos), eleType);
+        }
+        return new GenericArray(copied);
+    }
+
+    /**
+     * Creates a copy of the given map.
+     *
+     * <p>A {@link BinaryMap} is copied through its own {@code copy()} method. Any other map is
+     * rebuilt as a {@link GenericMap} from copies of its keys and values.
+     *
+     * @param map the map to copy
+     * @param keyType the type of the map keys
+     * @param valueType the type of the map values
+     * @return the copied map
+     */
+    private static InternalMap copyMap(InternalMap map, DataType keyType, DataType valueType) {
+        if (map instanceof BinaryMap) {
+            return ((BinaryMap) map).copy();
+        }
+
+        InternalArray.ElementGetter keyReader = InternalArray.createElementGetter(keyType);
+        InternalArray.ElementGetter valueReader = InternalArray.createElementGetter(valueType);
+        Map<Object, Object> entries = new HashMap<>();
+        InternalArray keyArray = map.keyArray();
+        InternalArray valueArray = map.valueArray();
+        for (int pos = 0; pos < keyArray.size(); pos++) {
+            Object keyCopy = copyValue(keyReader.getElementOrNull(keyArray, pos), keyType);
+            Object valueCopy =
+                    copyValue(valueReader.getElementOrNull(valueArray, pos), valueType);
+            entries.put(keyCopy, valueCopy);
+        }
+        return new GenericMap(entries);
+    }
+
+    /**
+     * Copies a single field or element value according to its runtime class.
+     *
+     * <p>Strings, rows, arrays, maps, byte arrays and decimals are copied. Any other value is
+     * returned as-is.
+     *
+     * @param o the value to copy, may be null
+     * @param type the data type of {@code o}, used to copy nested rows, arrays and maps
+     * @return the copied value, or {@code o} itself when no copy is made
+     */
+    private static Object copyValue(Object o, DataType type) {
+        if (o instanceof BinaryString) {
+            return ((BinaryString) o).copy();
+        }
+        if (o instanceof InternalRow) {
+            return copyRow((InternalRow) o, (RowType) type);
+        }
+        if (o instanceof InternalArray) {
+            return copyArray((InternalArray) o, ((ArrayType) type).getElementType());
+        }
+        if (o instanceof InternalMap) {
+            MapType mapType = (MapType) type;
+            return copyMap((InternalMap) o, mapType.getKeyType(), mapType.getValueType());
+        }
+        if (o instanceof byte[]) {
+            return ((byte[]) o).clone();
+        }
+        if (o instanceof Decimal) {
+            return ((Decimal) o).copy();
+        }
+        return o;
+    }
+
     /**
      * Compares two objects based on their data type.
      *
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala
new file mode 100644
index 000000000..5148e1e1c
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark
+
+import org.apache.spark.sql.internal.SQLConf.buildConf
+
+/** Spark SQL configuration entries used by the Fluss connector. */
+object SparkFlussConf {
+
+  /**
+   * Internal boolean entry `spark.sql.fluss.readOptimized`; it is `false` unless set explicitly.
+   * `SparkTable.newScanBuilder` requires it to be true before a primary key table can be scanned.
+   */
+  val READ_OPTIMIZED = buildConf("spark.sql.fluss.readOptimized")
+    .internal()
+    .doc("If true, Spark will only read data from data lake snapshot or kv 
snapshot, not execute merge them with log changes. This is a temporary 
configuration that will be deprecated when read-optimized table(e.g. 
`mytbl$ro`) is supported.")
+    .booleanConf
+    .createWithDefault(false)
+
+}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
index 93c51d1e0..028af2815 100644
--- 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
@@ -21,10 +21,15 @@ import org.apache.fluss.client.admin.Admin
 import org.apache.fluss.config.{Configuration => FlussConfiguration}
 import org.apache.fluss.metadata.{TableInfo, TablePath}
 import org.apache.fluss.spark.catalog.{AbstractSparkTable, 
SupportsFlussPartitionManagement}
+import org.apache.fluss.spark.read.{FlussAppendScanBuilder, 
FlussUpsertScanBuilder}
 import org.apache.fluss.spark.write.{FlussAppendWriteBuilder, 
FlussUpsertWriteBuilder}
 
-import org.apache.spark.sql.connector.catalog.SupportsWrite
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite}
+import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 class SparkTable(
     tablePath: TablePath,
@@ -33,7 +38,9 @@ class SparkTable(
     admin: Admin)
   extends AbstractSparkTable(admin, tableInfo)
   with SupportsFlussPartitionManagement
-  with SupportsWrite {
+  with SupportsRead
+  with SupportsWrite
+  with SQLConfHelper {
 
   override def newWriteBuilder(logicalWriteInfo: LogicalWriteInfo): 
WriteBuilder = {
     if (tableInfo.getPrimaryKeys.isEmpty) {
@@ -42,4 +49,17 @@ class SparkTable(
       new FlussUpsertWriteBuilder(tablePath, logicalWriteInfo.schema(), 
flussConfig)
     }
   }
+
+  /**
+   * Creates the scan builder for this table.
+   *
+   * Tables without primary keys get an append scan builder. Tables with primary keys get an
+   * upsert scan builder, but only when `spark.sql.fluss.readOptimized` is enabled; otherwise an
+   * [[UnsupportedOperationException]] is thrown.
+   */
+  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+    if (tableInfo.getPrimaryKeys.isEmpty) {
+      new FlussAppendScanBuilder(tablePath, tableInfo, options, flussConfig)
+    } else if (conf.getConf(SparkFlussConf.READ_OPTIMIZED, false)) {
+      new FlussUpsertScanBuilder(tablePath, tableInfo, options, flussConfig)
+    } else {
+      throw new UnsupportedOperationException(
+        "For now, only data in snapshot can be read, without merging them with changes. " +
+          "If you can accept it, please set `spark.sql.fluss.readOptimized` true, and execute query again.")
+    }
+  }
 }
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
index a1f3e09d4..e64f42a51 100644
--- 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
@@ -43,7 +43,11 @@ abstract class AbstractSparkTable(val admin: Admin, val 
tableInfo: TableInfo) ex
   override def schema(): StructType = _schema
 
   override def capabilities(): util.Set[TableCapability] = {
-    Set(TableCapability.BATCH_WRITE, TableCapability.STREAMING_WRITE).asJava
+    Set(
+      TableCapability.BATCH_READ,
+      TableCapability.BATCH_WRITE,
+      TableCapability.STREAMING_WRITE
+    ).asJava
   }
 
   override def partitioning(): Array[Transform] = {
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala
new file mode 100644
index 000000000..075e0a275
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.client.table.scanner.ScanRecord
+import org.apache.fluss.client.table.scanner.log.ScanRecords
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.metadata.{TableBucket, TablePath}
+
+/**
+ * Partition reader that reads log data from a single Fluss table bucket.
+ *
+ * The bucket is subscribed from the beginning when the reader is constructed. Reading stops as
+ * soon as a poll yields no records for this bucket.
+ */
+class FlussAppendPartitionReader(
+    tablePath: TablePath,
+    projection: Array[Int],
+    flussPartition: FlussAppendInputPartition,
+    flussConfig: Configuration)
+  extends FlussPartitionReader(tablePath, flussConfig) {
+
+  private val tableBucket: TableBucket = flussPartition.tableBucket
+  private val partitionId = tableBucket.getPartitionId
+  private val bucketId = tableBucket.getBucket
+  private val logScanner = table.newScan().project(projection).createLogScanner()
+
+  // Records of the most recent poll that have not been handed out yet
+  private var pendingRecords: java.util.Iterator[ScanRecord] = _
+
+  // Subscribe the scanner to this reader's bucket
+  subscribe()
+
+  override def next(): Boolean = {
+    if (closed) {
+      return false
+    }
+
+    // Serve from the batch fetched earlier while it still has records
+    if (emitPending()) {
+      return true
+    }
+
+    // NOTE(review): an empty poll within POLL_TIMEOUT is treated as end of input; confirm this
+    // cannot happen while unread records remain in the bucket.
+    val polled = logScanner.poll(POLL_TIMEOUT)
+    if (polled == null || polled.isEmpty) {
+      return false
+    }
+
+    // Only the records of this reader's bucket are of interest
+    val recordsOfBucket = polled.records(tableBucket)
+    if (recordsOfBucket.isEmpty) {
+      return false
+    }
+
+    pendingRecords = recordsOfBucket.iterator()
+    emitPending()
+  }
+
+  override def close0(): Unit = {
+    if (logScanner != null) {
+      logScanner.close()
+    }
+  }
+
+  /** Moves to the next pending record, if any, and exposes it as the current row. */
+  private def emitPending(): Boolean = {
+    if (pendingRecords != null && pendingRecords.hasNext) {
+      currentRow = convertToSparkRow(pendingRecords.next())
+      true
+    } else {
+      false
+    }
+  }
+
+  /** Subscribes the log scanner to the bucket from its beginning. */
+  private def subscribe(): Unit = {
+    if (partitionId == null) {
+      logScanner.subscribeFromBeginning(bucketId)
+    } else {
+      logScanner.subscribeFromBeginning(partitionId, bucketId)
+    }
+  }
+}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala
new file mode 100644
index 000000000..1bcffa6fd
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.client.{Connection, ConnectionFactory}
+import org.apache.fluss.client.admin.Admin
+import org.apache.fluss.client.initializer.{BucketOffsetsRetrieverImpl, 
OffsetsInitializer}
+import org.apache.fluss.client.metadata.KvSnapshots
+import org.apache.fluss.client.table.scanner.log.LogScanner
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.metadata.{PartitionInfo, TableBucket, TableInfo, 
TablePath}
+
+import org.apache.spark.sql.connector.read.{Batch, InputPartition, 
PartitionReaderFactory}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+/**
+ * Base class of the Fluss batch reads. It lazily opens a connection and an admin client and
+ * resolves the projection of the read schema onto the table schema.
+ */
+abstract class FlussBatch(
+    tablePath: TablePath,
+    tableInfo: TableInfo,
+    readSchema: StructType,
+    flussConfig: Configuration)
+  extends Batch
+  with AutoCloseable {
+
+  // Set once the corresponding lazy val has been materialized, so close() can skip resources
+  // that were never created instead of forcing them into existence just to close them.
+  @volatile private var connCreated = false
+  @volatile private var adminCreated = false
+
+  lazy val conn: Connection = {
+    val created = ConnectionFactory.createConnection(flussConfig)
+    connCreated = true
+    created
+  }
+
+  lazy val admin: Admin = {
+    val created = conn.getAdmin
+    adminCreated = true
+    created
+  }
+
+  lazy val partitionInfos: util.List[PartitionInfo] = admin.listPartitionInfos(tablePath).get()
+
+  /**
+   * Indexes of the read schema's fields within the table schema, in read-schema order.
+   *
+   * Throws an IllegalArgumentException when a field name is not a column of the table.
+   */
+  protected def projection: Array[Int] = {
+    val columnNameToIndex = tableInfo.getSchema.getColumnNames.asScala.zipWithIndex.toMap
+    readSchema.fields.map {
+      field =>
+        columnNameToIndex.getOrElse(
+          field.name,
+          throw new IllegalArgumentException(s"Invalid field name: ${field.name}"))
+    }
+  }
+
+  /** Closes the admin client and the connection, but only those that were actually opened. */
+  override def close(): Unit = {
+    try {
+      if (adminCreated && admin != null) {
+        admin.close()
+      }
+    } finally {
+      // Release the connection even if closing the admin client failed.
+      if (connCreated && conn != null) {
+        conn.close()
+      }
+    }
+  }
+}
+
+/**
+ * Batch for reading log table (append-only table). One input partition is planned per bucket,
+ * and for a partitioned table per bucket of every partition.
+ */
+class FlussAppendBatch(
+    tablePath: TablePath,
+    tableInfo: TableInfo,
+    readSchema: StructType,
+    options: CaseInsensitiveStringMap,
+    flussConfig: Configuration)
+  extends FlussBatch(tablePath, tableInfo, readSchema, flussConfig) {
+
+  override def planInputPartitions(): Array[InputPartition] = {
+    if (!tableInfo.isPartitioned) {
+      bucketPartitions(None)
+    } else {
+      partitionInfos.asScala.flatMap {
+        info => bucketPartitions(Some(info.getPartitionId))
+      }.toArray
+    }
+  }
+
+  override def createReaderFactory(): PartitionReaderFactory = {
+    new FlussAppendPartitionReaderFactory(tablePath, projection, options, flussConfig)
+  }
+
+  /** Builds one input partition per bucket, optionally scoped to a table partition. */
+  private def bucketPartitions(partitionId: Option[Long]): Array[InputPartition] = {
+    Array.tabulate[InputPartition](tableInfo.getNumBuckets) {
+      bucketId =>
+        val tableBucket = partitionId match {
+          case None =>
+            new TableBucket(tableInfo.getTableId, bucketId)
+          case Some(id) =>
+            new TableBucket(tableInfo.getTableId, id, bucketId)
+        }
+        FlussAppendInputPartition(tableBucket)
+    }
+  }
+
+}
+
+/**
+ * Batch for reading primary key table (upsert table). Each bucket becomes one input partition
+ * that carries the latest kv snapshot id (if any), the log offset to start from and the latest
+ * log offset to stop at.
+ */
+class FlussUpsertBatch(
+    tablePath: TablePath,
+    tableInfo: TableInfo,
+    readSchema: StructType,
+    options: CaseInsensitiveStringMap,
+    flussConfig: Configuration)
+  extends FlussBatch(tablePath, tableInfo, readSchema, flussConfig) {
+
+  private val latestOffsetsInitializer = OffsetsInitializer.latest()
+  private val bucketOffsetsRetriever = new BucketOffsetsRetrieverImpl(admin, tablePath)
+
+  override def planInputPartitions(): Array[InputPartition] = {
+    if (tableInfo.isPartitioned) {
+      partitionInfos.asScala.flatMap {
+        info =>
+          val name = info.getPartitionName
+          val snapshots = admin.getLatestKvSnapshots(tablePath, name).get()
+          planBuckets(name, snapshots)
+      }.toArray
+    } else {
+      val snapshots = admin.getLatestKvSnapshots(tablePath).get()
+      planBuckets(null, snapshots)
+    }
+  }
+
+  override def createReaderFactory(): PartitionReaderFactory = {
+    new FlussUpsertPartitionReaderFactory(tablePath, projection, options, flussConfig)
+  }
+
+  /**
+   * Plans one input partition per bucket listed in `kvSnapshots`; `partitionName` is null for a
+   * non-partitioned table.
+   */
+  private def planBuckets(
+      partitionName: String,
+      kvSnapshots: KvSnapshots): Array[InputPartition] = {
+    val tableId = kvSnapshots.getTableId
+    val partitionId = kvSnapshots.getPartitionId
+    val bucketIds = kvSnapshots.getBucketIds
+    val latestOffsets =
+      latestOffsetsInitializer.getBucketOffsets(partitionName, bucketIds, bucketOffsetsRetriever)
+
+    bucketIds.asScala
+      .map {
+        bucketId =>
+          val tableBucket = new TableBucket(tableId, partitionId, bucketId)
+          val snapshotIdOpt = kvSnapshots.getSnapshotId(bucketId)
+          val logStartingOffsetOpt = kvSnapshots.getLogOffset(bucketId)
+          val logEndingOffset = latestOffsets.get(bucketId)
+
+          val hasSnapshot = snapshotIdOpt.isPresent
+          if (hasSnapshot) {
+            assert(
+              logStartingOffsetOpt.isPresent,
+              "Log offset must be present when snapshot id is present")
+          }
+
+          // With a snapshot: hybrid read from the snapshot's log offset.
+          // Without one: no snapshot id (-1) and the log is read from the earliest offset.
+          val snapshotId: Long = if (hasSnapshot) snapshotIdOpt.getAsLong else -1L
+          val logStartingOffset: Long =
+            if (hasSnapshot) logStartingOffsetOpt.getAsLong else LogScanner.EARLIEST_OFFSET
+
+          FlussUpsertInputPartition(tableBucket, snapshotId, logStartingOffset, logEndingOffset)
+      }
+      .map(_.asInstanceOf[InputPartition])
+      .toArray
+  }
+}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala
new file mode 100644
index 000000000..75bb056ce
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.metadata.TableBucket
+
+import org.apache.spark.sql.connector.read.InputPartition
+
+/**
+ * Common base of the input partitions planned by the Fluss batch reads.
+ *
+ * No preferred locations are reported, so Spark receives no locality hint for these partitions.
+ */
+trait FlussInputPartition extends InputPartition {
+
+  override def preferredLocations(): Array[String] = {
+    // Could return tablet server locations for data locality
+    Array.empty[String]
+  }
+
+}
+
+/**
+ * Represents an input partition for reading data from a Fluss table bucket.
+ *
+ * @param tableBucket
+ *   the table bucket to read from
+ */
+case class FlussAppendInputPartition(tableBucket: TableBucket) extends 
FlussInputPartition
+
+/**
+ * Represents an input partition for reading data from a primary key table bucket. This partition
+ * includes snapshot information for hybrid snapshot-log reading.
+ *
+ * @param tableBucket
+ *   the table bucket to read from
+ * @param snapshotId
+ *   the snapshot ID to read from, -1 if no snapshot
+ * @param logStartingOffset
+ *   the log offset where incremental reading should start; FlussUpsertBatch passes the log offset
+ *   of the kv snapshot, or LogScanner.EARLIEST_OFFSET when the bucket has no snapshot
+ * @param logStoppingOffset
+ *   the log offset where incremental reading should end; FlussUpsertBatch passes the bucket
+ *   offset obtained from OffsetsInitializer.latest()
+ */
+case class FlussUpsertInputPartition(
+    tableBucket: TableBucket,
+    snapshotId: Long,
+    logStartingOffset: Long,
+    logStoppingOffset: Long)
+  extends FlussInputPartition
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReader.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReader.scala
new file mode 100644
index 000000000..10203898f
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReader.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.client.{Connection, ConnectionFactory}
+import org.apache.fluss.client.table.Table
+import org.apache.fluss.client.table.scanner.ScanRecord
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.metadata.{TableInfo, TablePath}
+import org.apache.fluss.row.{InternalRow => FlussInternalRow}
+import org.apache.fluss.spark.row.DataConverter
+import org.apache.fluss.types.RowType
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.read.PartitionReader
+
+import java.time.Duration
+
+/**
+ * Base class of the Fluss partition readers. It owns the connection and table handle, tracks the
+ * current row and converts Fluss rows to Spark rows.
+ */
+abstract class FlussPartitionReader(tablePath: TablePath, flussConfig: Configuration)
+  extends PartitionReader[InternalRow] {
+
+  protected val POLL_TIMEOUT: Duration = Duration.ofMillis(100)
+  protected lazy val conn: Connection = ConnectionFactory.createConnection(flussConfig)
+  protected lazy val table: Table = conn.getTable(tablePath)
+  protected lazy val tableInfo: TableInfo = table.getTableInfo
+  // Strict val: evaluating it opens the connection and the table when the reader is constructed.
+  protected val rowType: RowType = tableInfo.getRowType
+
+  protected var currentRow: InternalRow = _
+  protected var closed = false
+
+  override def get(): InternalRow = currentRow
+
+  /** Releases the resources owned by the concrete reader; called at most once by close(). */
+  def close0(): Unit
+
+  /**
+   * Closes the reader. Only the first call has an effect. The table and the connection are
+   * released even when closing the subclass resources or the table fails.
+   */
+  override def close(): Unit = {
+    if (!closed) {
+      closed = true
+      try {
+        close0()
+      } finally {
+        try {
+          if (table != null) {
+            table.close()
+          }
+        } finally {
+          if (conn != null) {
+            conn.close()
+          }
+        }
+      }
+    }
+  }
+
+  protected def convertToSparkRow(scanRecord: ScanRecord): InternalRow = {
+    convertToSparkRow(scanRecord.getRow)
+  }
+
+  // NOTE(review): rowType is the table's full row type while subclasses may scan with a
+  // projection; confirm DataConverter is meant to receive the full type for projected rows.
+  protected def convertToSparkRow(flussRow: FlussInternalRow): InternalRow = {
+    DataConverter.toSparkInternalRow(flussRow, rowType)
+  }
+}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReaderFactory.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReaderFactory.scala
new file mode 100644
index 000000000..13d374d38
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReaderFactory.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.metadata.TablePath
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, 
PartitionReaderFactory}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+/**
+ * Factory for creating partition readers to read data from Fluss. The given partition must be a
+ * [[FlussAppendInputPartition]].
+ */
+class FlussAppendPartitionReaderFactory(
+    tablePath: TablePath,
+    projection: Array[Int],
+    options: CaseInsensitiveStringMap,
+    flussConfig: Configuration)
+  extends PartitionReaderFactory {
+
+  override def createReader(partition: InputPartition): PartitionReader[InternalRow] =
+    new FlussAppendPartitionReader(
+      tablePath,
+      projection,
+      partition.asInstanceOf[FlussAppendInputPartition],
+      flussConfig)
+}
+
+/**
+ * Factory for creating partition readers to read primary key table data from Fluss. The given
+ * partition must be a [[FlussUpsertInputPartition]].
+ */
+class FlussUpsertPartitionReaderFactory(
+    tablePath: TablePath,
+    projection: Array[Int],
+    options: CaseInsensitiveStringMap,
+    flussConfig: Configuration)
+  extends PartitionReaderFactory {
+
+  override def createReader(partition: InputPartition): PartitionReader[InternalRow] =
+    new FlussUpsertPartitionReader(
+      tablePath,
+      projection,
+      partition.asInstanceOf[FlussUpsertInputPartition],
+      flussConfig)
+}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala
new file mode 100644
index 000000000..77ad081ff
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.metadata.{TableInfo, TablePath}
+import org.apache.fluss.spark.SparkConversions
+
+import org.apache.spark.sql.connector.read.{Batch, Scan}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+/** Base trait for Fluss scans: a Spark [[Scan]] that knows its table and its pruned schema. */
+trait FlussScan extends Scan {
+  def tableInfo: TableInfo
+
+  def requiredSchema: Option[StructType]
+
+  /**
+   * Schema produced by this scan: the pruned schema when one was pushed down, otherwise the
+   * table's full row type converted to a Spark schema.
+   */
+  override def readSchema(): StructType = {
+    val pruned = requiredSchema
+    if (pruned.isEmpty) {
+      SparkConversions.toSparkDataType(tableInfo.getRowType)
+    } else {
+      pruned.get
+    }
+  }
+}
+
+/** Scan that is executed as a [[FlussAppendBatch]] over the resolved read schema. */
+case class FlussAppendScan(
+    tablePath: TablePath,
+    tableInfo: TableInfo,
+    requiredSchema: Option[StructType],
+    options: CaseInsensitiveStringMap,
+    flussConfig: Configuration)
+  extends FlussScan {
+
+  override def toBatch: Batch = {
+    val schemaToRead = readSchema()
+    new FlussAppendBatch(tablePath, tableInfo, schemaToRead, options, flussConfig)
+  }
+}
+
+/** Scan that is executed as a [[FlussUpsertBatch]] over the resolved read schema. */
+case class FlussUpsertScan(
+    tablePath: TablePath,
+    tableInfo: TableInfo,
+    requiredSchema: Option[StructType],
+    options: CaseInsensitiveStringMap,
+    flussConfig: Configuration)
+  extends FlussScan {
+
+  override def toBatch: Batch = {
+    val schemaToRead = readSchema()
+    new FlussUpsertBatch(tablePath, tableInfo, schemaToRead, options, flussConfig)
+  }
+}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala
new file mode 100644
index 000000000..cd4d17ec0
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.config.{Configuration => FlussConfiguration}
+import org.apache.fluss.metadata.{TableInfo, TablePath}
+
+import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, 
SupportsPushDownRequiredColumns}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+/**
+ * Base [[ScanBuilder]] for Fluss tables with support for column pruning.
+ *
+ * The schema handed to [[pruneColumns]] is remembered so that subclasses can pass it on to the
+ * scan they build. It stays `None` when Spark never calls [[pruneColumns]].
+ */
+trait FlussScanBuilder extends ScanBuilder with SupportsPushDownRequiredColumns {
+
+  // Must start as `None`, not `_`: a default-initialized Option field is `null`, and any later
+  // `requiredSchema.getOrElse(...)` on it would throw a NullPointerException.
+  protected var requiredSchema: Option[StructType] = None
+
+  /** Records the columns Spark actually needs from this table. */
+  override def pruneColumns(requiredSchema: StructType): Unit = {
+    this.requiredSchema = Some(requiredSchema)
+  }
+}
+
+/** [[FlussScanBuilder]] whose [[build]] yields a [[FlussAppendScan]]. */
+class FlussAppendScanBuilder(
+    tablePath: TablePath,
+    tableInfo: TableInfo,
+    options: CaseInsensitiveStringMap,
+    flussConfig: FlussConfiguration)
+  extends FlussScanBuilder {
+
+  override def build(): Scan =
+    FlussAppendScan(
+      tablePath = tablePath,
+      tableInfo = tableInfo,
+      requiredSchema = requiredSchema,
+      options = options,
+      flussConfig = flussConfig)
+}
+
+/** [[FlussScanBuilder]] whose [[build]] yields a [[FlussUpsertScan]]. */
+class FlussUpsertScanBuilder(
+    tablePath: TablePath,
+    tableInfo: TableInfo,
+    options: CaseInsensitiveStringMap,
+    flussConfig: FlussConfiguration)
+  extends FlussScanBuilder {
+
+  override def build(): Scan =
+    FlussUpsertScan(
+      tablePath = tablePath,
+      tableInfo = tableInfo,
+      requiredSchema = requiredSchema,
+      options = options,
+      flussConfig = flussConfig)
+}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala
new file mode 100644
index 000000000..52cd46652
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.client.table.scanner.batch.BatchScanner
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.metadata.{TableBucket, TablePath}
+
+import java.util
+
+/**
+ * Partition reader that reads primary key table data.
+ *
+ * For now, only data in the snapshot can be read, without merging it with later changes. When
+ * the partition carries no snapshot (negative snapshot id) no scanner is created and the reader
+ * yields no rows.
+ *
+ * @param tablePath path of the table being read
+ * @param projection column indexes pushed down to the Fluss scan
+ * @param flussPartition input partition supplying the table bucket and snapshot id
+ * @param flussConfig Fluss client configuration, forwarded to the base reader
+ */
+class FlussUpsertPartitionReader(
+    tablePath: TablePath,
+    projection: Array[Int],
+    flussPartition: FlussUpsertInputPartition,
+    flussConfig: Configuration)
+  extends FlussPartitionReader(tablePath, flussConfig) {
+
+  private val tableBucket: TableBucket = flussPartition.tableBucket
+  private val snapshotId: Long = flussPartition.snapshotId
+
+  // KV snapshot reader; stays null when the partition has no snapshot.
+  private var snapshotScanner: BatchScanner = _
+  // Rows of the batch currently being consumed; null until the first batch is polled.
+  private var snapshotIterator: util.Iterator[org.apache.fluss.row.InternalRow] = _
+
+  // Must run after the two fields above are declared, otherwise their initializers would reset
+  // whatever initialize() assigned.
+  initialize()
+
+  /**
+   * Advances to the next row and stores it in `currentRow`.
+   *
+   * @return `true` if a row is available, `false` once the reader is closed, has no snapshot
+   *         scanner, or the scanner returns a null batch
+   */
+  override def next(): Boolean = {
+    // Without a snapshot there is no scanner to poll; report an empty partition instead of
+    // dereferencing null.
+    if (closed || snapshotScanner == null) {
+      return false
+    }
+
+    // Poll in a loop (rather than recursing) so that a run of empty batches cannot grow the stack.
+    while (snapshotIterator == null || !snapshotIterator.hasNext) {
+      val batch = snapshotScanner.pollBatch(POLL_TIMEOUT)
+      if (batch == null) {
+        // No more data fetched.
+        return false
+      }
+      snapshotIterator = batch
+    }
+
+    currentRow = convertToSparkRow(snapshotIterator.next())
+    true
+  }
+
+  /** Creates the snapshot batch scanner when the partition references a snapshot. */
+  private def initialize(): Unit = {
+    if (snapshotId >= 0) {
+      snapshotScanner =
+        table.newScan().project(projection).createBatchScanner(tableBucket, snapshotId)
+    }
+  }
+
+  /** Closes the snapshot scanner if one was created. */
+  override def close0(): Unit = {
+    if (snapshotScanner != null) {
+      snapshotScanner.close()
+    }
+  }
+}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/DataConverter.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/DataConverter.scala
new file mode 100644
index 000000000..cb9b30900
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/DataConverter.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.row
+
+import org.apache.fluss.row.{BinaryString => FlussBinaryString, Decimal => 
FlussDecimal, InternalArray => FlussInternalArray, InternalMap => 
FlussInternalMap, InternalRow => FlussInternalRow, TimestampLtz => 
FlussTimestampLtz, TimestampNtz => FlussTimestampNtz}
+import org.apache.fluss.types.{ArrayType => FlussArrayType, DataType => 
FlussDataType, MapType => FlussMapType, RowType}
+import org.apache.fluss.types.DataTypeRoot._
+import org.apache.fluss.utils.InternalRowUtils
+
+import org.apache.spark.sql.catalyst.{InternalRow => SparkInteralRow}
+import org.apache.spark.sql.catalyst.util.{ArrayData => SparkArrayData, 
MapData => SparkMapData}
+import org.apache.spark.sql.types.{Decimal => SparkDecimal}
+import org.apache.spark.unsafe.types.UTF8String
+
+/** Conversions from Fluss internal values to the representations Spark uses internally. */
+object DataConverter {
+
+  /**
+   * Converts a Fluss internal value to its Spark counterpart according to `dataType`.
+   *
+   * Values whose type root has no dedicated case below are returned unchanged.
+   *
+   * @param o Fluss internal value; `null` is passed through as `null`
+   * @param dataType Fluss type describing `o`
+   * @return the Spark-side value
+   * @throws UnsupportedOperationException for map values, which are not supported yet
+   */
+  def toSparkObject(o: Object, dataType: FlussDataType): Any = {
+    if (o == null) {
+      return null
+    }
+
+    dataType.getTypeRoot match {
+      case TIMESTAMP_WITHOUT_TIME_ZONE =>
+        toSparkTimestamp(o.asInstanceOf[FlussTimestampNtz])
+      case TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
+        toSparkTimestamp(o.asInstanceOf[FlussTimestampLtz])
+      // STRING must be converted just like CHAR: letting it fall through to the default case
+      // would hand a Fluss BinaryString to Spark, which expects UTF8String for string data.
+      case CHAR | STRING =>
+        toSparkUTF8String(o.asInstanceOf[FlussBinaryString])
+      case DECIMAL =>
+        toSparkDecimal(o.asInstanceOf[FlussDecimal])
+      case ARRAY =>
+        toSparkArray(o.asInstanceOf[FlussInternalArray], dataType.asInstanceOf[FlussArrayType])
+      case MAP =>
+        toSparkMap(o.asInstanceOf[FlussInternalMap], dataType.asInstanceOf[FlussMapType])
+      case ROW =>
+        toSparkInternalRow(o.asInstanceOf[FlussInternalRow], dataType.asInstanceOf[RowType])
+      case _ => o
+    }
+  }
+
+  /** Returns the timestamp as microseconds since the epoch. */
+  def toSparkTimestamp(timestamp: FlussTimestampNtz): Long = {
+    timestamp.toEpochMicros
+  }
+
+  /** Returns the timestamp as microseconds since the epoch. */
+  def toSparkTimestamp(timestamp: FlussTimestampLtz): Long = {
+    timestamp.toEpochMicros
+  }
+
+  /** Builds a Spark decimal from the `BigDecimal` value of the Fluss decimal. */
+  def toSparkDecimal(flussDecimal: FlussDecimal): SparkDecimal = {
+    SparkDecimal(flussDecimal.toBigDecimal)
+  }
+
+  /** Builds a Spark `UTF8String` from the bytes of the Fluss string. */
+  def toSparkUTF8String(flussBinaryString: FlussBinaryString): UTF8String = {
+    UTF8String.fromBytes(flussBinaryString.toBytes)
+  }
+
+  /** Wraps a copy of `flussArray` in a [[FlussAsSparkArray]]. */
+  def toSparkArray(flussArray: FlussInternalArray, arrayType: FlussArrayType): SparkArrayData = {
+    val elementType = arrayType.getElementType
+    // The array is copied before wrapping, presumably so the result does not alias a buffer the
+    // caller may reuse -- NOTE(review): confirm against InternalRowUtils.copyArray.
+    new FlussAsSparkArray(elementType)
+      .replace(InternalRowUtils.copyArray(flussArray, elementType))
+  }
+
+  /** Not implemented yet; always throws `UnsupportedOperationException`. */
+  def toSparkMap(flussMap: FlussInternalMap, mapType: FlussMapType): SparkMapData = {
+    // TODO: support map type in fluss-spark
+    throw new UnsupportedOperationException()
+  }
+
+  /** Wraps `flussRow` (without copying it) in a [[FlussAsSparkRow]]. */
+  def toSparkInternalRow(flussRow: FlussInternalRow, rowType: RowType): SparkInteralRow = {
+    new FlussAsSparkRow(rowType).replace(flussRow)
+  }
+}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/FlussAsSparkArray.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/FlussAsSparkArray.scala
new file mode 100644
index 000000000..c67ae1e7a
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/FlussAsSparkArray.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.row
+
+import org.apache.fluss.row.{InternalArray => FlussInternalArray}
+import org.apache.fluss.types.{ArrayType => FlussArrayType, BigIntType => 
FlussBigIntType, BinaryType => FlussBinaryType, DataType => FlussDataType, 
LocalZonedTimestampType => FlussTimestampLTZType, MapType => FlussMapType, 
RowType, TimestampType => FlussTimestampNTZType}
+import org.apache.fluss.utils.InternalRowUtils
+
+import org.apache.spark.sql.catalyst.{InternalRow => SparkInternalRow}
+import org.apache.spark.sql.catalyst.expressions.SpecializedGettersReader
+import org.apache.spark.sql.catalyst.util.{ArrayData => SparkArrayData, 
MapData => SparkMapData}
+import org.apache.spark.sql.types.{DataType => SparkDataType, Decimal => 
SparkDecimal}
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+
+/**
+ * Read-only view that exposes a Fluss [[FlussInternalArray]] as Spark [[SparkArrayData]].
+ *
+ * The wrapped array is supplied through [[replace]]. Accessors read from it on demand and convert
+ * Fluss values to Spark's internal representation where the two differ. Mutating methods and
+ * interval access are unsupported.
+ *
+ * @param elementType Fluss type of the array elements, used to select the matching Fluss accessor
+ */
+class FlussAsSparkArray(elementType: FlussDataType) extends SparkArrayData {
+
+  var flussArray: FlussInternalArray = _
+
+  /** Points this wrapper at `array` and returns the wrapper itself. */
+  def replace(array: FlussInternalArray): SparkArrayData = {
+    this.flussArray = array
+    this
+  }
+
+  override def numElements(): Int = flussArray.size()
+
+  /** Returns a new wrapper around a copy of the underlying Fluss array. */
+  override def copy(): SparkArrayData = {
+    new FlussAsSparkArray(elementType).replace(InternalRowUtils.copyArray(flussArray, elementType))
+  }
+
+  /** Materializes all elements, converting each one (or null) to its Spark representation. */
+  override def array: Array[Any] = {
+    val elementGetter = FlussInternalArray.createElementGetter(elementType)
+    Array.range(0, numElements()).map {
+      ordinal =>
+        DataConverter.toSparkObject(
+          elementGetter.getElementOrNull(flussArray, ordinal),
+          elementType)
+    }
+  }
+
+  override def setNullAt(ordinal: Int): Unit = throw new UnsupportedOperationException()
+
+  override def update(ordinal: Int, value: Any): Unit = throw new UnsupportedOperationException()
+
+  override def isNullAt(ordinal: Int): Boolean = flussArray.isNullAt(ordinal)
+
+  override def getBoolean(ordinal: Int): Boolean = flussArray.getBoolean(ordinal)
+
+  override def getByte(ordinal: Int): Byte = flussArray.getByte(ordinal)
+
+  override def getShort(ordinal: Int): Short = flussArray.getShort(ordinal)
+
+  override def getInt(ordinal: Int): Int = flussArray.getInt(ordinal)
+
+  /**
+   * Reads a long-backed element: BIGINT values directly, timestamps as microseconds since the
+   * epoch.
+   *
+   * @throws UnsupportedOperationException if the element type is none of the above
+   */
+  override def getLong(ordinal: Int): Long = {
+    elementType match {
+      case _: FlussBigIntType =>
+        flussArray.getLong(ordinal)
+      case ntz: FlussTimestampNTZType =>
+        flussArray.getTimestampNtz(ordinal, ntz.getPrecision).toEpochMicros
+      case ltz: FlussTimestampLTZType =>
+        flussArray.getTimestampLtz(ordinal, ltz.getPrecision).toEpochMicros
+      case _ =>
+        throw new UnsupportedOperationException("Unsupported type: " + elementType)
+    }
+  }
+
+  override def getFloat(ordinal: Int): Float = flussArray.getFloat(ordinal)
+
+  override def getDouble(ordinal: Int): Double = flussArray.getDouble(ordinal)
+
+  override def getDecimal(ordinal: Int, precision: Int, scale: Int): SparkDecimal = {
+    DataConverter.toSparkDecimal(flussArray.getDecimal(ordinal, precision, scale))
+  }
+
+  override def getUTF8String(ordinal: Int): UTF8String = {
+    DataConverter.toSparkUTF8String(flussArray.getString(ordinal))
+  }
+
+  /**
+   * Reads a binary element. Fixed-length BINARY elements are read with their declared length;
+   * any other element type (e.g. variable-length BYTES) is read with `getBytes` rather than
+   * failing on a cast to the fixed-length type.
+   */
+  override def getBinary(ordinal: Int): Array[Byte] = {
+    elementType match {
+      case fixed: FlussBinaryType => flussArray.getBinary(ordinal, fixed.getLength)
+      case _ => flussArray.getBytes(ordinal)
+    }
+  }
+
+  override def getInterval(ordinal: Int): CalendarInterval =
+    throw new UnsupportedOperationException()
+
+  override def getStruct(ordinal: Int, numFields: Int): SparkInternalRow = {
+    DataConverter.toSparkInternalRow(
+      flussArray.getRow(ordinal, numFields),
+      elementType.asInstanceOf[RowType])
+  }
+
+  override def getArray(ordinal: Int): SparkArrayData = {
+    DataConverter.toSparkArray(
+      flussArray.getArray(ordinal),
+      elementType.asInstanceOf[FlussArrayType])
+  }
+
+  /** Delegates to [[DataConverter.toSparkMap]], which currently throws for every input. */
+  override def getMap(ordinal: Int): SparkMapData = {
+    DataConverter.toSparkMap(flussArray.getMap(ordinal), elementType.asInstanceOf[FlussMapType])
+  }
+
+  /** Generic accessor; dispatches to the typed getters above based on the Spark data type. */
+  override def get(ordinal: Int, dataType: SparkDataType): AnyRef = {
+    SpecializedGettersReader.read(this, ordinal, dataType, true, true)
+  }
+}
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/FlussAsSparkRow.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/FlussAsSparkRow.scala
new file mode 100644
index 000000000..175900cd1
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/FlussAsSparkRow.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.row
+
+import org.apache.fluss.row.{InternalRow => FlussInternalRow}
+import org.apache.fluss.types.{ArrayType => FlussArrayType, BinaryType => 
FlussBinaryType, LocalZonedTimestampType, RowType, TimestampType}
+import org.apache.fluss.utils.InternalRowUtils
+
+import org.apache.spark.sql.catalyst.{InternalRow => SparkInteralRow}
+import org.apache.spark.sql.catalyst.expressions.SpecializedGettersReader
+import org.apache.spark.sql.catalyst.util.{ArrayData => SparkArrayData, 
MapData => SparkMapData}
+import org.apache.spark.sql.types.{DataType => SparkDataType, Decimal => 
SparkDecimal}
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+
+/**
+ * Read-only view that exposes a Fluss [[FlussInternalRow]] as a Spark internal row.
+ *
+ * The wrapped row is supplied through [[replace]], so one wrapper instance can be re-pointed at
+ * successive Fluss rows. Accessors read from the wrapped row on demand and convert Fluss values
+ * to Spark's internal representation where the two differ. Mutating methods, interval access and
+ * map access are unsupported.
+ *
+ * @param rowType Fluss row type describing the fields of the wrapped rows
+ */
+class FlussAsSparkRow(rowType: RowType) extends SparkInteralRow {
+
+  val fieldCount: Int = rowType.getFieldCount
+
+  var row: FlussInternalRow = _
+
+  /** Points this wrapper at `row` and returns the wrapper itself. */
+  def replace(row: FlussInternalRow): FlussAsSparkRow = {
+    this.row = row
+    this
+  }
+
+  override def numFields: Int = fieldCount
+
+  override def setNullAt(ordinal: Int): Unit = throw new UnsupportedOperationException()
+
+  override def update(ordinal: Int, value: Any): Unit = throw new UnsupportedOperationException()
+
+  /** Returns a new wrapper around a copy of the underlying Fluss row. */
+  override def copy(): SparkInteralRow = {
+    new FlussAsSparkRow(rowType).replace(InternalRowUtils.copyRow(row, rowType))
+  }
+
+  override def isNullAt(ordinal: Int): Boolean = row.isNullAt(ordinal)
+
+  override def getBoolean(ordinal: Int): Boolean = row.getBoolean(ordinal)
+
+  override def getByte(ordinal: Int): Byte = row.getByte(ordinal)
+
+  override def getShort(ordinal: Int): Short = row.getShort(ordinal)
+
+  override def getInt(ordinal: Int): Int = row.getInt(ordinal)
+
+  /**
+   * Reads a long-backed field: timestamps are returned as microseconds since the epoch, every
+   * other field type is read with the plain long accessor.
+   */
+  override def getLong(ordinal: Int): Long = {
+    val flussType = rowType.getTypeAt(ordinal)
+    flussType match {
+      case ltz: LocalZonedTimestampType =>
+        row.getTimestampLtz(ordinal, ltz.getPrecision).toEpochMicros
+      case ntz: TimestampType =>
+        row.getTimestampNtz(ordinal, ntz.getPrecision).toEpochMicros
+      case _ =>
+        row.getLong(ordinal)
+    }
+  }
+
+  override def getFloat(ordinal: Int): Float = row.getFloat(ordinal)
+
+  override def getDouble(ordinal: Int): Double = row.getDouble(ordinal)
+
+  override def getDecimal(ordinal: Int, precision: Int, scale: Int): SparkDecimal = {
+    val flussDecimal = row.getDecimal(ordinal, precision, scale)
+    DataConverter.toSparkDecimal(flussDecimal)
+  }
+
+  override def getUTF8String(ordinal: Int): UTF8String = {
+    DataConverter.toSparkUTF8String(row.getString(ordinal))
+  }
+
+  /**
+   * Reads a binary field. Fixed-length BINARY fields are read with their declared length; any
+   * other field type (e.g. variable-length BYTES) is read with `getBytes` rather than failing on
+   * a cast to the fixed-length type.
+   */
+  override def getBinary(ordinal: Int): Array[Byte] = {
+    rowType.getTypeAt(ordinal) match {
+      case fixed: FlussBinaryType => row.getBinary(ordinal, fixed.getLength)
+      case _ => row.getBytes(ordinal)
+    }
+  }
+
+  override def getInterval(ordinal: Int): CalendarInterval =
+    throw new UnsupportedOperationException()
+
+  override def getStruct(ordinal: Int, numFields: Int): SparkInteralRow = {
+    val subRowType = rowType.getTypeAt(ordinal).asInstanceOf[RowType]
+    val flussSubRow = row.getRow(ordinal, numFields)
+    DataConverter.toSparkInternalRow(flussSubRow, subRowType)
+  }
+
+  override def getArray(ordinal: Int): SparkArrayData = {
+    val arrayType = rowType.getTypeAt(ordinal).asInstanceOf[FlussArrayType]
+    val flussArray = row.getArray(ordinal)
+    DataConverter.toSparkArray(flussArray, arrayType)
+  }
+
+  override def getMap(ordinal: Int): SparkMapData = {
+    // TODO: support map type in fluss-spark
+    throw new UnsupportedOperationException()
+  }
+
+  /** Generic accessor; dispatches to the typed getters above based on the Spark data type. */
+  override def get(ordinal: Int, dataType: SparkDataType): AnyRef = {
+    SpecializedGettersReader.read(this, ordinal, dataType, true, true)
+  }
+}
diff --git 
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
index 2de158b7d..f9b98fe7c 100644
--- 
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
+++ 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
@@ -50,6 +50,9 @@ class FlussSparkTestBase extends QueryTest with 
SharedSparkSession {
       .set(s"spark.sql.catalog.$DEFAULT_CATALOG", 
classOf[SparkCatalog].getName)
       .set(s"spark.sql.catalog.$DEFAULT_CATALOG.bootstrap.servers", 
bootstrapServers)
       .set("spark.sql.defaultCatalog", DEFAULT_CATALOG)
+      // Enable read optimized by default temporarily.
+      // TODO: remove this when https://github.com/apache/fluss/issues/2427 is 
done.
+      .set("spark.sql.fluss.readOptimized", "true")
   }
 
   override protected def beforeAll(): Unit = {
diff --git 
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkReadTest.scala
 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkReadTest.scala
new file mode 100644
index 000000000..834cec7ad
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkReadTest.scala
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark
+
+import org.apache.spark.sql.Row
+
+class SparkReadTest extends FlussSparkTestBase {
+
+  test("Spark Read: log table") {
+    withTable("t") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t (orderId BIGINT, itemId BIGINT, 
amount INT, address STRING)
+             |""".stripMargin)
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t
+             |VALUES
+             |(600L, 21L, 601, "addr1"), (700L, 22L, 602, "addr2"),
+             |(800L, 23L, 603, "addr3"), (900L, 24L, 604, "addr4"),
+             |(1000L, 25L, 605, "addr5")
+             |""".stripMargin)
+
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY orderId"),
+        Row(600L, 21L, 601, "addr1") ::
+          Row(700L, 22L, 602, "addr2") ::
+          Row(800L, 23L, 603, "addr3") ::
+          Row(900L, 24L, 604, "addr4") ::
+          Row(1000L, 25L, 605, "addr5") :: Nil
+      )
+
+      // projection
+      checkAnswer(
+        sql(s"SELECT address, itemId FROM $DEFAULT_DATABASE.t ORDER BY 
orderId"),
+        Row("addr1", 21L) ::
+          Row("addr2", 22L) ::
+          Row("addr3", 23L) ::
+          Row("addr4", 24L) ::
+          Row("addr5", 25L) :: Nil
+      )
+
+      // filter
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t WHERE amount % 2 = 0 ORDER BY 
orderId"),
+        Row(700L, 22L, 602, "addr2") ::
+          Row(900L, 24L, 604, "addr4") :: Nil
+      )
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t VALUES
+             |(700L, 220L, 602, "addr2"),
+             |(900L, 240L, 604, "addr4"),
+             |(1100L, 260L, 606, "addr6")
+             |""".stripMargin)
+      // projection + filter
+      checkAnswer(
+        sql(s"""
+               |SELECT orderId, itemId FROM $DEFAULT_DATABASE.t
+               |WHERE orderId >= 900 ORDER BY orderId, itemId""".stripMargin),
+        Row(900L, 24L) ::
+          Row(900L, 240L) ::
+          Row(1000L, 25L) ::
+          Row(1100L, 260L) :: Nil
+      )
+    }
+  }
+
+  test("Spark Read: primary key table") {
+    withTable("t") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t (orderId BIGINT, itemId BIGINT, 
amount INT, address STRING)
+             |TBLPROPERTIES("primary.key" = "orderId")
+             |""".stripMargin)
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t VALUES
+             |(600L, 21L, 601, "addr1"), (700L, 22L, 602, "addr2"),
+             |(800L, 23L, 603, "addr3"), (900L, 24L, 604, "addr4"),
+             |(1000L, 25L, 605, "addr5")
+             |""".stripMargin)
+
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY orderId"),
+        Row(600L, 21L, 601, "addr1") ::
+          Row(700L, 22L, 602, "addr2") ::
+          Row(800L, 23L, 603, "addr3") ::
+          Row(900L, 24L, 604, "addr4") ::
+          Row(1000L, 25L, 605, "addr5") :: Nil
+      )
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t VALUES
+             |(700L, 220L, 602, "addr2"),
+             |(900L, 240L, 604, "addr4"),
+             |(1100L, 260L, 606, "addr6")
+             |""".stripMargin)
+
+      checkAnswer(
+        sql(s"""
+               |SELECT orderId, itemId, address FROM $DEFAULT_DATABASE.t
+               |WHERE amount <= 603 ORDER BY orderId""".stripMargin),
+        Row(600L, 21L, "addr1") ::
+          Row(700L, 220L, "addr2") ::
+          Row(800L, 23L, "addr3") ::
+          Nil
+      )
+    }
+  }
+
+  test("Spark Read: partitioned log table") {
+    withTable("t") {
+      sql(
+        s"""
+           |CREATE TABLE $DEFAULT_DATABASE.t (orderId BIGINT, itemId BIGINT, 
amount INT, address STRING, dt STRING)
+           |PARTITIONED BY (dt)
+           |""".stripMargin
+      )
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t VALUES
+             |(600L, 21L, 601, "addr1", "2026-01-01"), (700L, 22L, 602, 
"addr2", "2026-01-01"),
+             |(800L, 23L, 603, "addr3", "2026-01-02"), (900L, 24L, 604, 
"addr4", "2026-01-02"),
+             |(1000L, 25L, 605, "addr5", "2026-01-03")
+             |""".stripMargin)
+
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY orderId"),
+        Row(600L, 21L, 601, "addr1", "2026-01-01") ::
+          Row(700L, 22L, 602, "addr2", "2026-01-01") ::
+          Row(800L, 23L, 603, "addr3", "2026-01-02") ::
+          Row(900L, 24L, 604, "addr4", "2026-01-02") ::
+          Row(1000L, 25L, 605, "addr5", "2026-01-03") :: Nil
+      )
+
+      // Read with partition filter
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t WHERE dt = '2026-01-01' ORDER 
BY orderId"),
+        Row(600L, 21L, 601, "addr1", "2026-01-01") ::
+          Row(700L, 22L, 602, "addr2", "2026-01-01") :: Nil
+      )
+
+      // Read with multiple partitions filter
+      checkAnswer(
+        sql(s"""
+               |SELECT orderId, address, dt FROM $DEFAULT_DATABASE.t
+               |WHERE dt IN ('2026-01-01', '2026-01-02')
+               |ORDER BY orderId""".stripMargin),
+        Row(600L, "addr1", "2026-01-01") ::
+          Row(700L, "addr2", "2026-01-01") ::
+          Row(800L, "addr3", "2026-01-02") ::
+          Row(900L, "addr4", "2026-01-02") :: Nil
+      )
+    }
+  }
+
+  test("Spark Read: partitioned primary key table") {
+    withTable("t") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t (orderId BIGINT, itemId BIGINT, 
amount INT, address STRING, dt STRING)
+             |PARTITIONED BY (dt)
+             |TBLPROPERTIES("primary.key" = "orderId,dt")
+             |""".stripMargin)
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t VALUES
+             |(600L, 21L, 601, "addr1", "2026-01-01"), (700L, 22L, 602, 
"addr2", "2026-01-01"),
+             |(800L, 23L, 603, "addr3", "2026-01-02"), (900L, 24L, 604, 
"addr4", "2026-01-02"),
+             |(1000L, 25L, 605, "addr5", "2026-01-03")
+             |""".stripMargin)
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t VALUES
+             |(700L, 220L, 602, "addr2_updated", "2026-01-01"),
+             |(900L, 240L, 604, "addr4_updated", "2026-01-02"),
+             |(1100L, 260L, 606, "addr6", "2026-01-03")
+             |""".stripMargin)
+
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY orderId"),
+        Row(600L, 21L, 601, "addr1", "2026-01-01") ::
+          Row(700L, 220L, 602, "addr2_updated", "2026-01-01") ::
+          Row(800L, 23L, 603, "addr3", "2026-01-02") ::
+          Row(900L, 240L, 604, "addr4_updated", "2026-01-02") ::
+          Row(1000L, 25L, 605, "addr5", "2026-01-03") ::
+          Row(1100L, 260L, 606, "addr6", "2026-01-03") ::
+          Nil
+      )
+
+      // Read with partition filter
+      checkAnswer(
+        sql(s"""
+               |SELECT * FROM $DEFAULT_DATABASE.t
+               |WHERE dt = '2026-01-01'
+               |ORDER BY orderId""".stripMargin),
+        Row(600L, 21L, 601, "addr1", "2026-01-01") ::
+          Row(700L, 220L, 602, "addr2_updated", "2026-01-01") ::
+          Nil
+      )
+
+      // Read with multiple partition filters
+      checkAnswer(
+        sql(
+          s"SELECT * FROM $DEFAULT_DATABASE.t WHERE dt IN ('2026-01-01', 
'2026-01-02') ORDER BY orderId"),
+        Row(600L, 21L, 601, "addr1", "2026-01-01") ::
+          Row(700L, 220L, 602, "addr2_updated", "2026-01-01") ::
+          Row(800L, 23L, 603, "addr3", "2026-01-02") ::
+          Row(900L, 240L, 604, "addr4_updated", "2026-01-02") ::
+          Nil
+      )
+    }
+  }
+
+  test("Spark Read: all data types") {
+    withTable("t") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t (
+             |id INT,
+             |flag BOOLEAN,
+             |small SHORT,
+             |value INT,
+             |big BIGINT,
+             |real FLOAT,
+             |amount DOUBLE,
+             |name STRING,
+             |decimal_val DECIMAL(10, 2),
+             |date_val DATE,
+             |timestamp_ntz_val TIMESTAMP,
+             |timestamp_ltz_val TIMESTAMP_LTZ
+             |)""".stripMargin)
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t VALUES
+             |(1, true, 100s, 1000, 10000L, 12.34f, 56.78, "string_val",
+             | 123.45, DATE "2026-01-01", TIMESTAMP "2026-01-01 12:00:00", 
TIMESTAMP "2026-01-01 12:00:00"),
+             |(2, false, 200s, 2000, 20000L, 23.45f, 67.89, "another_str",
+             | 223.45, DATE "2026-01-02", TIMESTAMP "2026-01-02 12:00:00", 
TIMESTAMP "2026-01-02 12:00:00")
+             |""".stripMargin)
+
+      // Read every column back; each expected value uses the JVM type of its declared column
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t"),
+        Row(
+          1,
+          true,
+          100.toShort,
+          1000,
+          10000L,
+          12.34f,
+          56.78,
+          "string_val",
+          java.math.BigDecimal.valueOf(123.45),
+          java.sql.Date.valueOf("2026-01-01"),
+          java.sql.Timestamp.valueOf("2026-01-01 12:00:00"),
+          java.sql.Timestamp.valueOf("2026-01-01 12:00:00")
+        ) :: Row(
+          2,
+          false,
+          200.toShort,
+          2000,
+          20000L,
+          23.45f,
+          67.89,
+          "another_str",
+          java.math.BigDecimal.valueOf(223.45),
+          java.sql.Date.valueOf("2026-01-02"),
+          java.sql.Timestamp.valueOf("2026-01-02 12:00:00"),
+          java.sql.Timestamp.valueOf("2026-01-02 12:00:00")
+        ) :: Nil
+      )
+
+      // Test projection on selected columns (amount is DOUBLE, so expect Double values)
+      checkAnswer(
+        sql(s"SELECT id, name, amount FROM $DEFAULT_DATABASE.t ORDER BY id"),
+        Row(1, "string_val", 56.78) ::
+          Row(2, "another_str", 67.89) :: Nil
+      )
+
+      // Filter by boolean field
+      checkAnswer(
+        sql(s"SELECT id, flag, name FROM $DEFAULT_DATABASE.t WHERE flag = true 
ORDER BY id"),
+        Row(1, true, "string_val") :: Nil
+      )
+
+      // Filter by numeric field
+      checkAnswer(
+        sql(s"SELECT id, value, name FROM $DEFAULT_DATABASE.t WHERE value > 
1500 ORDER BY id"),
+        Row(2, 2000, "another_str") :: Nil
+      )
+
+      // Filter by string field
+      checkAnswer(
+        sql(s"SELECT id, name FROM $DEFAULT_DATABASE.t WHERE name LIKE 
'%another%' ORDER BY id"),
+        Row(2, "another_str") :: Nil
+      )
+    }
+  }
+
+  test("Spark Read: nested data types table") {
+    withTable("t") {
+      // TODO: support map type
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t (
+             |id INT,
+             |arr ARRAY<INT>,
+             |struct_col STRUCT<col1: INT, col2: STRING>
+             |)""".stripMargin)
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t VALUES
+             |(1, ARRAY(1, 2, 3), STRUCT(100, 'nested_value')),
+             |(2, ARRAY(7, 8, 9), STRUCT(200, 'nested_value2'))
+             |""".stripMargin)
+
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY id"),
+        Row(
+          1,
+          Seq(1, 2, 3),
+          Row(100, "nested_value")
+        ) :: Row(
+          2,
+          Seq(7, 8, 9),
+          Row(200, "nested_value2")
+        ) :: Nil
+      )
+    }
+  }
+}
diff --git 
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/DataConverterTest.scala
 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/DataConverterTest.scala
new file mode 100644
index 000000000..6866ba7b6
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/DataConverterTest.scala
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.row
+
+import org.apache.fluss.row.{BinaryString, Decimal => FlussDecimal, 
GenericArray, GenericRow, TimestampLtz, TimestampNtz}
+import org.apache.fluss.types.{ArrayType, CharType, DataTypes, DecimalType, 
LocalZonedTimestampType, RowType, TimestampType}
+
+import org.apache.spark.sql.types.{Decimal => SparkDecimal}
+import org.apache.spark.unsafe.types.UTF8String
+import org.assertj.core.api.Assertions.assertThat
+import org.scalatest.funsuite.AnyFunSuite
+
+class DataConverterTest extends AnyFunSuite {
+
+  test("toSparkObject: null value") {
+    val result = DataConverter.toSparkObject(null, DataTypes.INT)
+    assertThat(result).isNull()
+  }
+
+  test("toSparkObject: primitive types") {
+    // Integer
+    val intValue = Integer.valueOf(42)
+    val intResult = DataConverter.toSparkObject(intValue, DataTypes.INT)
+    assertThat(intResult).isEqualTo(42)
+
+    // Long
+    val longValue = java.lang.Long.valueOf(12345L)
+    val longResult = DataConverter.toSparkObject(longValue, DataTypes.BIGINT)
+    assertThat(longResult).isEqualTo(12345L)
+
+    // Boolean
+    val boolValue = java.lang.Boolean.valueOf(true)
+    val boolResult = DataConverter.toSparkObject(boolValue, DataTypes.BOOLEAN)
+    assertThat(boolResult).isEqualTo(true)
+
+    // Float
+    val floatValue = java.lang.Float.valueOf(3.14f)
+    val floatResult = DataConverter.toSparkObject(floatValue, DataTypes.FLOAT)
+    assertThat(floatResult).isEqualTo(3.14f)
+
+    // Double
+    val doubleValue = java.lang.Double.valueOf(2.718)
+    val doubleResult = DataConverter.toSparkObject(doubleValue, 
DataTypes.DOUBLE)
+    assertThat(doubleResult).isEqualTo(2.718)
+  }
+
+  test("toSparkTimestamp: TimestampNtz") {
+    val timestamp = TimestampNtz.fromMillis(1234567890123L)
+    val result = DataConverter.toSparkTimestamp(timestamp)
+    // Input is built from epoch millis; the expected Spark value is the same number x1000 (micros)
+    assertThat(result).isEqualTo(1234567890123000L)
+  }
+
+  test("toSparkTimestamp: TimestampLtz") {
+    val timestamp = TimestampLtz.fromEpochMillis(1234567890123L)
+    val result = DataConverter.toSparkTimestamp(timestamp)
+    assertThat(result).isEqualTo(1234567890123000L)
+  }
+
+  test("toSparkDecimal: positive decimal") {
+    val flussDecimal = FlussDecimal.fromBigDecimal(new 
java.math.BigDecimal("123.45"), 5, 2)
+    val sparkDecimal = DataConverter.toSparkDecimal(flussDecimal)
+
+    assertThat(sparkDecimal).isInstanceOf(classOf[SparkDecimal])
+    assertThat(sparkDecimal.toBigDecimal.doubleValue()).isEqualTo(123.45)
+  }
+
+  test("toSparkDecimal: negative decimal") {
+    val flussDecimal = FlussDecimal.fromBigDecimal(new 
java.math.BigDecimal("-999.99"), 5, 2)
+    val sparkDecimal = DataConverter.toSparkDecimal(flussDecimal)
+
+    assertThat(sparkDecimal.toBigDecimal.doubleValue()).isEqualTo(-999.99)
+  }
+
+  test("toSparkDecimal: zero") {
+    val flussDecimal = FlussDecimal.fromBigDecimal(java.math.BigDecimal.ZERO, 
5, 2)
+    val sparkDecimal = DataConverter.toSparkDecimal(flussDecimal)
+
+    assertThat(sparkDecimal.toBigDecimal.doubleValue()).isEqualTo(0.0)
+  }
+
+  test("toSparkUTF8String: ASCII string") {
+    val binaryString = BinaryString.fromString("Hello World")
+    val utf8String = DataConverter.toSparkUTF8String(binaryString)
+
+    assertThat(utf8String).isInstanceOf(classOf[UTF8String])
+    assertThat(utf8String.toString).isEqualTo("Hello World")
+  }
+
+  test("toSparkUTF8String: UTF-8 Chinese characters") {
+    val binaryString = BinaryString.fromString("你好世界")
+    val utf8String = DataConverter.toSparkUTF8String(binaryString)
+
+    assertThat(utf8String.toString).isEqualTo("你好世界")
+  }
+
+  test("toSparkUTF8String: empty string") {
+    val binaryString = BinaryString.fromString("")
+    val utf8String = DataConverter.toSparkUTF8String(binaryString)
+
+    assertThat(utf8String.toString).isEqualTo("")
+  }
+
+  test("toSparkUTF8String: special characters") {
+    val binaryString = BinaryString.fromString("Test\n\t\"Special\"")
+    val utf8String = DataConverter.toSparkUTF8String(binaryString)
+
+    assertThat(utf8String.toString).isEqualTo("Test\n\t\"Special\"")
+  }
+
+  test("toSparkArray: integer array") {
+    val flussArray = new GenericArray(
+      Array[Object](
+        Integer.valueOf(1),
+        Integer.valueOf(2),
+        Integer.valueOf(3),
+        Integer.valueOf(4),
+        Integer.valueOf(5)))
+    val arrayType = new ArrayType(DataTypes.INT)
+    val sparkArray = DataConverter.toSparkArray(flussArray, arrayType)
+
+    assertThat(sparkArray.numElements()).isEqualTo(5)
+    assertThat(sparkArray.getInt(0)).isEqualTo(1)
+    assertThat(sparkArray.getInt(4)).isEqualTo(5)
+  }
+
+  test("toSparkArray: string array") {
+    val strings = Array[Object](
+      BinaryString.fromString("a"),
+      BinaryString.fromString("b"),
+      BinaryString.fromString("c"))
+    val flussArray = new GenericArray(strings)
+    val arrayType = new ArrayType(DataTypes.STRING)
+    val sparkArray = DataConverter.toSparkArray(flussArray, arrayType)
+
+    assertThat(sparkArray.numElements()).isEqualTo(3)
+    assertThat(sparkArray.getUTF8String(0).toString).isEqualTo("a")
+    assertThat(sparkArray.getUTF8String(2).toString).isEqualTo("c")
+  }
+
+  test("toSparkArray: empty array") {
+    val flussArray = new GenericArray(Array.empty[Object])
+    val arrayType = new ArrayType(DataTypes.INT)
+    val sparkArray = DataConverter.toSparkArray(flussArray, arrayType)
+
+    assertThat(sparkArray.numElements()).isEqualTo(0)
+  }
+
+  test("toSparkArray: array with null elements") {
+    val flussArray = new GenericArray(Array[Object](Integer.valueOf(1), null, 
Integer.valueOf(3)))
+    val arrayType = new ArrayType(DataTypes.INT)
+    val sparkArray = DataConverter.toSparkArray(flussArray, arrayType)
+
+    assertThat(sparkArray.numElements()).isEqualTo(3)
+    assertThat(sparkArray.getInt(0)).isEqualTo(1)
+    assertThat(sparkArray.isNullAt(1)).isTrue()
+    assertThat(sparkArray.getInt(2)).isEqualTo(3)
+  }
+
+  test("toSparkInternalRow: simple row") {
+    val rowType = RowType
+      .builder()
+      .field("id", DataTypes.INT)
+      .field("name", DataTypes.STRING)
+      .field("age", DataTypes.INT)
+      .build()
+
+    val flussRow = new GenericRow(3)
+    flussRow.setField(0, Integer.valueOf(1))
+    flussRow.setField(1, BinaryString.fromString("Alice"))
+    flussRow.setField(2, Integer.valueOf(25))
+
+    val sparkRow = DataConverter.toSparkInternalRow(flussRow, rowType)
+
+    assertThat(sparkRow.numFields).isEqualTo(3)
+    assertThat(sparkRow.getInt(0)).isEqualTo(1)
+    assertThat(sparkRow.getUTF8String(1).toString).isEqualTo("Alice")
+    assertThat(sparkRow.getInt(2)).isEqualTo(25)
+  }
+
+  test("toSparkInternalRow: row with null fields") {
+    val rowType = RowType
+      .builder()
+      .field("id", DataTypes.INT)
+      .field("name", DataTypes.STRING)
+      .build()
+
+    val flussRow = new GenericRow(2)
+    flussRow.setField(0, Integer.valueOf(1))
+    flussRow.setField(1, null)
+
+    val sparkRow = DataConverter.toSparkInternalRow(flussRow, rowType)
+
+    assertThat(sparkRow.getInt(0)).isEqualTo(1)
+    assertThat(sparkRow.isNullAt(1)).isTrue()
+  }
+
+  test("toSparkInternalRow: nested row") {
+    val innerRowType = RowType
+      .builder()
+      .field("city", DataTypes.STRING)
+      .field("code", DataTypes.INT)
+      .build()
+
+    val outerRowType = RowType
+      .builder()
+      .field("id", DataTypes.INT)
+      .field("address", innerRowType)
+      .build()
+
+    val innerRow = new GenericRow(2)
+    innerRow.setField(0, BinaryString.fromString("Beijing"))
+    innerRow.setField(1, Integer.valueOf(100))
+
+    val outerRow = new GenericRow(2)
+    outerRow.setField(0, Integer.valueOf(1))
+    outerRow.setField(1, innerRow)
+
+    val sparkRow = DataConverter.toSparkInternalRow(outerRow, outerRowType)
+
+    assertThat(sparkRow.getInt(0)).isEqualTo(1)
+    val sparkInnerRow = sparkRow.getStruct(1, 2)
+    assertThat(sparkInnerRow.getUTF8String(0).toString).isEqualTo("Beijing")
+    assertThat(sparkInnerRow.getInt(1)).isEqualTo(100)
+  }
+
+  test("toSparkObject: CHAR type") {
+    val binaryString = BinaryString.fromString("test")
+    val charType = new CharType(10)
+    val result = DataConverter.toSparkObject(binaryString, charType)
+
+    assertThat(result).isInstanceOf(classOf[UTF8String])
+    assertThat(result.asInstanceOf[UTF8String].toString).isEqualTo("test")
+  }
+
+  test("toSparkObject: DECIMAL type") {
+    val flussDecimal = FlussDecimal.fromBigDecimal(new 
java.math.BigDecimal("12.34"), 4, 2)
+    val decimalType = new DecimalType(4, 2)
+    val result = DataConverter.toSparkObject(flussDecimal, decimalType)
+
+    assertThat(result).isInstanceOf(classOf[SparkDecimal])
+  }
+
+  test("toSparkObject: TIMESTAMP_WITHOUT_TIME_ZONE type") {
+    val timestamp = TimestampNtz.fromMillis(1000000L)
+    val timestampType = new TimestampType(3)
+    val result = DataConverter.toSparkObject(timestamp, timestampType)
+
+    assertThat(result).isInstanceOf(classOf[java.lang.Long])
+    assertThat(result.asInstanceOf[Long]).isEqualTo(1000000000L) // 
microseconds
+  }
+
+  test("toSparkObject: TIMESTAMP_WITH_LOCAL_TIME_ZONE type") {
+    val timestamp = TimestampLtz.fromEpochMillis(2000000L)
+    val timestampType = new LocalZonedTimestampType(3)
+    val result = DataConverter.toSparkObject(timestamp, timestampType)
+
+    assertThat(result).isInstanceOf(classOf[java.lang.Long])
+    assertThat(result.asInstanceOf[Long]).isEqualTo(2000000000L) // 
microseconds
+  }
+
+  test("toSparkObject: ROW type") {
+    val rowType = RowType
+      .builder()
+      .field("x", DataTypes.INT)
+      .build()
+
+    val flussRow = new GenericRow(1)
+    flussRow.setField(0, Integer.valueOf(42))
+
+    val result = DataConverter.toSparkObject(flussRow, rowType)
+
+    assertThat(result).isNotNull()
+    assertThat(result.asInstanceOf[FlussAsSparkRow].getInt(0)).isEqualTo(42)
+  }
+
+  test("toSparkMap: unsupported") {
+    assertThrows[UnsupportedOperationException] {
+      DataConverter.toSparkMap(null, null)
+    }
+  }
+}
diff --git 
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkArrayTest.scala
 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkArrayTest.scala
new file mode 100644
index 000000000..029104e5b
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkArrayTest.scala
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.row
+
+import org.apache.fluss.row.{BinaryString, Decimal => FlussDecimal, 
GenericArray, GenericMap, GenericRow, TimestampLtz, TimestampNtz}
+import org.apache.fluss.types.{ArrayType, BinaryType, DataTypes, IntType, 
LocalZonedTimestampType, MapType, RowType, StringType, TimestampType}
+
+import org.assertj.core.api.Assertions.assertThat
+import org.scalatest.funsuite.AnyFunSuite
+
+import scala.collection.JavaConverters._
+
+class FlussAsSparkArrayTest extends AnyFunSuite {
+
+  test("numElements: empty array") {
+    val elementType = DataTypes.INT
+    val flussArray = new GenericArray(Array.empty[Object])
+    val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+    assertThat(sparkArray.numElements()).isEqualTo(0)
+  }
+
+  test("numElements: non-empty array") {
+    val elementType = DataTypes.INT
+    val flussArray = new GenericArray(
+      Array[Object](
+        Integer.valueOf(1),
+        Integer.valueOf(2),
+        Integer.valueOf(3),
+        Integer.valueOf(4),
+        Integer.valueOf(5)))
+    val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+    assertThat(sparkArray.numElements()).isEqualTo(5)
+  }
+
+  test("isNullAt: check null elements") {
+    val elementType = DataTypes.INT
+    val flussArray = new GenericArray(Array[Object](Integer.valueOf(1), null, 
Integer.valueOf(3)))
+    val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+    assertThat(sparkArray.isNullAt(0)).isFalse()
+    assertThat(sparkArray.isNullAt(1)).isTrue()
+    assertThat(sparkArray.isNullAt(2)).isFalse()
+  }
+
+  test("getBoolean: read boolean array") {
+    val elementType = DataTypes.BOOLEAN
+    val flussArray = new GenericArray(
+      Array[Object](
+        java.lang.Boolean.TRUE,
+        java.lang.Boolean.FALSE,
+        java.lang.Boolean.TRUE
+      ))
+    val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+    assertThat(sparkArray.getBoolean(0)).isTrue()
+    assertThat(sparkArray.getBoolean(1)).isFalse()
+    assertThat(sparkArray.getBoolean(2)).isTrue()
+  }
+
+  test("getByte: read byte array") {
+    val elementType = DataTypes.TINYINT
+    val flussArray = new GenericArray(
+      Array[Object](
+        java.lang.Byte.valueOf(1.toByte),
+        java.lang.Byte.valueOf(10.toByte),
+        java.lang.Byte.valueOf((-5).toByte)
+      ))
+    val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+    assertThat(sparkArray.getByte(0)).isEqualTo(1.toByte)
+    assertThat(sparkArray.getByte(1)).isEqualTo(10.toByte)
+    assertThat(sparkArray.getByte(2)).isEqualTo((-5).toByte)
+  }
+
+  test("getShort: read short array") {
+    val elementType = DataTypes.SMALLINT
+    val flussArray = new GenericArray(
+      Array[Object](
+        java.lang.Short.valueOf(100.toShort),
+        java.lang.Short.valueOf(200.toShort),
+        java.lang.Short.valueOf((-50).toShort)
+      ))
+    val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+    assertThat(sparkArray.getShort(0)).isEqualTo(100.toShort)
+    assertThat(sparkArray.getShort(1)).isEqualTo(200.toShort)
+    assertThat(sparkArray.getShort(2)).isEqualTo((-50).toShort)
+  }
+
+  test("getInt: read integer array") {
+    val elementType = DataTypes.INT
+    val flussArray = new GenericArray(
+      Array[Object](
+        Integer.valueOf(10),
+        Integer.valueOf(20),
+        Integer.valueOf(30),
+        Integer.valueOf(40),
+        Integer.valueOf(50)))
+    val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+    assertThat(sparkArray.getInt(0)).isEqualTo(10)
+    assertThat(sparkArray.getInt(2)).isEqualTo(30)
+    assertThat(sparkArray.getInt(4)).isEqualTo(50)
+  }
+
+  test("getLong: read BIGINT array") {
+    val elementType = DataTypes.BIGINT
+    val flussArray = new GenericArray(
+      Array[Object](
+        java.lang.Long.valueOf(100L),
+        java.lang.Long.valueOf(200L),
+        java.lang.Long.valueOf(300L)))
+    val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+    assertThat(sparkArray.getLong(0)).isEqualTo(100L)
+    assertThat(sparkArray.getLong(1)).isEqualTo(200L)
+    assertThat(sparkArray.getLong(2)).isEqualTo(300L)
+  }
+
+  test("getLong: read TIMESTAMP_WITHOUT_TIME_ZONE array") {
+    val timestampType = new TimestampType(3)
+    val timestamps = Array[Object](
+      TimestampNtz.fromMillis(1000L),
+      TimestampNtz.fromMillis(2000L),
+      TimestampNtz.fromMillis(3000L)
+    )
+    val flussArray = new GenericArray(timestamps)
+    val sparkArray = new FlussAsSparkArray(timestampType).replace(flussArray)
+
+    // Spark expects microseconds
+    assertThat(sparkArray.getLong(0)).isEqualTo(1000000L)
+    assertThat(sparkArray.getLong(1)).isEqualTo(2000000L)
+    assertThat(sparkArray.getLong(2)).isEqualTo(3000000L)
+  }
+
+  test("getLong: read TIMESTAMP_WITH_LOCAL_TIME_ZONE array") {
+    val timestampType = new LocalZonedTimestampType(3)
+    val timestamps = Array[Object](
+      TimestampLtz.fromEpochMillis(5000L),
+      TimestampLtz.fromEpochMillis(6000L)
+    )
+    val flussArray = new GenericArray(timestamps)
+    val sparkArray = new FlussAsSparkArray(timestampType).replace(flussArray)
+
+    // Spark expects microseconds
+    assertThat(sparkArray.getLong(0)).isEqualTo(5000000L)
+    assertThat(sparkArray.getLong(1)).isEqualTo(6000000L)
+  }
+
+  test("getLong: unsupported type throws exception") {
+    val elementType = DataTypes.STRING
+    val flussArray = new 
GenericArray(Array[Object](BinaryString.fromString("test")))
+    val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+    assertThrows[UnsupportedOperationException] {
+      sparkArray.getLong(0)
+    }
+  }
+
+  test("getFloat: read float array") {
+    val elementType = DataTypes.FLOAT
+    val flussArray = new GenericArray(
+      Array[Object](
+        java.lang.Float.valueOf(1.1f),
+        java.lang.Float.valueOf(2.2f),
+        java.lang.Float.valueOf(3.3f)))
+    val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+    assertThat(sparkArray.getFloat(0)).isEqualTo(1.1f)
+    assertThat(sparkArray.getFloat(1)).isEqualTo(2.2f)
+    assertThat(sparkArray.getFloat(2)).isEqualTo(3.3f)
+  }
+
+  test("getDouble: read double array") {
+    val elementType = DataTypes.DOUBLE
+    val flussArray = new GenericArray(
+      Array[Object](
+        java.lang.Double.valueOf(1.11),
+        java.lang.Double.valueOf(2.22),
+        java.lang.Double.valueOf(3.33)))
+    val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+    assertThat(sparkArray.getDouble(0)).isEqualTo(1.11)
+    assertThat(sparkArray.getDouble(1)).isEqualTo(2.22)
+    assertThat(sparkArray.getDouble(2)).isEqualTo(3.33)
+  }
+
+  test("getDecimal: read decimal array") {
+    val elementType = DataTypes.DECIMAL(10, 2)
+    val decimals = Array[Object](
+      FlussDecimal.fromBigDecimal(new java.math.BigDecimal("10.50"), 10, 2),
+      FlussDecimal.fromBigDecimal(new java.math.BigDecimal("20.75"), 10, 2),
+      FlussDecimal.fromBigDecimal(new java.math.BigDecimal("30.99"), 10, 2)
+    )
+    val flussArray = new GenericArray(decimals)
+    val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+    assertThat(sparkArray.getDecimal(0, 10, 
2).toBigDecimal.doubleValue()).isEqualTo(10.50)
+    assertThat(sparkArray.getDecimal(1, 10, 
2).toBigDecimal.doubleValue()).isEqualTo(20.75)
+    assertThat(sparkArray.getDecimal(2, 10, 
2).toBigDecimal.doubleValue()).isEqualTo(30.99)
+  }
+
+  test("getUTF8String: read string array") {
+    val elementType = DataTypes.STRING
+    val strings = Array[Object](
+      BinaryString.fromString("apple"),
+      BinaryString.fromString("banana"),
+      BinaryString.fromString("cherry"))
+    val flussArray = new GenericArray(strings)
+    val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+    assertThat(sparkArray.getUTF8String(0).toString).isEqualTo("apple")
+    assertThat(sparkArray.getUTF8String(1).toString).isEqualTo("banana")
+    assertThat(sparkArray.getUTF8String(2).toString).isEqualTo("cherry")
+  }
+
+  test("getUTF8String: read UTF-8 characters array") {
+    val elementType = DataTypes.STRING
+    val strings = Array[Object](
+      BinaryString.fromString("你好"),
+      BinaryString.fromString("世界"),
+      BinaryString.fromString("😊"))
+    val flussArray = new GenericArray(strings)
+    val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+    assertThat(sparkArray.getUTF8String(0).toString).isEqualTo("你好")
+    assertThat(sparkArray.getUTF8String(1).toString).isEqualTo("世界")
+    assertThat(sparkArray.getUTF8String(2).toString).isEqualTo("😊")
+  }
+
+  test("getBinary: read binary array") {
+    val binaryType = new BinaryType(3)
+    val binaries = Array[Object](
+      Array[Byte](1, 2, 3),
+      Array[Byte](4, 5, 6),
+      Array[Byte](7, 8, 9)
+    )
+    val flussArray = new GenericArray(binaries)
+    val sparkArray = new FlussAsSparkArray(binaryType).replace(flussArray)
+
+    assertThat(sparkArray.getBinary(0)).isEqualTo(Array[Byte](1, 2, 3))
+    assertThat(sparkArray.getBinary(1)).isEqualTo(Array[Byte](4, 5, 6))
+    assertThat(sparkArray.getBinary(2)).isEqualTo(Array[Byte](7, 8, 9))
+  }
+
+  test("getStruct: read row array") {
+    val rowType = RowType
+      .builder()
+      .field("id", DataTypes.INT)
+      .field("name", DataTypes.STRING)
+      .build()
+
+    val row1 = new GenericRow(2)
+    row1.setField(0, Integer.valueOf(1))
+    row1.setField(1, BinaryString.fromString("Alice"))
+
+    val row2 = new GenericRow(2)
+    row2.setField(0, Integer.valueOf(2))
+    row2.setField(1, BinaryString.fromString("Bob"))
+
+    val flussArray = new GenericArray(Array[Object](row1, row2))
+    val sparkArray = new FlussAsSparkArray(rowType).replace(flussArray)
+
+    val sparkRow1 = sparkArray.getStruct(0, 2)
+    assertThat(sparkRow1.getInt(0)).isEqualTo(1)
+    assertThat(sparkRow1.getUTF8String(1).toString).isEqualTo("Alice")
+
+    val sparkRow2 = sparkArray.getStruct(1, 2)
+    assertThat(sparkRow2.getInt(0)).isEqualTo(2)
+    assertThat(sparkRow2.getUTF8String(1).toString).isEqualTo("Bob")
+  }
+
+  test("getArray: read nested array") {
+    val innerArrayType = new ArrayType(DataTypes.INT)
+    val innerArray1 =
+      new GenericArray(Array[Object](Integer.valueOf(1), Integer.valueOf(2), 
Integer.valueOf(3)))
+    val innerArray2 =
+      new GenericArray(Array[Object](Integer.valueOf(4), Integer.valueOf(5), 
Integer.valueOf(6)))
+
+    val outerArray = new GenericArray(Array[Object](innerArray1, innerArray2))
+    val sparkOuterArray = new 
FlussAsSparkArray(innerArrayType).replace(outerArray)
+
+    val sparkInnerArray1 = sparkOuterArray.getArray(0)
+    assertThat(sparkInnerArray1.numElements()).isEqualTo(3)
+    assertThat(sparkInnerArray1.getInt(0)).isEqualTo(1)
+    assertThat(sparkInnerArray1.getInt(2)).isEqualTo(3)
+
+    val sparkInnerArray2 = sparkOuterArray.getArray(1)
+    assertThat(sparkInnerArray2.numElements()).isEqualTo(3)
+    assertThat(sparkInnerArray2.getInt(0)).isEqualTo(4)
+    assertThat(sparkInnerArray2.getInt(2)).isEqualTo(6)
+  }
+
+  test("getMap: unsupported operation") {
+    val mapType = DataTypes.MAP(DataTypes.INT, DataTypes.STRING)
+    val flussArray = GenericArray.of(new GenericMap(Map(1 -> "map").asJava))
+    val sparkArray = new FlussAsSparkArray(mapType).replace(flussArray)
+
+    assertThrows[UnsupportedOperationException] {
+      sparkArray.getMap(0)
+    }
+  }
+
+  test("getInterval: unsupported operation") {
+    val elementType = DataTypes.INT
+    val flussArray = new GenericArray(Array[Object](Integer.valueOf(1)))
+    val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+    assertThrows[UnsupportedOperationException] {
+      sparkArray.getInterval(0)
+    }
+  }
+
+  test("setNullAt: unsupported operation") {
+    val elementType = DataTypes.INT
+    val flussArray = new GenericArray(Array[Object](Integer.valueOf(1)))
+    val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+    assertThrows[UnsupportedOperationException] {
+      sparkArray.setNullAt(0)
+    }
+  }
+
+  test("update: unsupported operation") {
+    val elementType = DataTypes.INT
+    val flussArray = new GenericArray(Array[Object](Integer.valueOf(1)))
+    val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+    assertThrows[UnsupportedOperationException] {
+      sparkArray.update(0, Integer.valueOf(2))
+    }
+  }
+
+  test("copy: creates deep copy") {
+    val elementType = DataTypes.INT
+    val flussArray =
+      new GenericArray(Array[Object](Integer.valueOf(1), Integer.valueOf(2), 
Integer.valueOf(3)))
+    val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+    val copiedArray = sparkArray.copy()
+
+    assertThat(copiedArray.numElements()).isEqualTo(3)
+    assertThat(copiedArray.getInt(0)).isEqualTo(1)
+    assertThat(copiedArray.getInt(1)).isEqualTo(2)
+    assertThat(copiedArray.getInt(2)).isEqualTo(3)
+
+    // Only reference inequality is asserted; independence from the source is not checked
+    assertThat(copiedArray).isNotSameAs(sparkArray)
+  }
+
+  test("replace: reuses wrapper instance") {
+    val elementType = DataTypes.INT
+
+    val array1 = new GenericArray(Array[Object](Integer.valueOf(1), 
Integer.valueOf(2)))
+    val array2 =
+      new GenericArray(Array[Object](Integer.valueOf(10), Integer.valueOf(20), 
Integer.valueOf(30)))
+
+    val sparkArray = new FlussAsSparkArray(elementType)
+    sparkArray.replace(array1)
+    assertThat(sparkArray.numElements()).isEqualTo(2)
+    assertThat(sparkArray.getInt(0)).isEqualTo(1)
+
+    sparkArray.replace(array2)
+    assertThat(sparkArray.numElements()).isEqualTo(3)
+    assertThat(sparkArray.getInt(0)).isEqualTo(10)
+    assertThat(sparkArray.getInt(2)).isEqualTo(30)
+  }
+
+  test("array with all nulls") {
+    val elementType = DataTypes.INT
+    val flussArray = new GenericArray(Array[Object](null, null, null))
+    val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+    assertThat(sparkArray.numElements()).isEqualTo(3)
+    assertThat(sparkArray.isNullAt(0)).isTrue()
+    assertThat(sparkArray.isNullAt(1)).isTrue()
+    assertThat(sparkArray.isNullAt(2)).isTrue()
+  }
+
+  test("large array: 1000 elements") {
+    val elementType = DataTypes.INT
+    val elements = (0 until 1000).map(i => Integer.valueOf(i)).toArray[Object]
+    val flussArray = new GenericArray(elements)
+    val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+    assertThat(sparkArray.numElements()).isEqualTo(1000)
+    assertThat(sparkArray.getInt(0)).isEqualTo(0)
+    assertThat(sparkArray.getInt(500)).isEqualTo(500)
+    assertThat(sparkArray.getInt(999)).isEqualTo(999)
+  }
+
+  test("mixed nulls and values in array") {
+    val elementType = DataTypes.STRING
+    val elements = Array[Object](
+      BinaryString.fromString("first"),
+      null,
+      BinaryString.fromString("third"),
+      null,
+      BinaryString.fromString("fifth")
+    )
+    val flussArray = new GenericArray(elements)
+    val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+    assertThat(sparkArray.numElements()).isEqualTo(5)
+    assertThat(sparkArray.getUTF8String(0).toString).isEqualTo("first")
+    assertThat(sparkArray.isNullAt(1)).isTrue()
+    assertThat(sparkArray.getUTF8String(2).toString).isEqualTo("third")
+    assertThat(sparkArray.isNullAt(3)).isTrue()
+    assertThat(sparkArray.getUTF8String(4).toString).isEqualTo("fifth")
+  }
+}
diff --git 
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkRowTest.scala
 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkRowTest.scala
new file mode 100644
index 000000000..03ccad4ef
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkRowTest.scala
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.row
+
+import org.apache.fluss.row.{BinaryString, Decimal => FlussDecimal, 
GenericArray, GenericRow, TimestampLtz, TimestampNtz}
+import org.apache.fluss.types.{ArrayType, BinaryType, DataTypes, 
LocalZonedTimestampType, RowType, TimestampType}
+
+import org.assertj.core.api.Assertions.assertThat
+import org.scalatest.funsuite.AnyFunSuite
+
+class FlussAsSparkRowTest extends AnyFunSuite {
+
+  test("basic row operations: numFields and fieldCount") {
+    // A three-column row must report a size of 3 through numFields as well as fieldCount.
+    val schema =
+      RowType.builder().field("id", DataTypes.INT).field("name", DataTypes.STRING)
+        .field("age", DataTypes.INT).build()
+
+    val cells = Seq[Object](Int.box(1), BinaryString.fromString("Alice"), Int.box(30))
+    val backing = new GenericRow(cells.size)
+    cells.zipWithIndex.foreach { case (cell, pos) => backing.setField(pos, cell) }
+
+    val adapted = new FlussAsSparkRow(schema).replace(backing)
+
+    assertThat(adapted.numFields).isEqualTo(3)
+    assertThat(adapted.fieldCount).isEqualTo(3)
+  }
+
+  test("isNullAt: check null fields") {
+    // Only the middle column is null; isNullAt must flag exactly that position.
+    val schema =
+      RowType.builder().field("col1", DataTypes.INT).field("col2", DataTypes.STRING)
+        .field("col3", DataTypes.INT).build()
+
+    val cells = Seq[Object](Int.box(1), null, Int.box(3))
+    val backing = new GenericRow(cells.size)
+    cells.zipWithIndex.foreach { case (cell, pos) => backing.setField(pos, cell) }
+
+    val adapted = new FlussAsSparkRow(schema).replace(backing)
+
+    assertThat(adapted.isNullAt(0)).isFalse()
+    assertThat(adapted.isNullAt(1)).isTrue()
+    assertThat(adapted.isNullAt(2)).isFalse()
+  }
+
+  test("getBoolean: read boolean values") {
+    // One true and one false BOOLEAN column, read back through getBoolean.
+    val schema =
+      RowType.builder().field("flag1", DataTypes.BOOLEAN).field("flag2", DataTypes.BOOLEAN).build()
+
+    val backing = new GenericRow(2)
+    backing.setField(0, Boolean.box(true))
+    backing.setField(1, Boolean.box(false))
+
+    val adapted = new FlussAsSparkRow(schema).replace(backing)
+
+    assertThat(adapted.getBoolean(0)).isTrue()
+    assertThat(adapted.getBoolean(1)).isFalse()
+  }
+
+  test("getByte: read byte values") {
+    // TINYINT columns: a positive and a negative byte must be returned unchanged by getByte.
+    val schema =
+      RowType.builder().field("b1", DataTypes.TINYINT).field("b2", DataTypes.TINYINT).build()
+
+    val backing = new GenericRow(2)
+    backing.setField(0, Byte.box(10.toByte))
+    backing.setField(1, Byte.box((-5).toByte))
+
+    val adapted = new FlussAsSparkRow(schema).replace(backing)
+
+    assertThat(adapted.getByte(0)).isEqualTo(10.toByte)
+    assertThat(adapted.getByte(1)).isEqualTo((-5).toByte)
+  }
+
+  test("getShort: read short values") {
+    // SMALLINT columns: a positive and a negative short must be returned unchanged by getShort.
+    val schema =
+      RowType.builder().field("s1", DataTypes.SMALLINT).field("s2", DataTypes.SMALLINT).build()
+
+    val backing = new GenericRow(2)
+    backing.setField(0, Short.box(100.toShort))
+    backing.setField(1, Short.box((-200).toShort))
+
+    val adapted = new FlussAsSparkRow(schema).replace(backing)
+
+    assertThat(adapted.getShort(0)).isEqualTo(100.toShort)
+    assertThat(adapted.getShort(1)).isEqualTo((-200).toShort)
+  }
+
+  test("getInt: read integer values") {
+    // INT columns: a positive and a negative value must be returned unchanged by getInt.
+    val schema = RowType.builder().field("num1", DataTypes.INT).field("num2", DataTypes.INT).build()
+
+    val backing = new GenericRow(2)
+    backing.setField(0, Int.box(42))
+    backing.setField(1, Int.box(-999))
+
+    val adapted = new FlussAsSparkRow(schema).replace(backing)
+
+    assertThat(adapted.getInt(0)).isEqualTo(42)
+    assertThat(adapted.getInt(1)).isEqualTo(-999)
+  }
+
+  test("getLong: read BIGINT values") {
+    // BIGINT columns: a positive and a negative value must be returned unchanged by getLong.
+    val schema =
+      RowType.builder().field("big1", DataTypes.BIGINT).field("big2", DataTypes.BIGINT).build()
+
+    val backing = new GenericRow(2)
+    backing.setField(0, Long.box(123456789L))
+    backing.setField(1, Long.box(-987654321L))
+
+    val adapted = new FlussAsSparkRow(schema).replace(backing)
+
+    assertThat(adapted.getLong(0)).isEqualTo(123456789L)
+    assertThat(adapted.getLong(1)).isEqualTo(-987654321L)
+  }
+
+  test("getLong: read TIMESTAMP_WITHOUT_TIME_ZONE") {
+    // A TimestampNtz built from milliseconds must come back from getLong in microseconds.
+    val schema = RowType.builder().field("ts", new TimestampType(3)).build()
+
+    val backing = new GenericRow(1)
+    backing.setField(0, TimestampNtz.fromMillis(1234567890L))
+
+    val adapted = new FlussAsSparkRow(schema).replace(backing)
+
+    // 1234567890 ms == 1234567890000 us (Spark's timestamp unit)
+    assertThat(adapted.getLong(0)).isEqualTo(1234567890000L)
+  }
+
+  test("getLong: read TIMESTAMP_WITH_LOCAL_TIME_ZONE") {
+    // A TimestampLtz built from epoch milliseconds must come back from getLong in microseconds.
+    val schema = RowType.builder().field("ts_ltz", new LocalZonedTimestampType(3)).build()
+
+    val backing = new GenericRow(1)
+    backing.setField(0, TimestampLtz.fromEpochMillis(9876543210L))
+
+    val adapted = new FlussAsSparkRow(schema).replace(backing)
+
+    // 9876543210 ms == 9876543210000 us (Spark's timestamp unit)
+    assertThat(adapted.getLong(0)).isEqualTo(9876543210000L)
+  }
+
+  test("getFloat: read float values") {
+    // FLOAT columns: a positive and a negative value must be returned unchanged by getFloat.
+    val schema = RowType.builder().field("f1", DataTypes.FLOAT).field("f2", DataTypes.FLOAT).build()
+
+    val backing = new GenericRow(2)
+    backing.setField(0, Float.box(3.14f))
+    backing.setField(1, Float.box(-2.5f))
+
+    val adapted = new FlussAsSparkRow(schema).replace(backing)
+
+    assertThat(adapted.getFloat(0)).isEqualTo(3.14f)
+    assertThat(adapted.getFloat(1)).isEqualTo(-2.5f)
+  }
+
+  test("getDouble: read double values") {
+    // DOUBLE columns: a positive and a negative value must be returned unchanged by getDouble.
+    val schema =
+      RowType.builder().field("d1", DataTypes.DOUBLE).field("d2", DataTypes.DOUBLE).build()
+
+    val backing = new GenericRow(2)
+    backing.setField(0, Double.box(2.718281828))
+    backing.setField(1, Double.box(-1.414))
+
+    val adapted = new FlussAsSparkRow(schema).replace(backing)
+
+    assertThat(adapted.getDouble(0)).isEqualTo(2.718281828)
+    assertThat(adapted.getDouble(1)).isEqualTo(-1.414)
+  }
+
+  test("getDecimal: read decimal values") {
+    // A DECIMAL(10, 2) column must be readable through getDecimal using the same precision and
+    // scale that the column was declared with.
+    val rowType = RowType
+      .builder()
+      .field("price", DataTypes.DECIMAL(10, 2))
+      .build()
+
+    // 123.45 has two fractional digits, matching the declared scale of 2.
+    val decimal = FlussDecimal.fromBigDecimal(new java.math.BigDecimal("123.45"), 10, 2)
+    val flussRow = new GenericRow(1)
+    flussRow.setField(0, decimal)
+
+    val sparkRow = new FlussAsSparkRow(rowType).replace(flussRow)
+
+    // The value is compared after conversion to a double, as in the original assertion.
+    val sparkDecimal = sparkRow.getDecimal(0, 10, 2)
+    assertThat(sparkDecimal.toBigDecimal.doubleValue()).isEqualTo(123.45)
+  }
+
+  test("getUTF8String: read string values") {
+    // Two plain ASCII STRING columns, read back through getUTF8String.
+    val schema =
+      RowType.builder().field("name", DataTypes.STRING).field("desc", DataTypes.STRING).build()
+
+    val cells = Seq[Object](BinaryString.fromString("Hello"), BinaryString.fromString("World"))
+    val backing = new GenericRow(cells.size)
+    cells.zipWithIndex.foreach { case (cell, pos) => backing.setField(pos, cell) }
+
+    val adapted = new FlussAsSparkRow(schema).replace(backing)
+
+    assertThat(adapted.getUTF8String(0).toString).isEqualTo("Hello")
+    assertThat(adapted.getUTF8String(1).toString).isEqualTo("World")
+  }
+
+  test("getUTF8String: read UTF-8 characters") {
+    // Non-ASCII content (CJK text and emoji) must come back from getUTF8String unchanged.
+    val schema =
+      RowType.builder().field("chinese", DataTypes.STRING).field("emoji", DataTypes.STRING).build()
+
+    val cjkText = BinaryString.fromString("你好世界")
+    val emojiText = BinaryString.fromString("😊🎉")
+    val backing = new GenericRow(2)
+    backing.setField(0, cjkText)
+    backing.setField(1, emojiText)
+
+    val adapted = new FlussAsSparkRow(schema).replace(backing)
+
+    assertThat(adapted.getUTF8String(0).toString).isEqualTo("你好世界")
+    assertThat(adapted.getUTF8String(1).toString).isEqualTo("😊🎉")
+  }
+
+  test("getBinary: read binary values") {
+    // A five-byte payload stored in a BINARY(5) column must be returned by getBinary.
+    val schema = RowType.builder().field("data", new BinaryType(5)).build()
+
+    val payload = Array[Byte](1, 2, 3, 4, 5)
+    val backing = new GenericRow(1)
+    backing.setField(0, payload)
+
+    val adapted = new FlussAsSparkRow(schema).replace(backing)
+    val fetched = adapted.getBinary(0)
+
+    assertThat(fetched).isEqualTo(payload)
+  }
+
+  test("getStruct: read nested row") {
+    // An (id, address) row whose address column is itself a (city, zipcode) row.
+    val addressType =
+      RowType.builder().field("city", DataTypes.STRING).field("zipcode", DataTypes.INT).build()
+    val personType =
+      RowType.builder().field("id", DataTypes.INT).field("address", addressType).build()
+
+    val address = new GenericRow(2)
+    address.setField(0, BinaryString.fromString("Beijing"))
+    address.setField(1, Int.box(100000))
+
+    val person = new GenericRow(2)
+    person.setField(0, Int.box(1))
+    person.setField(1, address)
+
+    val adapted = new FlussAsSparkRow(personType).replace(person)
+
+    // Top-level column first, then the two fields of the nested struct.
+    assertThat(adapted.getInt(0)).isEqualTo(1)
+
+    val nested = adapted.getStruct(1, 2)
+    assertThat(nested.getUTF8String(0).toString).isEqualTo("Beijing")
+    assertThat(nested.getInt(1)).isEqualTo(100000)
+  }
+
+  test("getArray: read array field") {
+    // A single ARRAY<INT> column holding 1..5; check its length and both end elements.
+    val schema = RowType.builder().field("numbers", new ArrayType(DataTypes.INT)).build()
+
+    val numbers = Array[Object](Int.box(1), Int.box(2), Int.box(3), Int.box(4), Int.box(5))
+    val backing = new GenericRow(1)
+    backing.setField(0, new GenericArray(numbers))
+
+    val adapted = new FlussAsSparkRow(schema).replace(backing)
+    val fetched = adapted.getArray(0)
+
+    assertThat(fetched.numElements()).isEqualTo(5)
+    assertThat(fetched.getInt(0)).isEqualTo(1)
+    assertThat(fetched.getInt(4)).isEqualTo(5)
+  }
+
+  test("getMap: unsupported operation") {
+    // getMap must be rejected with UnsupportedOperationException.
+    val schema = RowType.builder().field("dummy", DataTypes.INT).build()
+
+    val backing = new GenericRow(1)
+    backing.setField(0, Int.box(1))
+
+    val adapted = new FlussAsSparkRow(schema).replace(backing)
+
+    assertThrows[UnsupportedOperationException](adapted.getMap(0))
+  }
+
+  test("getInterval: unsupported operation") {
+    // getInterval must be rejected with UnsupportedOperationException.
+    val schema = RowType.builder().field("dummy", DataTypes.INT).build()
+
+    val backing = new GenericRow(1)
+    backing.setField(0, Int.box(1))
+
+    val adapted = new FlussAsSparkRow(schema).replace(backing)
+
+    assertThrows[UnsupportedOperationException](adapted.getInterval(0))
+  }
+
+  test("setNullAt: unsupported operation") {
+    // The wrapper does not accept writes: setNullAt must throw UnsupportedOperationException.
+    val schema = RowType.builder().field("col", DataTypes.INT).build()
+
+    val backing = new GenericRow(1)
+    backing.setField(0, Int.box(1))
+
+    val adapted = new FlussAsSparkRow(schema).replace(backing)
+
+    assertThrows[UnsupportedOperationException](adapted.setNullAt(0))
+  }
+
+  test("update: unsupported operation") {
+    // The wrapper does not accept writes: update must throw UnsupportedOperationException.
+    val schema = RowType.builder().field("col", DataTypes.INT).build()
+
+    val backing = new GenericRow(1)
+    backing.setField(0, Int.box(1))
+
+    val adapted = new FlussAsSparkRow(schema).replace(backing)
+
+    assertThrows[UnsupportedOperationException](adapted.update(0, Integer.valueOf(2)))
+  }
+
+  test("copy: creates deep copy") {
+    // copy() must hand back a distinct object that exposes the same field values.
+    val schema =
+      RowType.builder().field("id", DataTypes.INT).field("name", DataTypes.STRING).build()
+
+    val backing = new GenericRow(2)
+    backing.setField(0, Int.box(1))
+    backing.setField(1, BinaryString.fromString("Original"))
+
+    val adapted = new FlussAsSparkRow(schema).replace(backing)
+    val duplicate = adapted.copy()
+
+    // Same contents ...
+    assertThat(duplicate.getInt(0)).isEqualTo(1)
+    assertThat(duplicate.getUTF8String(1).toString).isEqualTo("Original")
+
+    // ... but not the very same object.
+    assertThat(duplicate).isNotSameAs(adapted)
+  }
+
+  test("replace: reuses wrapper instance") {
+    // One wrapper is pointed at two different rows in turn and must reflect the latest one.
+    def singleIntRow(value: Int): GenericRow = {
+      val row = new GenericRow(1)
+      row.setField(0, Integer.valueOf(value))
+      row
+    }
+
+    val schema = RowType.builder().field("val", DataTypes.INT).build()
+    val firstRow = singleIntRow(1)
+    val secondRow = singleIntRow(2)
+
+    val adapted = new FlussAsSparkRow(schema)
+
+    adapted.replace(firstRow)
+    assertThat(adapted.getInt(0)).isEqualTo(1)
+
+    adapted.replace(secondRow)
+    assertThat(adapted.getInt(0)).isEqualTo(2)
+  }
+
+  test("complex row: all data types") {
+    // One row mixing nine scalar column types (boolean, the integer family, float, double,
+    // string and decimal); every column is read back through its type-specific getter.
+    val rowType = RowType
+      .builder()
+      .field("bool_col", DataTypes.BOOLEAN)
+      .field("byte_col", DataTypes.TINYINT)
+      .field("short_col", DataTypes.SMALLINT)
+      .field("int_col", DataTypes.INT)
+      .field("long_col", DataTypes.BIGINT)
+      .field("float_col", DataTypes.FLOAT)
+      .field("double_col", DataTypes.DOUBLE)
+      .field("string_col", DataTypes.STRING)
+      .field("decimal_col", DataTypes.DECIMAL(10, 2))
+      .build()
+
+    // Field positions follow the declaration order of the columns above.
+    val flussRow = new GenericRow(9)
+    flussRow.setField(0, java.lang.Boolean.TRUE)
+    flussRow.setField(1, java.lang.Byte.valueOf(10.toByte))
+    flussRow.setField(2, java.lang.Short.valueOf(100.toShort))
+    flussRow.setField(3, Integer.valueOf(1000))
+    flussRow.setField(4, java.lang.Long.valueOf(10000L))
+    flussRow.setField(5, java.lang.Float.valueOf(3.14f))
+    flussRow.setField(6, java.lang.Double.valueOf(2.718))
+    flussRow.setField(7, BinaryString.fromString("test"))
+    flussRow.setField(8, FlussDecimal.fromBigDecimal(new java.math.BigDecimal("99.99"), 10, 2))
+
+    val sparkRow = new FlussAsSparkRow(rowType).replace(flussRow)
+
+    assertThat(sparkRow.getBoolean(0)).isTrue()
+    assertThat(sparkRow.getByte(1)).isEqualTo(10.toByte)
+    assertThat(sparkRow.getShort(2)).isEqualTo(100.toShort)
+    assertThat(sparkRow.getInt(3)).isEqualTo(1000)
+    assertThat(sparkRow.getLong(4)).isEqualTo(10000L)
+    assertThat(sparkRow.getFloat(5)).isEqualTo(3.14f)
+    assertThat(sparkRow.getDouble(6)).isEqualTo(2.718)
+    assertThat(sparkRow.getUTF8String(7).toString).isEqualTo("test")
+    // The decimal is compared after conversion to a double, as in the original assertion.
+    assertThat(sparkRow.getDecimal(8, 10, 2).toBigDecimal.doubleValue()).isEqualTo(99.99)
+  }
+}

Reply via email to