This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 29a071bf4 [spark] support batch read from fluss cluster (#2377)
29a071bf4 is described below
commit 29a071bf450434a9b5158732a8018309e0623e33
Author: Yann Byron <[email protected]>
AuthorDate: Thu Jan 22 00:38:48 2026 +0800
[spark] support batch read from fluss cluster (#2377)
---
.../java/org/apache/fluss/row/TimestampNtz.java | 6 +
.../org/apache/fluss/utils/InternalRowUtils.java | 102 +++++
.../org/apache/fluss/spark/SparkFlussConf.scala | 30 ++
.../scala/org/apache/fluss/spark/SparkTable.scala | 24 +-
.../fluss/spark/catalog/AbstractSparkTable.scala | 6 +-
.../spark/read/FlussAppendPartitionReader.scala | 92 ++++
.../org/apache/fluss/spark/read/FlussBatch.scala | 177 ++++++++
.../fluss/spark/read/FlussInputPartition.scala | 59 +++
.../fluss/spark/read/FlussPartitionReader.scala | 71 +++
.../spark/read/FlussPartitionReaderFactory.scala | 63 +++
.../org/apache/fluss/spark/read/FlussScan.scala | 65 +++
.../apache/fluss/spark/read/FlussScanBuilder.scala | 61 +++
.../spark/read/FlussUpsertPartitionReader.scala | 90 ++++
.../org/apache/fluss/spark/row/DataConverter.scala | 86 ++++
.../apache/fluss/spark/row/FlussAsSparkArray.scala | 121 ++++++
.../apache/fluss/spark/row/FlussAsSparkRow.scala | 114 +++++
.../apache/fluss/spark/FlussSparkTestBase.scala | 3 +
.../org/apache/fluss/spark/SparkReadTest.scala | 342 +++++++++++++++
.../apache/fluss/spark/row/DataConverterTest.scala | 297 +++++++++++++
.../fluss/spark/row/FlussAsSparkArrayTest.scala | 428 ++++++++++++++++++
.../fluss/spark/row/FlussAsSparkRowTest.scala | 484 +++++++++++++++++++++
21 files changed, 2718 insertions(+), 3 deletions(-)
diff --git a/fluss-common/src/main/java/org/apache/fluss/row/TimestampNtz.java b/fluss-common/src/main/java/org/apache/fluss/row/TimestampNtz.java
index 56febfd89..c58141da9 100644
--- a/fluss-common/src/main/java/org/apache/fluss/row/TimestampNtz.java
+++ b/fluss-common/src/main/java/org/apache/fluss/row/TimestampNtz.java
@@ -129,6 +129,12 @@ public class TimestampNtz implements Comparable<TimestampNtz>, Serializable {
return new TimestampNtz(millisecond, nanoOfMillisecond);
}
+ /** Converts this {@link TimestampNtz} to the number of microseconds since epoch. */
+ public long toEpochMicros() {
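+ // Scale the millisecond part to micros (overflow-checked), then add the whole micros
+ // from the nano-of-millisecond part; sub-microsecond precision is truncated.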
+ long micros = Math.multiplyExact(millisecond, MICROS_PER_MILLIS);
+ return micros + nanoOfMillisecond / NANOS_PER_MICROS;
+ }
+
+ /** Converts this {@link TimestampNtz} object to a {@link LocalDateTime}. */
public LocalDateTime toLocalDateTime() {
int date = (int) (millisecond / MILLIS_PER_DAY);
diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/InternalRowUtils.java b/fluss-common/src/main/java/org/apache/fluss/utils/InternalRowUtils.java
index 438e17e9f..bfa2a299b 100644
--- a/fluss-common/src/main/java/org/apache/fluss/utils/InternalRowUtils.java
+++ b/fluss-common/src/main/java/org/apache/fluss/utils/InternalRowUtils.java
@@ -22,15 +22,117 @@ package org.apache.fluss.utils;
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. */
+import org.apache.fluss.row.BinaryMap;
+import org.apache.fluss.row.BinaryRow;
import org.apache.fluss.row.BinaryString;
import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.GenericArray;
+import org.apache.fluss.row.GenericMap;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.row.InternalMap;
+import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.TimestampLtz;
import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.types.ArrayType;
+import org.apache.fluss.types.DataType;
import org.apache.fluss.types.DataTypeRoot;
+import org.apache.fluss.types.MapType;
+import org.apache.fluss.types.RowType;
+
+import java.util.HashMap;
+import java.util.Map;
/** Utility class for {@link org.apache.fluss.row.InternalRow} related operations. */
public class InternalRowUtils {
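+ /**
+ * Returns a deep copy of the given row: a {@link BinaryRow} is copied via its binary
+ * representation, while other implementations are rebuilt field by field as a {@link GenericRow}.
+ */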
+ public static InternalRow copyRow(InternalRow row, RowType rowType) {
+ if (row instanceof BinaryRow) {
+ return ((BinaryRow) row).copy();
+ } else {
+ InternalRow.FieldGetter[] fieldGetters = InternalRow.createFieldGetters(rowType);
+ GenericRow genericRow = new GenericRow(row.getFieldCount());
+ for (int i = 0; i < row.getFieldCount(); i++) {
+ genericRow.setField(
+ i, copyValue(fieldGetters[i].getFieldOrNull(row), rowType.getTypeAt(i)));
+ }
+ return genericRow;
+ }
+ }
+
+ public static InternalArray copyArray(InternalArray from, DataType eleType) {
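+ // Fast path: non-nullable primitive element types can be copied with the bulk toXxxArray helpers.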
+ if (!eleType.isNullable()) {
+ switch (eleType.getTypeRoot()) {
+ case BOOLEAN:
+ return new GenericArray(from.toBooleanArray());
+ case TINYINT:
+ return new GenericArray(from.toByteArray());
+ case SMALLINT:
+ return new GenericArray(from.toShortArray());
+ case INTEGER:
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ return new GenericArray(from.toIntArray());
+ case BIGINT:
+ return new GenericArray(from.toLongArray());
+ case FLOAT:
+ return new GenericArray(from.toFloatArray());
+ case DOUBLE:
+ return new GenericArray(from.toDoubleArray());
+ }
+ }
+
+ InternalArray.ElementGetter elementGetter = InternalArray.createElementGetter(eleType);
+ Object[] newArray = new Object[from.size()];
+ for (int i = 0; i < newArray.length; ++i) {
+ if (!from.isNullAt(i)) {
+ newArray[i] = copyValue(elementGetter.getElementOrNull(from, i), eleType);
+ } else {
+ newArray[i] = null;
+ }
+ }
+ return new GenericArray(newArray);
+ }
+
+ private static InternalMap copyMap(InternalMap map, DataType keyType, DataType valueType) {
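+ // A BinaryMap supports a cheap binary copy; other maps are rebuilt entry by entry
+ // with deep-copied keys and values.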
+ if (map instanceof BinaryMap) {
+ return ((BinaryMap) map).copy();
+ }
+ InternalArray.ElementGetter keyGetter = InternalArray.createElementGetter(keyType);
+ InternalArray.ElementGetter valueGetter = InternalArray.createElementGetter(valueType);
+ Map<Object, Object> newMap = new HashMap<>();
+ InternalArray keys = map.keyArray();
+ InternalArray values = map.valueArray();
+ for (int i = 0; i < keys.size(); i++) {
+ newMap.put(
+ copyValue(keyGetter.getElementOrNull(keys, i), keyType),
+ copyValue(valueGetter.getElementOrNull(values, i), valueType));
+ }
+ return new GenericMap(newMap);
+ }
+
+ private static Object copyValue(Object o, DataType type) {
+ if (o instanceof BinaryString) {
+ return ((BinaryString) o).copy();
+ } else if (o instanceof InternalRow) {
+ return copyRow((InternalRow) o, (RowType) type);
+ } else if (o instanceof InternalArray) {
+ return copyArray((InternalArray) o, ((ArrayType) type).getElementType());
+ } else if (o instanceof InternalMap) {
+ return copyMap(
+ (InternalMap) o,
+ ((MapType) type).getKeyType(),
+ ((MapType) type).getValueType());
+ } else if (o instanceof byte[]) {
+ byte[] copy = new byte[((byte[]) o).length];
+ System.arraycopy(((byte[]) o), 0, copy, 0, ((byte[]) o).length);
+ return copy;
+ } else if (o instanceof Decimal) {
+ return ((Decimal) o).copy();
+ }
+ return o;
+ }
+
/**
* Compares two objects based on their data type.
*
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala
new file mode 100644
index 000000000..5148e1e1c
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark
+
+import org.apache.spark.sql.internal.SQLConf.buildConf
+
+object SparkFlussConf {
+
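+ // A hypothetical way to enable this from SQL (assumes a session with the Fluss catalog configured):
+ //   spark.sql("SET spark.sql.fluss.readOptimized=true")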
+ val READ_OPTIMIZED = buildConf("spark.sql.fluss.readOptimized")
+ .internal()
+ .doc("If true, Spark will only read data from data lake snapshot or kv
snapshot, not execute merge them with log changes. This is a temporary
configuration that will be deprecated when read-optimized table(e.g.
`mytbl$ro`) is supported.")
+ .booleanConf
+ .createWithDefault(false)
+
+}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
index 93c51d1e0..028af2815 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
@@ -21,10 +21,15 @@ import org.apache.fluss.client.admin.Admin
import org.apache.fluss.config.{Configuration => FlussConfiguration}
import org.apache.fluss.metadata.{TableInfo, TablePath}
import org.apache.fluss.spark.catalog.{AbstractSparkTable, SupportsFlussPartitionManagement}
+import org.apache.fluss.spark.read.{FlussAppendScanBuilder, FlussUpsertScanBuilder}
import org.apache.fluss.spark.write.{FlussAppendWriteBuilder, FlussUpsertWriteBuilder}
-import org.apache.spark.sql.connector.catalog.SupportsWrite
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite}
+import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
class SparkTable(
tablePath: TablePath,
@@ -33,7 +38,9 @@ class SparkTable(
admin: Admin)
extends AbstractSparkTable(admin, tableInfo)
with SupportsFlussPartitionManagement
- with SupportsWrite {
+ with SupportsRead
+ with SupportsWrite
+ with SQLConfHelper {
override def newWriteBuilder(logicalWriteInfo: LogicalWriteInfo): WriteBuilder = {
if (tableInfo.getPrimaryKeys.isEmpty) {
@@ -42,4 +49,17 @@ class SparkTable(
new FlussUpsertWriteBuilder(tablePath, logicalWriteInfo.schema(), flussConfig)
}
}
+
+ override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
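+ // Log tables are scanned directly; primary key tables currently require the
+ // read-optimized mode (snapshot only, no merge with log changes).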
+ if (tableInfo.getPrimaryKeys.isEmpty) {
+ new FlussAppendScanBuilder(tablePath, tableInfo, options, flussConfig)
+ } else {
+ if (!conf.getConf(SparkFlussConf.READ_OPTIMIZED, false)) {
+ throw new UnsupportedOperationException(
+ "For now, only data in snapshot can be read, without merging them
with changes. " +
+ "If you can accept it, please set `spark.sql.fluss.readOptimized`
true, and execute query again.")
+ }
+ new FlussUpsertScanBuilder(tablePath, tableInfo, options, flussConfig)
+ }
+ }
}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
index a1f3e09d4..e64f42a51 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/catalog/AbstractSparkTable.scala
@@ -43,7 +43,11 @@ abstract class AbstractSparkTable(val admin: Admin, val tableInfo: TableInfo) ex
override def schema(): StructType = _schema
override def capabilities(): util.Set[TableCapability] = {
- Set(TableCapability.BATCH_WRITE, TableCapability.STREAMING_WRITE).asJava
+ Set(
+ TableCapability.BATCH_READ,
+ TableCapability.BATCH_WRITE,
+ TableCapability.STREAMING_WRITE
+ ).asJava
}
override def partitioning(): Array[Transform] = {
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala
new file mode 100644
index 000000000..075e0a275
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussAppendPartitionReader.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.client.table.scanner.ScanRecord
+import org.apache.fluss.client.table.scanner.log.ScanRecords
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.metadata.{TableBucket, TablePath}
+
+/** Partition reader that reads log data from a single Fluss table bucket. */
+class FlussAppendPartitionReader(
+ tablePath: TablePath,
+ projection: Array[Int],
+ flussPartition: FlussAppendInputPartition,
+ flussConfig: Configuration)
+ extends FlussPartitionReader(tablePath, flussConfig) {
+
+ private val tableBucket: TableBucket = flussPartition.tableBucket
+ private val partitionId = tableBucket.getPartitionId
+ private val bucketId = tableBucket.getBucket
+ private val logScanner = table.newScan().project(projection).createLogScanner()
+
+ // Iterator for current batch of records
+ private var currentRecords: java.util.Iterator[ScanRecord] = _
+
+ // initialize log scanner
+ initialize()
+
+ override def next(): Boolean = {
+ if (closed) {
+ return false
+ }
+
+ // If we have records in current batch, return next one
+ if (currentRecords != null && currentRecords.hasNext) {
+ val scanRecord = currentRecords.next()
+ currentRow = convertToSparkRow(scanRecord)
+ return true
+ }
+
+ // Poll for more records
+ val scanRecords = logScanner.poll(POLL_TIMEOUT)
+
+ if (scanRecords == null || scanRecords.isEmpty) {
+ return false
+ }
+
+ // Get records for our bucket
+ val bucketRecords = scanRecords.records(tableBucket)
+ if (bucketRecords.isEmpty) {
+ return false
+ }
+
+ currentRecords = bucketRecords.iterator()
+ if (currentRecords.hasNext) {
+ val scanRecord = currentRecords.next()
+ currentRow = convertToSparkRow(scanRecord)
+ true
+ } else {
+ false
+ }
+ }
+
+ override def close0(): Unit = {
+ if (logScanner != null) {
+ logScanner.close()
+ }
+ }
+
+ private def initialize(): Unit = {
+ if (partitionId != null) {
+ logScanner.subscribeFromBeginning(partitionId, bucketId)
+ } else {
+ logScanner.subscribeFromBeginning(bucketId)
+ }
+ }
+}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala
new file mode 100644
index 000000000..1bcffa6fd
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.client.{Connection, ConnectionFactory}
+import org.apache.fluss.client.admin.Admin
+import org.apache.fluss.client.initializer.{BucketOffsetsRetrieverImpl, OffsetsInitializer}
+import org.apache.fluss.client.metadata.KvSnapshots
+import org.apache.fluss.client.table.scanner.log.LogScanner
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.metadata.{PartitionInfo, TableBucket, TableInfo, TablePath}
+
+import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+abstract class FlussBatch(
+ tablePath: TablePath,
+ tableInfo: TableInfo,
+ readSchema: StructType,
+ flussConfig: Configuration)
+ extends Batch
+ with AutoCloseable {
+
+ lazy val conn: Connection = ConnectionFactory.createConnection(flussConfig)
+
+ lazy val admin: Admin = conn.getAdmin
+
+ lazy val partitionInfos: util.List[PartitionInfo] = admin.listPartitionInfos(tablePath).get()
+
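+ /** Maps each field of the (possibly pruned) read schema to its column index in the Fluss table schema. */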
+ protected def projection: Array[Int] = {
+ val columnNameToIndex = tableInfo.getSchema.getColumnNames.asScala.zipWithIndex.toMap
+ readSchema.fields.map {
+ field =>
+ columnNameToIndex.getOrElse(
+ field.name,
+ throw new IllegalArgumentException(s"Invalid field name: ${field.name}"))
+ }
+ }
+
+ override def close(): Unit = {
+ if (admin != null) {
+ admin.close()
+ }
+ if (conn != null) {
+ conn.close()
+ }
+ }
+}
+
+/** Batch for reading log table (append-only table). */
+class FlussAppendBatch(
+ tablePath: TablePath,
+ tableInfo: TableInfo,
+ readSchema: StructType,
+ options: CaseInsensitiveStringMap,
+ flussConfig: Configuration)
+ extends FlussBatch(tablePath, tableInfo, readSchema, flussConfig) {
+
+ override def planInputPartitions(): Array[InputPartition] = {
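+ // Plan one input partition per bucket; for partitioned tables, one per (partition, bucket) pair.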
+ def createPartitions(partitionId: Option[Long]): Array[InputPartition] = {
+ (0 until tableInfo.getNumBuckets).map {
+ bucketId =>
+ val tableBucket = partitionId match {
+ case Some(partitionId) =>
+ new TableBucket(tableInfo.getTableId, partitionId, bucketId)
+ case None =>
+ new TableBucket(tableInfo.getTableId, bucketId)
+ }
+ FlussAppendInputPartition(tableBucket).asInstanceOf[InputPartition]
+ }.toArray
+ }
+
+ if (tableInfo.isPartitioned) {
+ partitionInfos.asScala.flatMap {
+ partitionInfo => createPartitions(Some(partitionInfo.getPartitionId))
+ }.toArray
+ } else {
+ createPartitions(None)
+ }
+ }
+
+ override def createReaderFactory(): PartitionReaderFactory = {
+ new FlussAppendPartitionReaderFactory(tablePath, projection, options, flussConfig)
+ }
+
+}
+
+/** Batch for reading primary key table (upsert table). */
+class FlussUpsertBatch(
+ tablePath: TablePath,
+ tableInfo: TableInfo,
+ readSchema: StructType,
+ options: CaseInsensitiveStringMap,
+ flussConfig: Configuration)
+ extends FlussBatch(tablePath, tableInfo, readSchema, flussConfig) {
+
+ private val latestOffsetsInitializer = OffsetsInitializer.latest()
+ private val bucketOffsetsRetriever = new BucketOffsetsRetrieverImpl(admin, tablePath)
+
+ override def planInputPartitions(): Array[InputPartition] = {
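+ // For each bucket, pair its latest kv snapshot (if any) with the bucket's latest log
+ // offset as the stopping point; buckets without a snapshot start from the earliest log offset.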
+ def createPartitions(partitionName: String, kvSnapshots: KvSnapshots): Array[InputPartition] = {
+ val tableId = kvSnapshots.getTableId
+ val partitionId = kvSnapshots.getPartitionId
+ val bucketIds = kvSnapshots.getBucketIds
+ val bucketIdToLogOffset =
+ latestOffsetsInitializer.getBucketOffsets(partitionName, bucketIds, bucketOffsetsRetriever)
+ bucketIds.asScala
+ .map {
+ bucketId =>
+ val tableBucket = new TableBucket(tableId, partitionId, bucketId)
+ val snapshotIdOpt = kvSnapshots.getSnapshotId(bucketId)
+ val logStartingOffsetOpt = kvSnapshots.getLogOffset(bucketId)
+ val logEndingOffset = bucketIdToLogOffset.get(bucketId)
+
+ if (snapshotIdOpt.isPresent) {
+ assert(
+ logStartingOffsetOpt.isPresent,
+ "Log offset must be present when snapshot id is present")
+
+ // Create hybrid partition
+ FlussUpsertInputPartition(
+ tableBucket,
+ snapshotIdOpt.getAsLong,
+ logStartingOffsetOpt.getAsLong,
+ logEndingOffset
+ )
+ } else {
+ // No snapshot yet, only read log from beginning
+ FlussUpsertInputPartition(
+ tableBucket,
+ -1L,
+ LogScanner.EARLIEST_OFFSET,
+ logEndingOffset)
+ }
+ }
+ .map(_.asInstanceOf[InputPartition])
+ .toArray
+ }
+
+ if (tableInfo.isPartitioned) {
+ partitionInfos.asScala.flatMap {
+ partitionInfo =>
+ val partitionName = partitionInfo.getPartitionName
+ val kvSnapshots =
+ admin.getLatestKvSnapshots(tablePath, partitionName).get()
+ createPartitions(partitionName, kvSnapshots)
+ }.toArray
+ } else {
+ val kvSnapshots = admin.getLatestKvSnapshots(tablePath).get()
+ createPartitions(null, kvSnapshots)
+ }
+ }
+
+ override def createReaderFactory(): PartitionReaderFactory = {
+ new FlussUpsertPartitionReaderFactory(tablePath, projection, options, flussConfig)
+ }
+}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala
new file mode 100644
index 000000000..75bb056ce
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussInputPartition.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.metadata.TableBucket
+
+import org.apache.spark.sql.connector.read.InputPartition
+
+trait FlussInputPartition extends InputPartition {
+
+ override def preferredLocations(): Array[String] = {
+ // Could return tablet server locations for data locality
+ Array.empty[String]
+ }
+
+}
+
+/**
+ * Represents an input partition for reading data from a Fluss table bucket.
+ *
+ * @param tableBucket
+ * the table bucket to read from
+ */
+case class FlussAppendInputPartition(tableBucket: TableBucket) extends FlussInputPartition
+
+/**
+ * Represents an input partition for reading data from a primary key table bucket. This partition
+ * includes snapshot information for hybrid snapshot-log reading.
+ *
+ * @param tableBucket
+ * the table bucket to read from
+ * @param snapshotId
+ * the snapshot ID to read from, -1 if no snapshot
+ * @param logStartingOffset
+ * the log offset where incremental reading should start
+ * @param logStoppingOffset
+ * the log offset where incremental reading should end
+ */
+case class FlussUpsertInputPartition(
+ tableBucket: TableBucket,
+ snapshotId: Long,
+ logStartingOffset: Long,
+ logStoppingOffset: Long)
+ extends FlussInputPartition
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReader.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReader.scala
new file mode 100644
index 000000000..10203898f
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReader.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.client.{Connection, ConnectionFactory}
+import org.apache.fluss.client.table.Table
+import org.apache.fluss.client.table.scanner.ScanRecord
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.metadata.{TableInfo, TablePath}
+import org.apache.fluss.row.{InternalRow => FlussInternalRow}
+import org.apache.fluss.spark.row.DataConverter
+import org.apache.fluss.types.RowType
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.read.PartitionReader
+
+import java.time.Duration
+
+abstract class FlussPartitionReader(tablePath: TablePath, flussConfig: Configuration)
+ extends PartitionReader[InternalRow] {
+
+ protected val POLL_TIMEOUT: Duration = Duration.ofMillis(100)
+ protected lazy val conn: Connection = ConnectionFactory.createConnection(flussConfig)
+ protected lazy val table: Table = conn.getTable(tablePath)
+ protected lazy val tableInfo: TableInfo = table.getTableInfo
+ protected val rowType: RowType = tableInfo.getRowType
+
+ protected var currentRow: InternalRow = _
+ protected var closed = false
+
+ override def get(): InternalRow = currentRow
+
+ def close0(): Unit
+
+ override def close(): Unit = {
+ if (!closed) {
+ closed = true
+ close0()
+
+ if (table != null) {
+ table.close()
+ }
+ if (conn != null) {
+ conn.close()
+ }
+ }
+ }
+
+ protected def convertToSparkRow(scanRecord: ScanRecord): InternalRow = {
+ convertToSparkRow(scanRecord.getRow)
+ }
+
+ protected def convertToSparkRow(flussRow: FlussInternalRow): InternalRow = {
+ DataConverter.toSparkInternalRow(flussRow, rowType)
+ }
+}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReaderFactory.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReaderFactory.scala
new file mode 100644
index 000000000..13d374d38
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReaderFactory.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.metadata.TablePath
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+/** Factory for creating partition readers to read data from Fluss. */
+class FlussAppendPartitionReaderFactory(
+ tablePath: TablePath,
+ projection: Array[Int],
+ options: CaseInsensitiveStringMap,
+ flussConfig: Configuration)
+ extends PartitionReaderFactory {
+
+ override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
+ val flussPartition = partition.asInstanceOf[FlussAppendInputPartition]
+ new FlussAppendPartitionReader(
+ tablePath,
+ projection,
+ flussPartition,
+ flussConfig
+ )
+ }
+}
+
+/** Factory for creating partition readers to read primary key table data from Fluss. */
+class FlussUpsertPartitionReaderFactory(
+ tablePath: TablePath,
+ projection: Array[Int],
+ options: CaseInsensitiveStringMap,
+ flussConfig: Configuration)
+ extends PartitionReaderFactory {
+
+ override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
+ val upsertPartition = partition.asInstanceOf[FlussUpsertInputPartition]
+ new FlussUpsertPartitionReader(
+ tablePath,
+ projection,
+ upsertPartition,
+ flussConfig
+ )
+ }
+}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala
new file mode 100644
index 000000000..77ad081ff
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScan.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.metadata.{TableInfo, TablePath}
+import org.apache.fluss.spark.SparkConversions
+
+import org.apache.spark.sql.connector.read.{Batch, Scan}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+/** An interface that extends Spark's [[Scan]]. */
+trait FlussScan extends Scan {
+ def tableInfo: TableInfo
+
+ def requiredSchema: Option[StructType]
+
+ override def readSchema(): StructType = {
+ requiredSchema.getOrElse(SparkConversions.toSparkDataType(tableInfo.getRowType))
+ }
+}
+
+/** Fluss Append Scan. */
+case class FlussAppendScan(
+ tablePath: TablePath,
+ tableInfo: TableInfo,
+ requiredSchema: Option[StructType],
+ options: CaseInsensitiveStringMap,
+ flussConfig: Configuration)
+ extends FlussScan {
+
+ override def toBatch: Batch = {
+ new FlussAppendBatch(tablePath, tableInfo, readSchema, options, flussConfig)
+ }
+}
+
+/** Fluss Upsert Scan. */
+case class FlussUpsertScan(
+ tablePath: TablePath,
+ tableInfo: TableInfo,
+ requiredSchema: Option[StructType],
+ options: CaseInsensitiveStringMap,
+ flussConfig: Configuration)
+ extends FlussScan {
+
+ override def toBatch: Batch = {
+ new FlussUpsertBatch(tablePath, tableInfo, readSchema, options, flussConfig)
+ }
+}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala
new file mode 100644
index 000000000..cd4d17ec0
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussScanBuilder.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.config.{Configuration => FlussConfiguration}
+import org.apache.fluss.metadata.{TableInfo, TablePath}
+
+import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownRequiredColumns}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+/** An interface that extends Spark's [[ScanBuilder]]. */
+trait FlussScanBuilder extends ScanBuilder with SupportsPushDownRequiredColumns {
+
+ protected var requiredSchema: Option[StructType] = None
+
+ override def pruneColumns(requiredSchema: StructType): Unit = {
+ this.requiredSchema = Some(requiredSchema)
+ }
+}
+
+/** Fluss Append Scan Builder. */
+class FlussAppendScanBuilder(
+ tablePath: TablePath,
+ tableInfo: TableInfo,
+ options: CaseInsensitiveStringMap,
+ flussConfig: FlussConfiguration)
+ extends FlussScanBuilder {
+
+ override def build(): Scan = {
+ FlussAppendScan(tablePath, tableInfo, requiredSchema, options, flussConfig)
+ }
+}
+
+/** Fluss Upsert Scan Builder. */
+class FlussUpsertScanBuilder(
+ tablePath: TablePath,
+ tableInfo: TableInfo,
+ options: CaseInsensitiveStringMap,
+ flussConfig: FlussConfiguration)
+ extends FlussScanBuilder {
+
+ override def build(): Scan = {
+ FlussUpsertScan(tablePath, tableInfo, requiredSchema, options, flussConfig)
+ }
+}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala
new file mode 100644
index 000000000..52cd46652
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.client.table.scanner.batch.BatchScanner
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.metadata.{TableBucket, TablePath}
+
+import java.util
+
+/**
+ * Partition reader that reads primary key table data.
+ *
+ * For now, only snapshot data can be read, without merging it with log changes.
+ */
+class FlussUpsertPartitionReader(
+ tablePath: TablePath,
+ projection: Array[Int],
+ flussPartition: FlussUpsertInputPartition,
+ flussConfig: Configuration)
+ extends FlussPartitionReader(tablePath, flussConfig) {
+
+ private val tableBucket: TableBucket = flussPartition.tableBucket
+ private val snapshotId: Long = flussPartition.snapshotId
+
+ // KV Snapshot Reader (if snapshot exists)
+ private var snapshotScanner: BatchScanner = _
+ private var snapshotIterator: util.Iterator[org.apache.fluss.row.InternalRow] = _
+
+ // initialize scanners
+ initialize()
+
+ override def next(): Boolean = {
+ if (closed) {
+ return false
+ }
+
+ if (snapshotIterator == null || !snapshotIterator.hasNext) {
+ // Try to get next batch from snapshot scanner
+ val batch = snapshotScanner.pollBatch(POLL_TIMEOUT)
+ if (batch == null) {
+ // No more data fetched.
+ false
+ } else {
+ snapshotIterator = batch
+ if (snapshotIterator.hasNext) {
+ currentRow = convertToSparkRow(snapshotIterator.next())
+ true
+ } else {
+ // Poll a new batch
+ next()
+ }
+ }
+ } else {
+ // Get data from current snapshot batch
+ currentRow = convertToSparkRow(snapshotIterator.next())
+ true
+ }
+ }
+
+ private def initialize(): Unit = {
+ // Initialize Scanners
+ if (snapshotId >= 0) {
+ // Create batch scanner
+ snapshotScanner =
+ table.newScan().project(projection).createBatchScanner(tableBucket, snapshotId)
+ }
+ }
+
+ override def close0(): Unit = {
+ if (snapshotScanner != null) {
+ snapshotScanner.close()
+ }
+ }
+}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/DataConverter.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/DataConverter.scala
new file mode 100644
index 000000000..cb9b30900
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/DataConverter.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.row
+
+import org.apache.fluss.row.{BinaryString => FlussBinaryString, Decimal => FlussDecimal, InternalArray => FlussInternalArray, InternalMap => FlussInternalMap, InternalRow => FlussInternalRow, TimestampLtz => FlussTimestampLtz, TimestampNtz => FlussTimestampNtz}
+import org.apache.fluss.types.{ArrayType => FlussArrayType, DataType => FlussDataType, MapType => FlussMapType, RowType}
+import org.apache.fluss.types.DataTypeRoot._
+import org.apache.fluss.utils.InternalRowUtils
+
+import org.apache.spark.sql.catalyst.{InternalRow => SparkInternalRow}
+import org.apache.spark.sql.catalyst.util.{ArrayData => SparkArrayData, MapData => SparkMapData}
+import org.apache.spark.sql.types.{Decimal => SparkDecimal}
+import org.apache.spark.unsafe.types.UTF8String
+
+object DataConverter {
+
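+ /** Converts a single Fluss value to Spark's internal representation, dispatching on the Fluss type root. */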
+ def toSparkObject(o: Object, dataType: FlussDataType): Any = {
+ if (o == null) {
+ return null
+ }
+
+ dataType.getTypeRoot match {
+ case TIMESTAMP_WITHOUT_TIME_ZONE =>
+ toSparkTimestamp(o.asInstanceOf[FlussTimestampNtz])
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
+ toSparkTimestamp(o.asInstanceOf[FlussTimestampLtz])
+ case CHAR | STRING =>
+ toSparkUTF8String(o.asInstanceOf[FlussBinaryString])
+ case DECIMAL =>
+ toSparkDecimal(o.asInstanceOf[FlussDecimal])
+ case ARRAY =>
+ toSparkArray(o.asInstanceOf[FlussInternalArray], dataType.asInstanceOf[FlussArrayType])
+ case MAP =>
+ toSparkMap(o.asInstanceOf[FlussInternalMap], dataType.asInstanceOf[FlussMapType])
+ case ROW =>
+ toSparkInternalRow(o.asInstanceOf[FlussInternalRow], dataType.asInstanceOf[RowType])
+ case _ => o
+ }
+ }
+
+ def toSparkTimestamp(timestamp: FlussTimestampNtz): Long = {
+ timestamp.toEpochMicros
+ }
+
+ def toSparkTimestamp(timestamp: FlussTimestampLtz): Long = {
+ timestamp.toEpochMicros
+ }
+
+ def toSparkDecimal(flussDecimal: FlussDecimal): SparkDecimal = {
+ SparkDecimal(flussDecimal.toBigDecimal)
+ }
+
+ def toSparkUTF8String(flussBinaryString: FlussBinaryString): UTF8String = {
+ UTF8String.fromBytes(flussBinaryString.toBytes)
+ }
+
+ def toSparkArray(flussArray: FlussInternalArray, arrayType: FlussArrayType): SparkArrayData = {
+ val elementType = arrayType.getElementType
+ new FlussAsSparkArray(elementType)
+ .replace(InternalRowUtils.copyArray(flussArray, elementType))
+ }
+
+ def toSparkMap(flussMap: FlussInternalMap, mapType: FlussMapType): SparkMapData = {
+ // TODO: support map type in fluss-spark
+ throw new UnsupportedOperationException()
+ }
+
+ def toSparkInternalRow(flussRow: FlussInternalRow, rowType: RowType): SparkInternalRow = {
+ new FlussAsSparkRow(rowType).replace(flussRow)
+ }
+}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/FlussAsSparkArray.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/FlussAsSparkArray.scala
new file mode 100644
index 000000000..c67ae1e7a
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/FlussAsSparkArray.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.row
+
+import org.apache.fluss.row.{InternalArray => FlussInternalArray}
+import org.apache.fluss.types.{ArrayType => FlussArrayType, BigIntType => FlussBigIntType, BinaryType => FlussBinaryType, DataType => FlussDataType, LocalZonedTimestampType => FlussTimestampLTZType, MapType => FlussMapType, RowType, TimestampType => FlussTimestampNTZType}
+import org.apache.fluss.utils.InternalRowUtils
+
+import org.apache.spark.sql.catalyst.{InternalRow => SparkInternalRow}
+import org.apache.spark.sql.catalyst.expressions.SpecializedGettersReader
+import org.apache.spark.sql.catalyst.util.{ArrayData => SparkArrayData, MapData => SparkMapData}
+import org.apache.spark.sql.types.{DataType => SparkDataType, Decimal => SparkDecimal}
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+
+class FlussAsSparkArray(elementType: FlussDataType) extends SparkArrayData {
+
+ var flussArray: FlussInternalArray = _
+
+ def replace(array: FlussInternalArray): SparkArrayData = {
+ this.flussArray = array
+ this
+ }
+
+ override def numElements(): Int = flussArray.size()
+
+ override def copy(): SparkArrayData = {
+ new FlussAsSparkArray(elementType).replace(InternalRowUtils.copyArray(flussArray, elementType))
+ }
+
+ override def array: Array[Any] = {
+ val elementGetter = FlussInternalArray.createElementGetter(elementType)
+ Array.range(0, numElements()).map {
+ ordinal =>
+ DataConverter.toSparkObject(
+ elementGetter.getElementOrNull(flussArray, ordinal),
+ elementType)
+ }
+ }
+
+ override def setNullAt(ordinal: Int): Unit = throw new UnsupportedOperationException()
+
+ override def update(ordinal: Int, value: Any): Unit = throw new UnsupportedOperationException()
+
+ override def isNullAt(ordinal: Int): Boolean = flussArray.isNullAt(ordinal)
+
+ override def getBoolean(ordinal: Int): Boolean = flussArray.getBoolean(ordinal)
+
+ override def getByte(ordinal: Int): Byte = flussArray.getByte(ordinal)
+
+ override def getShort(ordinal: Int): Short = flussArray.getShort(ordinal)
+
+ override def getInt(ordinal: Int): Int = flussArray.getInt(ordinal)
+
+ override def getLong(ordinal: Int): Long = {
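+ // Spark represents TIMESTAMP and TIMESTAMP_LTZ values as epoch micros and reads them
+ // through getLong, so both timestamp types map to toEpochMicros here.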
+ elementType match {
+ case _: FlussBigIntType =>
+ flussArray.getLong(ordinal)
+ case ntz: FlussTimestampNTZType =>
+ flussArray.getTimestampNtz(ordinal, ntz.getPrecision).toEpochMicros
+ case ltz: FlussTimestampLTZType =>
+ flussArray.getTimestampLtz(ordinal, ltz.getPrecision).toEpochMicros
+ case _ =>
+ throw new UnsupportedOperationException("Unsupported type: " +
elementType)
+ }
+ }
+
+ override def getFloat(ordinal: Int): Float = flussArray.getFloat(ordinal)
+
+ override def getDouble(ordinal: Int): Double = flussArray.getDouble(ordinal)
+
+ override def getDecimal(ordinal: Int, precision: Int, scale: Int): SparkDecimal = {
+ DataConverter.toSparkDecimal(flussArray.getDecimal(ordinal, precision, scale))
+ }
+
+ override def getUTF8String(ordinal: Int): UTF8String = {
+ DataConverter.toSparkUTF8String(flussArray.getString(ordinal))
+ }
+
+ override def getBinary(ordinal: Int): Array[Byte] = {
+ val binaryType = elementType.asInstanceOf[FlussBinaryType]
+ flussArray.getBinary(ordinal, binaryType.getLength)
+ }
+
+ override def getInterval(ordinal: Int): CalendarInterval =
+ throw new UnsupportedOperationException()
+
+ override def getStruct(ordinal: Int, numFields: Int): SparkInternalRow = {
+ DataConverter.toSparkInternalRow(
+ flussArray.getRow(ordinal, numFields),
+ elementType.asInstanceOf[RowType])
+ }
+
+ override def getArray(ordinal: Int): SparkArrayData = {
+ DataConverter.toSparkArray(
+ flussArray.getArray(ordinal),
+ elementType.asInstanceOf[FlussArrayType])
+ }
+
+ override def getMap(ordinal: Int): SparkMapData = {
+ DataConverter.toSparkMap(flussArray.getMap(ordinal), elementType.asInstanceOf[FlussMapType])
+ }
+
+ override def get(ordinal: Int, dataType: SparkDataType): AnyRef = {
+ SpecializedGettersReader.read(this, ordinal, dataType, true, true)
+ }
+}
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/FlussAsSparkRow.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/FlussAsSparkRow.scala
new file mode 100644
index 000000000..175900cd1
--- /dev/null
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/row/FlussAsSparkRow.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.row
+
+import org.apache.fluss.row.{InternalRow => FlussInternalRow}
+import org.apache.fluss.types.{ArrayType => FlussArrayType, BinaryType => FlussBinaryType, LocalZonedTimestampType, RowType, TimestampType}
+import org.apache.fluss.utils.InternalRowUtils
+
+import org.apache.spark.sql.catalyst.{InternalRow => SparkInternalRow}
+import org.apache.spark.sql.catalyst.expressions.SpecializedGettersReader
+import org.apache.spark.sql.catalyst.util.{ArrayData => SparkArrayData, MapData => SparkMapData}
+import org.apache.spark.sql.types.{DataType => SparkDataType, Decimal => SparkDecimal}
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+
+class FlussAsSparkRow(rowType: RowType) extends SparkInternalRow {
+
+ val fieldCount: Int = rowType.getFieldCount
+
+ var row: FlussInternalRow = _
+
+ def replace(row: FlussInternalRow): FlussAsSparkRow = {
+ this.row = row
+ this
+ }
+
+ override def numFields: Int = fieldCount
+
+ override def setNullAt(ordinal: Int): Unit = throw new UnsupportedOperationException()
+
+ override def update(ordinal: Int, value: Any): Unit = throw new UnsupportedOperationException()
+
+ override def copy(): SparkInternalRow = {
+ new FlussAsSparkRow(rowType).replace(InternalRowUtils.copyRow(row, rowType))
+ }
+
+ override def isNullAt(ordinal: Int): Boolean = row.isNullAt(ordinal)
+
+ override def getBoolean(ordinal: Int): Boolean = row.getBoolean(ordinal)
+
+ override def getByte(ordinal: Int): Byte = row.getByte(ordinal)
+
+ override def getShort(ordinal: Int): Short = row.getShort(ordinal)
+
+ override def getInt(ordinal: Int): Int = row.getInt(ordinal)
+
+ override def getLong(ordinal: Int): Long = {
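+ // Spark reads TIMESTAMP and TIMESTAMP_LTZ columns through getLong as epoch micros;
+ // plain BIGINT values fall through to row.getLong.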
+ val flussType = rowType.getTypeAt(ordinal)
+ flussType match {
+ case ltz: LocalZonedTimestampType =>
+ row.getTimestampLtz(ordinal, ltz.getPrecision).toEpochMicros
+ case ntz: TimestampType =>
+ row.getTimestampNtz(ordinal, ntz.getPrecision).toEpochMicros
+ case _ =>
+ row.getLong(ordinal)
+ }
+ }
+
+ override def getFloat(ordinal: Int): Float = row.getFloat(ordinal)
+
+ override def getDouble(ordinal: Int): Double = row.getDouble(ordinal)
+
+ override def getDecimal(ordinal: Int, precision: Int, scale: Int): SparkDecimal = {
+ val flussDecimal = row.getDecimal(ordinal, precision, scale)
+ DataConverter.toSparkDecimal(flussDecimal)
+ }
+
+ override def getUTF8String(ordinal: Int): UTF8String = {
+ DataConverter.toSparkUTF8String(row.getString(ordinal))
+ }
+
+ override def getBinary(ordinal: Int): Array[Byte] = {
+ val binaryType = rowType.getTypeAt(ordinal).asInstanceOf[FlussBinaryType]
+ row.getBinary(ordinal, binaryType.getLength)
+ }
+
+ override def getInterval(ordinal: Int): CalendarInterval =
+ throw new UnsupportedOperationException()
+
+ override def getStruct(ordinal: Int, numFields: Int): SparkInternalRow = {
+ val subRowType = rowType.getTypeAt(ordinal).asInstanceOf[RowType]
+ val flussSubRow = row.getRow(ordinal, numFields)
+ DataConverter.toSparkInternalRow(flussSubRow, subRowType)
+ }
+
+ override def getArray(ordinal: Int): SparkArrayData = {
+ val arrayType = rowType.getTypeAt(ordinal).asInstanceOf[FlussArrayType]
+ val flussArray = row.getArray(ordinal)
+ DataConverter.toSparkArray(flussArray, arrayType)
+ }
+
+ override def getMap(ordinal: Int): SparkMapData = {
+ // TODO: support map type in fluss-spark
+ throw new UnsupportedOperationException()
+ }
+
+ override def get(ordinal: Int, dataType: SparkDataType): AnyRef = {
+ SpecializedGettersReader.read(this, ordinal, dataType, true, true)
+ }
+}
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
index 2de158b7d..f9b98fe7c 100644
--- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
@@ -50,6 +50,9 @@ class FlussSparkTestBase extends QueryTest with SharedSparkSession {
.set(s"spark.sql.catalog.$DEFAULT_CATALOG",
classOf[SparkCatalog].getName)
.set(s"spark.sql.catalog.$DEFAULT_CATALOG.bootstrap.servers",
bootstrapServers)
.set("spark.sql.defaultCatalog", DEFAULT_CATALOG)
+ // Enable read optimized by default temporarily.
+ // TODO: remove this when https://github.com/apache/fluss/issues/2427 is done.
+ .set("spark.sql.fluss.readOptimized", "true")
}
override protected def beforeAll(): Unit = {
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkReadTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkReadTest.scala
new file mode 100644
index 000000000..834cec7ad
--- /dev/null
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkReadTest.scala
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark
+
+import org.apache.spark.sql.Row
+
+class SparkReadTest extends FlussSparkTestBase {
+
+ test("Spark Read: log table") {
+ withTable("t") {
+ sql(s"""
+ |CREATE TABLE $DEFAULT_DATABASE.t (orderId BIGINT, itemId BIGINT, amount INT, address STRING)
+ |""".stripMargin)
+
+ sql(s"""
+ |INSERT INTO $DEFAULT_DATABASE.t
+ |VALUES
+ |(600L, 21L, 601, "addr1"), (700L, 22L, 602, "addr2"),
+ |(800L, 23L, 603, "addr3"), (900L, 24L, 604, "addr4"),
+ |(1000L, 25L, 605, "addr5")
+ |""".stripMargin)
+
+ checkAnswer(
+ sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY orderId"),
+ Row(600L, 21L, 601, "addr1") ::
+ Row(700L, 22L, 602, "addr2") ::
+ Row(800L, 23L, 603, "addr3") ::
+ Row(900L, 24L, 604, "addr4") ::
+ Row(1000L, 25L, 605, "addr5") :: Nil
+ )
+
+ // projection
+ checkAnswer(
+ sql(s"SELECT address, itemId FROM $DEFAULT_DATABASE.t ORDER BY
orderId"),
+ Row("addr1", 21L) ::
+ Row("addr2", 22L) ::
+ Row("addr3", 23L) ::
+ Row("addr4", 24L) ::
+ Row("addr5", 25L) :: Nil
+ )
+
+ // filter
+ checkAnswer(
+ sql(s"SELECT * FROM $DEFAULT_DATABASE.t WHERE amount % 2 = 0 ORDER BY
orderId"),
+ Row(700L, 22L, 602, "addr2") ::
+ Row(900L, 24L, 604, "addr4") :: Nil
+ )
+
+ sql(s"""
+ |INSERT INTO $DEFAULT_DATABASE.t VALUES
+ |(700L, 220L, 602, "addr2"),
+ |(900L, 240L, 604, "addr4"),
+ |(1100L, 260L, 606, "addr6")
+ |""".stripMargin)
+ // projection + filter
+ checkAnswer(
+ sql(s"""
+ |SELECT orderId, itemId FROM $DEFAULT_DATABASE.t
+ |WHERE orderId >= 900 ORDER BY orderId, itemId""".stripMargin),
+ Row(900L, 24L) ::
+ Row(900L, 240L) ::
+ Row(1000L, 25L) ::
+ Row(1100L, 260L) :: Nil
+ )
+ }
+ }
+
+ test("Spark Read: primary key table") {
+ withTable("t") {
+ sql(s"""
+ |CREATE TABLE $DEFAULT_DATABASE.t (orderId BIGINT, itemId BIGINT, amount INT, address STRING)
+ |TBLPROPERTIES("primary.key" = "orderId")
+ |""".stripMargin)
+
+ sql(s"""
+ |INSERT INTO $DEFAULT_DATABASE.t VALUES
+ |(600L, 21L, 601, "addr1"), (700L, 22L, 602, "addr2"),
+ |(800L, 23L, 603, "addr3"), (900L, 24L, 604, "addr4"),
+ |(1000L, 25L, 605, "addr5")
+ |""".stripMargin)
+
+ checkAnswer(
+ sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY orderId"),
+ Row(600L, 21L, 601, "addr1") ::
+ Row(700L, 22L, 602, "addr2") ::
+ Row(800L, 23L, 603, "addr3") ::
+ Row(900L, 24L, 604, "addr4") ::
+ Row(1000L, 25L, 605, "addr5") :: Nil
+ )
+
+ sql(s"""
+ |INSERT INTO $DEFAULT_DATABASE.t VALUES
+ |(700L, 220L, 602, "addr2"),
+ |(900L, 240L, 604, "addr4"),
+ |(1100L, 260L, 606, "addr6")
+ |""".stripMargin)
+
+ checkAnswer(
+ sql(s"""
+ |SELECT orderId, itemId, address FROM $DEFAULT_DATABASE.t
+ |WHERE amount <= 603 ORDER BY orderId""".stripMargin),
+ Row(600L, 21L, "addr1") ::
+ Row(700L, 220L, "addr2") ::
+ Row(800L, 23L, "addr3") ::
+ Nil
+ )
+ }
+ }
+
+ test("Spark Read: partitioned log table") {
+ withTable("t") {
+ sql(
+ s"""
+ |CREATE TABLE $DEFAULT_DATABASE.t (orderId BIGINT, itemId BIGINT, amount INT, address STRING, dt STRING)
+ |PARTITIONED BY (dt)
+ |""".stripMargin
+ )
+
+ sql(s"""
+ |INSERT INTO $DEFAULT_DATABASE.t VALUES
+ |(600L, 21L, 601, "addr1", "2026-01-01"), (700L, 22L, 602, "addr2", "2026-01-01"),
+ |(800L, 23L, 603, "addr3", "2026-01-02"), (900L, 24L, 604, "addr4", "2026-01-02"),
+ |(1000L, 25L, 605, "addr5", "2026-01-03")
+ |""".stripMargin)
+
+ checkAnswer(
+ sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY orderId"),
+ Row(600L, 21L, 601, "addr1", "2026-01-01") ::
+ Row(700L, 22L, 602, "addr2", "2026-01-01") ::
+ Row(800L, 23L, 603, "addr3", "2026-01-02") ::
+ Row(900L, 24L, 604, "addr4", "2026-01-02") ::
+ Row(1000L, 25L, 605, "addr5", "2026-01-03") :: Nil
+ )
+
+ // Read with partition filter
+ checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t WHERE dt = '2026-01-01' ORDER BY orderId"),
+ Row(600L, 21L, 601, "addr1", "2026-01-01") ::
+ Row(700L, 22L, 602, "addr2", "2026-01-01") :: Nil
+ )
+
+ // Read with multiple partitions filter
+ checkAnswer(
+ sql(s"""
+ |SELECT orderId, address, dt FROM $DEFAULT_DATABASE.t
+ |WHERE dt IN ('2026-01-01', '2026-01-02')
+ |ORDER BY orderId""".stripMargin),
+ Row(600L, "addr1", "2026-01-01") ::
+ Row(700L, "addr2", "2026-01-01") ::
+ Row(800L, "addr3", "2026-01-02") ::
+ Row(900L, "addr4", "2026-01-02") :: Nil
+ )
+ }
+ }
+
+ test("Spark Read: partitioned primary key table") {
+ withTable("t") {
+ sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t (orderId BIGINT, itemId BIGINT, amount INT, address STRING, dt STRING)
+ |PARTITIONED BY (dt)
+ |TBLPROPERTIES("primary.key" = "orderId,dt")
+ |""".stripMargin)
+
+ sql(s"""
+ |INSERT INTO $DEFAULT_DATABASE.t VALUES
+             |(600L, 21L, 601, "addr1", "2026-01-01"), (700L, 22L, 602, "addr2", "2026-01-01"),
+             |(800L, 23L, 603, "addr3", "2026-01-02"), (900L, 24L, 604, "addr4", "2026-01-02"),
+ |(1000L, 25L, 605, "addr5", "2026-01-03")
+ |""".stripMargin)
+ sql(s"""
+ |INSERT INTO $DEFAULT_DATABASE.t VALUES
+ |(700L, 220L, 602, "addr2_updated", "2026-01-01"),
+ |(900L, 240L, 604, "addr4_updated", "2026-01-02"),
+ |(1100L, 260L, 606, "addr6", "2026-01-03")
+ |""".stripMargin)
+
+ checkAnswer(
+ sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY orderId"),
+ Row(600L, 21L, 601, "addr1", "2026-01-01") ::
+ Row(700L, 220L, 602, "addr2_updated", "2026-01-01") ::
+ Row(800L, 23L, 603, "addr3", "2026-01-02") ::
+ Row(900L, 240L, 604, "addr4_updated", "2026-01-02") ::
+ Row(1000L, 25L, 605, "addr5", "2026-01-03") ::
+ Row(1100L, 260L, 606, "addr6", "2026-01-03") ::
+ Nil
+ )
+
+ // Read with partition filter
+ checkAnswer(
+ sql(s"""
+ |SELECT * FROM $DEFAULT_DATABASE.t
+ |WHERE dt = '2026-01-01'
+ |ORDER BY orderId""".stripMargin),
+ Row(600L, 21L, 601, "addr1", "2026-01-01") ::
+ Row(700L, 220L, 602, "addr2_updated", "2026-01-01") ::
+ Nil
+ )
+
+ // Read with multiple partition filters
+ checkAnswer(
+ sql(
+          s"SELECT * FROM $DEFAULT_DATABASE.t WHERE dt IN ('2026-01-01', '2026-01-02') ORDER BY orderId"),
+ Row(600L, 21L, 601, "addr1", "2026-01-01") ::
+ Row(700L, 220L, 602, "addr2_updated", "2026-01-01") ::
+ Row(800L, 23L, 603, "addr3", "2026-01-02") ::
+ Row(900L, 240L, 604, "addr4_updated", "2026-01-02") ::
+ Nil
+ )
+ }
+ }
+
+  test("Spark Read: all data types") {
+ withTable("t") {
+ sql(s"""
+ |CREATE TABLE $DEFAULT_DATABASE.t (
+ |id INT,
+ |flag BOOLEAN,
+ |small SHORT,
+ |value INT,
+ |big BIGINT,
+ |real FLOAT,
+ |amount DOUBLE,
+ |name STRING,
+ |decimal_val DECIMAL(10, 2),
+ |date_val DATE,
+ |timestamp_ntz_val TIMESTAMP,
+ |timestamp_ltz_val TIMESTAMP_LTZ
+ |)""".stripMargin)
+
+ sql(s"""
+ |INSERT INTO $DEFAULT_DATABASE.t VALUES
+ |(1, true, 100s, 1000, 10000L, 12.34f, 56.78, "string_val",
+        | 123.45, DATE "2026-01-01", TIMESTAMP "2026-01-01 12:00:00", TIMESTAMP "2026-01-01 12:00:00"),
+        |(2, false, 200s, 2000, 20000L, 23.45f, 67.89, "another_str",
+        | 223.45, DATE "2026-01-02", TIMESTAMP "2026-01-02 12:00:00", TIMESTAMP "2026-01-02 12:00:00")
+ |""".stripMargin)
+
+ // Read all data types
+ checkAnswer(
+ sql(s"SELECT * FROM $DEFAULT_DATABASE.t"),
+ Row(
+ 1,
+ true,
+ 100.toShort,
+ 1000,
+ 10000L,
+ 12.34f,
+ 56.78,
+ "string_val",
+ java.math.BigDecimal.valueOf(123.45),
+ java.sql.Date.valueOf("2026-01-01"),
+ java.sql.Timestamp.valueOf("2026-01-01 12:00:00"),
+ java.sql.Timestamp.valueOf("2026-01-01 12:00:00")
+ ) :: Row(
+ 2,
+ false,
+ 200.toShort,
+ 2000,
+ 20000L,
+ 23.45f,
+ 67.89,
+ "another_str",
+ java.math.BigDecimal.valueOf(223.45),
+ java.sql.Date.valueOf("2026-01-02"),
+ java.sql.Timestamp.valueOf("2026-01-02 12:00:00"),
+ java.sql.Timestamp.valueOf("2026-01-02 12:00:00")
+ ) :: Nil
+ )
+
+ // Test projection on selected columns
+ checkAnswer(
+ sql(s"SELECT id, name, amount FROM $DEFAULT_DATABASE.t ORDER BY id"),
+        // amount is DOUBLE, so the expected values are plain Doubles
+        Row(1, "string_val", 56.78) ::
+        Row(2, "another_str", 67.89) :: Nil
+ )
+
+ // Filter by boolean field
+ checkAnswer(
+        sql(s"SELECT id, flag, name FROM $DEFAULT_DATABASE.t WHERE flag = true ORDER BY id"),
+ Row(1, true, "string_val") :: Nil
+ )
+
+ // Filter by numeric field
+ checkAnswer(
+        sql(s"SELECT id, value, name FROM $DEFAULT_DATABASE.t WHERE value > 1500 ORDER BY id"),
+ Row(2, 2000, "another_str") :: Nil
+ )
+
+ // Filter by string field
+ checkAnswer(
+        sql(s"SELECT id, name FROM $DEFAULT_DATABASE.t WHERE name LIKE '%another%' ORDER BY id"),
+ Row(2, "another_str") :: Nil
+ )
+ }
+ }
+
+ test("Spark Read: nested data types table") {
+ withTable("t") {
+ // TODO: support map type
+ sql(s"""
+ |CREATE TABLE $DEFAULT_DATABASE.t (
+ |id INT,
+ |arr ARRAY<INT>,
+ |struct_col STRUCT<col1: INT, col2: STRING>
+ |)""".stripMargin)
+
+ sql(s"""
+ |INSERT INTO $DEFAULT_DATABASE.t VALUES
+ |(1, ARRAY(1, 2, 3), STRUCT(100, 'nested_value')),
+ |(2, ARRAY(7, 8, 9), STRUCT(200, 'nested_value2'))
+ |""".stripMargin)
+
+ checkAnswer(
+ sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY id"),
+ Row(
+ 1,
+ Seq(1, 2, 3),
+ Row(100, "nested_value")
+ ) :: Row(
+ 2,
+ Seq(7, 8, 9),
+ Row(200, "nested_value2")
+ ) :: Nil
+ )
+ }
+ }
+}
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/DataConverterTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/DataConverterTest.scala
new file mode 100644
index 000000000..6866ba7b6
--- /dev/null
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/DataConverterTest.scala
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.row
+
+import org.apache.fluss.row.{BinaryString, Decimal => FlussDecimal, GenericArray, GenericRow, TimestampLtz, TimestampNtz}
+import org.apache.fluss.types.{ArrayType, CharType, DataTypes, DecimalType, LocalZonedTimestampType, RowType, TimestampType}
+
+import org.apache.spark.sql.types.{Decimal => SparkDecimal}
+import org.apache.spark.unsafe.types.UTF8String
+import org.assertj.core.api.Assertions.assertThat
+import org.scalatest.funsuite.AnyFunSuite
+
+class DataConverterTest extends AnyFunSuite {
+
+ test("toSparkObject: null value") {
+ val result = DataConverter.toSparkObject(null, DataTypes.INT)
+ assertThat(result).isNull()
+ }
+
+ test("toSparkObject: primitive types") {
+ // Integer
+ val intValue = Integer.valueOf(42)
+ val intResult = DataConverter.toSparkObject(intValue, DataTypes.INT)
+ assertThat(intResult).isEqualTo(42)
+
+ // Long
+ val longValue = java.lang.Long.valueOf(12345L)
+ val longResult = DataConverter.toSparkObject(longValue, DataTypes.BIGINT)
+ assertThat(longResult).isEqualTo(12345L)
+
+ // Boolean
+ val boolValue = java.lang.Boolean.valueOf(true)
+ val boolResult = DataConverter.toSparkObject(boolValue, DataTypes.BOOLEAN)
+ assertThat(boolResult).isEqualTo(true)
+
+ // Float
+ val floatValue = java.lang.Float.valueOf(3.14f)
+ val floatResult = DataConverter.toSparkObject(floatValue, DataTypes.FLOAT)
+ assertThat(floatResult).isEqualTo(3.14f)
+
+ // Double
+ val doubleValue = java.lang.Double.valueOf(2.718)
+    val doubleResult = DataConverter.toSparkObject(doubleValue, DataTypes.DOUBLE)
+ assertThat(doubleResult).isEqualTo(2.718)
+ }
+
+ test("toSparkTimestamp: TimestampNtz") {
+ val timestamp = TimestampNtz.fromMillis(1234567890123L)
+ val result = DataConverter.toSparkTimestamp(timestamp)
+ // Fluss stores in milliseconds, Spark uses microseconds
+ assertThat(result).isEqualTo(1234567890123000L)
+ }
+
+ test("toSparkTimestamp: TimestampLtz") {
+ val timestamp = TimestampLtz.fromEpochMillis(1234567890123L)
+ val result = DataConverter.toSparkTimestamp(timestamp)
+ assertThat(result).isEqualTo(1234567890123000L)
+ }
+
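+  // Editorial sketch of the scale contract exercised above (assuming the
+  // millisecond-based factories shown here are the only inputs): Spark's
+  // internal timestamp is a microsecond epoch value, so the conversion
+  // multiplies by 1000, e.g. 1234567890123L ms * 1000 = 1234567890123000L us.
+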
+ test("toSparkDecimal: positive decimal") {
+    val flussDecimal = FlussDecimal.fromBigDecimal(new java.math.BigDecimal("123.45"), 5, 2)
+ val sparkDecimal = DataConverter.toSparkDecimal(flussDecimal)
+
+ assertThat(sparkDecimal).isInstanceOf(classOf[SparkDecimal])
+ assertThat(sparkDecimal.toBigDecimal.doubleValue()).isEqualTo(123.45)
+ }
+
+ test("toSparkDecimal: negative decimal") {
+    val flussDecimal = FlussDecimal.fromBigDecimal(new java.math.BigDecimal("-999.99"), 5, 2)
+ val sparkDecimal = DataConverter.toSparkDecimal(flussDecimal)
+
+ assertThat(sparkDecimal.toBigDecimal.doubleValue()).isEqualTo(-999.99)
+ }
+
+ test("toSparkDecimal: zero") {
+    val flussDecimal = FlussDecimal.fromBigDecimal(java.math.BigDecimal.ZERO, 5, 2)
+ val sparkDecimal = DataConverter.toSparkDecimal(flussDecimal)
+
+ assertThat(sparkDecimal.toBigDecimal.doubleValue()).isEqualTo(0.0)
+ }
+
+ test("toSparkUTF8String: ASCII string") {
+ val binaryString = BinaryString.fromString("Hello World")
+ val utf8String = DataConverter.toSparkUTF8String(binaryString)
+
+ assertThat(utf8String).isInstanceOf(classOf[UTF8String])
+ assertThat(utf8String.toString).isEqualTo("Hello World")
+ }
+
+ test("toSparkUTF8String: UTF-8 Chinese characters") {
+ val binaryString = BinaryString.fromString("你好世界")
+ val utf8String = DataConverter.toSparkUTF8String(binaryString)
+
+ assertThat(utf8String.toString).isEqualTo("你好世界")
+ }
+
+ test("toSparkUTF8String: empty string") {
+ val binaryString = BinaryString.fromString("")
+ val utf8String = DataConverter.toSparkUTF8String(binaryString)
+
+ assertThat(utf8String.toString).isEqualTo("")
+ }
+
+ test("toSparkUTF8String: special characters") {
+ val binaryString = BinaryString.fromString("Test\n\t\"Special\"")
+ val utf8String = DataConverter.toSparkUTF8String(binaryString)
+
+ assertThat(utf8String.toString).isEqualTo("Test\n\t\"Special\"")
+ }
+
+ test("toSparkArray: integer array") {
+ val flussArray = new GenericArray(
+ Array[Object](
+ Integer.valueOf(1),
+ Integer.valueOf(2),
+ Integer.valueOf(3),
+ Integer.valueOf(4),
+ Integer.valueOf(5)))
+ val arrayType = new ArrayType(DataTypes.INT)
+ val sparkArray = DataConverter.toSparkArray(flussArray, arrayType)
+
+ assertThat(sparkArray.numElements()).isEqualTo(5)
+ assertThat(sparkArray.getInt(0)).isEqualTo(1)
+ assertThat(sparkArray.getInt(4)).isEqualTo(5)
+ }
+
+ test("toSparkArray: string array") {
+ val strings = Array[Object](
+ BinaryString.fromString("a"),
+ BinaryString.fromString("b"),
+ BinaryString.fromString("c"))
+ val flussArray = new GenericArray(strings)
+ val arrayType = new ArrayType(DataTypes.STRING)
+ val sparkArray = DataConverter.toSparkArray(flussArray, arrayType)
+
+ assertThat(sparkArray.numElements()).isEqualTo(3)
+ assertThat(sparkArray.getUTF8String(0).toString).isEqualTo("a")
+ assertThat(sparkArray.getUTF8String(2).toString).isEqualTo("c")
+ }
+
+ test("toSparkArray: empty array") {
+ val flussArray = new GenericArray(Array.empty[Object])
+ val arrayType = new ArrayType(DataTypes.INT)
+ val sparkArray = DataConverter.toSparkArray(flussArray, arrayType)
+
+ assertThat(sparkArray.numElements()).isEqualTo(0)
+ }
+
+ test("toSparkArray: array with null elements") {
+    val flussArray = new GenericArray(Array[Object](Integer.valueOf(1), null, Integer.valueOf(3)))
+ val arrayType = new ArrayType(DataTypes.INT)
+ val sparkArray = DataConverter.toSparkArray(flussArray, arrayType)
+
+ assertThat(sparkArray.numElements()).isEqualTo(3)
+ assertThat(sparkArray.getInt(0)).isEqualTo(1)
+ assertThat(sparkArray.isNullAt(1)).isTrue()
+ assertThat(sparkArray.getInt(2)).isEqualTo(3)
+ }
+
+ test("toSparkInternalRow: simple row") {
+ val rowType = RowType
+ .builder()
+ .field("id", DataTypes.INT)
+ .field("name", DataTypes.STRING)
+ .field("age", DataTypes.INT)
+ .build()
+
+ val flussRow = new GenericRow(3)
+ flussRow.setField(0, Integer.valueOf(1))
+ flussRow.setField(1, BinaryString.fromString("Alice"))
+ flussRow.setField(2, Integer.valueOf(25))
+
+ val sparkRow = DataConverter.toSparkInternalRow(flussRow, rowType)
+
+ assertThat(sparkRow.numFields).isEqualTo(3)
+ assertThat(sparkRow.getInt(0)).isEqualTo(1)
+ assertThat(sparkRow.getUTF8String(1).toString).isEqualTo("Alice")
+ assertThat(sparkRow.getInt(2)).isEqualTo(25)
+ }
+
+ test("toSparkInternalRow: row with null fields") {
+ val rowType = RowType
+ .builder()
+ .field("id", DataTypes.INT)
+ .field("name", DataTypes.STRING)
+ .build()
+
+ val flussRow = new GenericRow(2)
+ flussRow.setField(0, Integer.valueOf(1))
+ flussRow.setField(1, null)
+
+ val sparkRow = DataConverter.toSparkInternalRow(flussRow, rowType)
+
+ assertThat(sparkRow.getInt(0)).isEqualTo(1)
+ assertThat(sparkRow.isNullAt(1)).isTrue()
+ }
+
+ test("toSparkInternalRow: nested row") {
+ val innerRowType = RowType
+ .builder()
+ .field("city", DataTypes.STRING)
+ .field("code", DataTypes.INT)
+ .build()
+
+ val outerRowType = RowType
+ .builder()
+ .field("id", DataTypes.INT)
+ .field("address", innerRowType)
+ .build()
+
+ val innerRow = new GenericRow(2)
+ innerRow.setField(0, BinaryString.fromString("Beijing"))
+ innerRow.setField(1, Integer.valueOf(100))
+
+ val outerRow = new GenericRow(2)
+ outerRow.setField(0, Integer.valueOf(1))
+ outerRow.setField(1, innerRow)
+
+ val sparkRow = DataConverter.toSparkInternalRow(outerRow, outerRowType)
+
+ assertThat(sparkRow.getInt(0)).isEqualTo(1)
+ val sparkInnerRow = sparkRow.getStruct(1, 2)
+ assertThat(sparkInnerRow.getUTF8String(0).toString).isEqualTo("Beijing")
+ assertThat(sparkInnerRow.getInt(1)).isEqualTo(100)
+ }
+
+ test("toSparkObject: CHAR type") {
+ val binaryString = BinaryString.fromString("test")
+ val charType = new CharType(10)
+ val result = DataConverter.toSparkObject(binaryString, charType)
+
+ assertThat(result).isInstanceOf(classOf[UTF8String])
+ assertThat(result.asInstanceOf[UTF8String].toString).isEqualTo("test")
+ }
+
+ test("toSparkObject: DECIMAL type") {
+    val flussDecimal = FlussDecimal.fromBigDecimal(new java.math.BigDecimal("12.34"), 4, 2)
+ val decimalType = new DecimalType(4, 2)
+ val result = DataConverter.toSparkObject(flussDecimal, decimalType)
+
+ assertThat(result).isInstanceOf(classOf[SparkDecimal])
+ }
+
+ test("toSparkObject: TIMESTAMP_WITHOUT_TIME_ZONE type") {
+ val timestamp = TimestampNtz.fromMillis(1000000L)
+ val timestampType = new TimestampType(3)
+ val result = DataConverter.toSparkObject(timestamp, timestampType)
+
+ assertThat(result).isInstanceOf(classOf[java.lang.Long])
+    assertThat(result.asInstanceOf[Long]).isEqualTo(1000000000L) // microseconds
+ }
+
+ test("toSparkObject: TIMESTAMP_WITH_LOCAL_TIME_ZONE type") {
+ val timestamp = TimestampLtz.fromEpochMillis(2000000L)
+ val timestampType = new LocalZonedTimestampType(3)
+ val result = DataConverter.toSparkObject(timestamp, timestampType)
+
+ assertThat(result).isInstanceOf(classOf[java.lang.Long])
+    assertThat(result.asInstanceOf[Long]).isEqualTo(2000000000L) // microseconds
+ }
+
+ test("toSparkObject: ROW type") {
+ val rowType = RowType
+ .builder()
+ .field("x", DataTypes.INT)
+ .build()
+
+ val flussRow = new GenericRow(1)
+ flussRow.setField(0, Integer.valueOf(42))
+
+ val result = DataConverter.toSparkObject(flussRow, rowType)
+
+ assertThat(result).isNotNull()
+ assertThat(result.asInstanceOf[FlussAsSparkRow].getInt(0)).isEqualTo(42)
+ }
+
+ test("toSparkMap: unsupported") {
+ assertThrows[UnsupportedOperationException] {
+ DataConverter.toSparkMap(null, null)
+ }
+ }
+}
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkArrayTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkArrayTest.scala
new file mode 100644
index 000000000..029104e5b
--- /dev/null
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkArrayTest.scala
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.row
+
+import org.apache.fluss.row.{BinaryString, Decimal => FlussDecimal, GenericArray, GenericMap, GenericRow, TimestampLtz, TimestampNtz}
+import org.apache.fluss.types.{ArrayType, BinaryType, DataTypes, IntType, LocalZonedTimestampType, MapType, RowType, StringType, TimestampType}
+
+import org.assertj.core.api.Assertions.assertThat
+import org.scalatest.funsuite.AnyFunSuite
+
+import scala.collection.JavaConverters._
+
+class FlussAsSparkArrayTest extends AnyFunSuite {
+
+ test("numElements: empty array") {
+ val elementType = DataTypes.INT
+ val flussArray = new GenericArray(Array.empty[Object])
+ val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+ assertThat(sparkArray.numElements()).isEqualTo(0)
+ }
+
+ test("numElements: non-empty array") {
+ val elementType = DataTypes.INT
+ val flussArray = new GenericArray(
+ Array[Object](
+ Integer.valueOf(1),
+ Integer.valueOf(2),
+ Integer.valueOf(3),
+ Integer.valueOf(4),
+ Integer.valueOf(5)))
+ val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+ assertThat(sparkArray.numElements()).isEqualTo(5)
+ }
+
+ test("isNullAt: check null elements") {
+ val elementType = DataTypes.INT
+    val flussArray = new GenericArray(Array[Object](Integer.valueOf(1), null, Integer.valueOf(3)))
+ val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+ assertThat(sparkArray.isNullAt(0)).isFalse()
+ assertThat(sparkArray.isNullAt(1)).isTrue()
+ assertThat(sparkArray.isNullAt(2)).isFalse()
+ }
+
+ test("getBoolean: read boolean array") {
+ val elementType = DataTypes.BOOLEAN
+ val flussArray = new GenericArray(
+ Array[Object](
+ java.lang.Boolean.TRUE,
+ java.lang.Boolean.FALSE,
+ java.lang.Boolean.TRUE
+ ))
+ val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+ assertThat(sparkArray.getBoolean(0)).isTrue()
+ assertThat(sparkArray.getBoolean(1)).isFalse()
+ assertThat(sparkArray.getBoolean(2)).isTrue()
+ }
+
+ test("getByte: read byte array") {
+ val elementType = DataTypes.TINYINT
+ val flussArray = new GenericArray(
+ Array[Object](
+ java.lang.Byte.valueOf(1.toByte),
+ java.lang.Byte.valueOf(10.toByte),
+ java.lang.Byte.valueOf((-5).toByte)
+ ))
+ val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+ assertThat(sparkArray.getByte(0)).isEqualTo(1.toByte)
+ assertThat(sparkArray.getByte(1)).isEqualTo(10.toByte)
+ assertThat(sparkArray.getByte(2)).isEqualTo((-5).toByte)
+ }
+
+ test("getShort: read short array") {
+ val elementType = DataTypes.SMALLINT
+ val flussArray = new GenericArray(
+ Array[Object](
+ java.lang.Short.valueOf(100.toShort),
+ java.lang.Short.valueOf(200.toShort),
+ java.lang.Short.valueOf((-50).toShort)
+ ))
+ val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+ assertThat(sparkArray.getShort(0)).isEqualTo(100.toShort)
+ assertThat(sparkArray.getShort(1)).isEqualTo(200.toShort)
+ assertThat(sparkArray.getShort(2)).isEqualTo((-50).toShort)
+ }
+
+ test("getInt: read integer array") {
+ val elementType = DataTypes.INT
+ val flussArray = new GenericArray(
+ Array[Object](
+ Integer.valueOf(10),
+ Integer.valueOf(20),
+ Integer.valueOf(30),
+ Integer.valueOf(40),
+ Integer.valueOf(50)))
+ val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+ assertThat(sparkArray.getInt(0)).isEqualTo(10)
+ assertThat(sparkArray.getInt(2)).isEqualTo(30)
+ assertThat(sparkArray.getInt(4)).isEqualTo(50)
+ }
+
+ test("getLong: read BIGINT array") {
+ val elementType = DataTypes.BIGINT
+ val flussArray = new GenericArray(
+ Array[Object](
+ java.lang.Long.valueOf(100L),
+ java.lang.Long.valueOf(200L),
+ java.lang.Long.valueOf(300L)))
+ val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+ assertThat(sparkArray.getLong(0)).isEqualTo(100L)
+ assertThat(sparkArray.getLong(1)).isEqualTo(200L)
+ assertThat(sparkArray.getLong(2)).isEqualTo(300L)
+ }
+
+ test("getLong: read TIMESTAMP_WITHOUT_TIME_ZONE array") {
+ val timestampType = new TimestampType(3)
+ val timestamps = Array[Object](
+ TimestampNtz.fromMillis(1000L),
+ TimestampNtz.fromMillis(2000L),
+ TimestampNtz.fromMillis(3000L)
+ )
+ val flussArray = new GenericArray(timestamps)
+ val sparkArray = new FlussAsSparkArray(timestampType).replace(flussArray)
+
+ // Spark expects microseconds
+ assertThat(sparkArray.getLong(0)).isEqualTo(1000000L)
+ assertThat(sparkArray.getLong(1)).isEqualTo(2000000L)
+ assertThat(sparkArray.getLong(2)).isEqualTo(3000000L)
+ }
+
+ test("getLong: read TIMESTAMP_WITH_LOCAL_TIME_ZONE array") {
+ val timestampType = new LocalZonedTimestampType(3)
+ val timestamps = Array[Object](
+ TimestampLtz.fromEpochMillis(5000L),
+ TimestampLtz.fromEpochMillis(6000L)
+ )
+ val flussArray = new GenericArray(timestamps)
+ val sparkArray = new FlussAsSparkArray(timestampType).replace(flussArray)
+
+ // Spark expects microseconds
+ assertThat(sparkArray.getLong(0)).isEqualTo(5000000L)
+ assertThat(sparkArray.getLong(1)).isEqualTo(6000000L)
+ }
+
+ test("getLong: unsupported type throws exception") {
+ val elementType = DataTypes.STRING
+    val flussArray = new GenericArray(Array[Object](BinaryString.fromString("test")))
+ val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+ assertThrows[UnsupportedOperationException] {
+ sparkArray.getLong(0)
+ }
+ }
+
+ test("getFloat: read float array") {
+ val elementType = DataTypes.FLOAT
+ val flussArray = new GenericArray(
+ Array[Object](
+ java.lang.Float.valueOf(1.1f),
+ java.lang.Float.valueOf(2.2f),
+ java.lang.Float.valueOf(3.3f)))
+ val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+ assertThat(sparkArray.getFloat(0)).isEqualTo(1.1f)
+ assertThat(sparkArray.getFloat(1)).isEqualTo(2.2f)
+ assertThat(sparkArray.getFloat(2)).isEqualTo(3.3f)
+ }
+
+ test("getDouble: read double array") {
+ val elementType = DataTypes.DOUBLE
+ val flussArray = new GenericArray(
+ Array[Object](
+ java.lang.Double.valueOf(1.11),
+ java.lang.Double.valueOf(2.22),
+ java.lang.Double.valueOf(3.33)))
+ val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+ assertThat(sparkArray.getDouble(0)).isEqualTo(1.11)
+ assertThat(sparkArray.getDouble(1)).isEqualTo(2.22)
+ assertThat(sparkArray.getDouble(2)).isEqualTo(3.33)
+ }
+
+ test("getDecimal: read decimal array") {
+ val elementType = DataTypes.DECIMAL(10, 2)
+ val decimals = Array[Object](
+ FlussDecimal.fromBigDecimal(new java.math.BigDecimal("10.50"), 10, 2),
+ FlussDecimal.fromBigDecimal(new java.math.BigDecimal("20.75"), 10, 2),
+ FlussDecimal.fromBigDecimal(new java.math.BigDecimal("30.99"), 10, 2)
+ )
+ val flussArray = new GenericArray(decimals)
+ val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+    assertThat(sparkArray.getDecimal(0, 10, 2).toBigDecimal.doubleValue()).isEqualTo(10.50)
+    assertThat(sparkArray.getDecimal(1, 10, 2).toBigDecimal.doubleValue()).isEqualTo(20.75)
+    assertThat(sparkArray.getDecimal(2, 10, 2).toBigDecimal.doubleValue()).isEqualTo(30.99)
+ }
+
+ test("getUTF8String: read string array") {
+ val elementType = DataTypes.STRING
+ val strings = Array[Object](
+ BinaryString.fromString("apple"),
+ BinaryString.fromString("banana"),
+ BinaryString.fromString("cherry"))
+ val flussArray = new GenericArray(strings)
+ val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+ assertThat(sparkArray.getUTF8String(0).toString).isEqualTo("apple")
+ assertThat(sparkArray.getUTF8String(1).toString).isEqualTo("banana")
+ assertThat(sparkArray.getUTF8String(2).toString).isEqualTo("cherry")
+ }
+
+ test("getUTF8String: read UTF-8 characters array") {
+ val elementType = DataTypes.STRING
+ val strings = Array[Object](
+ BinaryString.fromString("你好"),
+ BinaryString.fromString("世界"),
+ BinaryString.fromString("😊"))
+ val flussArray = new GenericArray(strings)
+ val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+ assertThat(sparkArray.getUTF8String(0).toString).isEqualTo("你好")
+ assertThat(sparkArray.getUTF8String(1).toString).isEqualTo("世界")
+ assertThat(sparkArray.getUTF8String(2).toString).isEqualTo("😊")
+ }
+
+ test("getBinary: read binary array") {
+ val binaryType = new BinaryType(3)
+ val binaries = Array[Object](
+ Array[Byte](1, 2, 3),
+ Array[Byte](4, 5, 6),
+ Array[Byte](7, 8, 9)
+ )
+ val flussArray = new GenericArray(binaries)
+ val sparkArray = new FlussAsSparkArray(binaryType).replace(flussArray)
+
+ assertThat(sparkArray.getBinary(0)).isEqualTo(Array[Byte](1, 2, 3))
+ assertThat(sparkArray.getBinary(1)).isEqualTo(Array[Byte](4, 5, 6))
+ assertThat(sparkArray.getBinary(2)).isEqualTo(Array[Byte](7, 8, 9))
+ }
+
+ test("getStruct: read row array") {
+ val rowType = RowType
+ .builder()
+ .field("id", DataTypes.INT)
+ .field("name", DataTypes.STRING)
+ .build()
+
+ val row1 = new GenericRow(2)
+ row1.setField(0, Integer.valueOf(1))
+ row1.setField(1, BinaryString.fromString("Alice"))
+
+ val row2 = new GenericRow(2)
+ row2.setField(0, Integer.valueOf(2))
+ row2.setField(1, BinaryString.fromString("Bob"))
+
+ val flussArray = new GenericArray(Array[Object](row1, row2))
+ val sparkArray = new FlussAsSparkArray(rowType).replace(flussArray)
+
+ val sparkRow1 = sparkArray.getStruct(0, 2)
+ assertThat(sparkRow1.getInt(0)).isEqualTo(1)
+ assertThat(sparkRow1.getUTF8String(1).toString).isEqualTo("Alice")
+
+ val sparkRow2 = sparkArray.getStruct(1, 2)
+ assertThat(sparkRow2.getInt(0)).isEqualTo(2)
+ assertThat(sparkRow2.getUTF8String(1).toString).isEqualTo("Bob")
+ }
+
+ test("getArray: read nested array") {
+ val innerArrayType = new ArrayType(DataTypes.INT)
+ val innerArray1 =
+      new GenericArray(Array[Object](Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(3)))
+    val innerArray2 =
+      new GenericArray(Array[Object](Integer.valueOf(4), Integer.valueOf(5), Integer.valueOf(6)))
+
+ val outerArray = new GenericArray(Array[Object](innerArray1, innerArray2))
+    val sparkOuterArray = new FlussAsSparkArray(innerArrayType).replace(outerArray)
+
+ val sparkInnerArray1 = sparkOuterArray.getArray(0)
+ assertThat(sparkInnerArray1.numElements()).isEqualTo(3)
+ assertThat(sparkInnerArray1.getInt(0)).isEqualTo(1)
+ assertThat(sparkInnerArray1.getInt(2)).isEqualTo(3)
+
+ val sparkInnerArray2 = sparkOuterArray.getArray(1)
+ assertThat(sparkInnerArray2.numElements()).isEqualTo(3)
+ assertThat(sparkInnerArray2.getInt(0)).isEqualTo(4)
+ assertThat(sparkInnerArray2.getInt(2)).isEqualTo(6)
+ }
+
+ test("getMap: unsupported operation") {
+ val mapType = DataTypes.MAP(DataTypes.INT, DataTypes.STRING)
+ val flussArray = GenericArray.of(new GenericMap(Map(1 -> "map").asJava))
+ val sparkArray = new FlussAsSparkArray(mapType).replace(flussArray)
+
+ assertThrows[UnsupportedOperationException] {
+ sparkArray.getMap(0)
+ }
+ }
+
+ test("getInterval: unsupported operation") {
+ val elementType = DataTypes.INT
+ val flussArray = new GenericArray(Array[Object](Integer.valueOf(1)))
+ val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+ assertThrows[UnsupportedOperationException] {
+ sparkArray.getInterval(0)
+ }
+ }
+
+ test("setNullAt: unsupported operation") {
+ val elementType = DataTypes.INT
+ val flussArray = new GenericArray(Array[Object](Integer.valueOf(1)))
+ val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+ assertThrows[UnsupportedOperationException] {
+ sparkArray.setNullAt(0)
+ }
+ }
+
+ test("update: unsupported operation") {
+ val elementType = DataTypes.INT
+ val flussArray = new GenericArray(Array[Object](Integer.valueOf(1)))
+ val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+ assertThrows[UnsupportedOperationException] {
+ sparkArray.update(0, Integer.valueOf(2))
+ }
+ }
+
+ test("copy: creates deep copy") {
+ val elementType = DataTypes.INT
+ val flussArray =
+      new GenericArray(Array[Object](Integer.valueOf(1), Integer.valueOf(2), Integer.valueOf(3)))
+ val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+ val copiedArray = sparkArray.copy()
+
+ assertThat(copiedArray.numElements()).isEqualTo(3)
+ assertThat(copiedArray.getInt(0)).isEqualTo(1)
+ assertThat(copiedArray.getInt(1)).isEqualTo(2)
+ assertThat(copiedArray.getInt(2)).isEqualTo(3)
+
+ // Verify it's a different instance
+ assertThat(copiedArray).isNotSameAs(sparkArray)
+ }
+
+ test("replace: reuses wrapper instance") {
+ val elementType = DataTypes.INT
+
+    val array1 = new GenericArray(Array[Object](Integer.valueOf(1), Integer.valueOf(2)))
+    val array2 =
+      new GenericArray(Array[Object](Integer.valueOf(10), Integer.valueOf(20), Integer.valueOf(30)))
+
+ val sparkArray = new FlussAsSparkArray(elementType)
+ sparkArray.replace(array1)
+ assertThat(sparkArray.numElements()).isEqualTo(2)
+ assertThat(sparkArray.getInt(0)).isEqualTo(1)
+
+ sparkArray.replace(array2)
+ assertThat(sparkArray.numElements()).isEqualTo(3)
+ assertThat(sparkArray.getInt(0)).isEqualTo(10)
+ assertThat(sparkArray.getInt(2)).isEqualTo(30)
+ }
+
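+  // Editorial sketch of why `replace` returns the same instance: one wrapper
+  // per array column can be allocated up front and re-pointed at each
+  // incoming Fluss array, so a scan allocates no adapter per record.
+  // `flussArrays` and `consume` are hypothetical stand-ins:
+  //   val wrapper = new FlussAsSparkArray(DataTypes.INT)
+  //   flussArrays.foreach(a => consume(wrapper.replace(a)))
+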
+ test("array with all nulls") {
+ val elementType = DataTypes.INT
+ val flussArray = new GenericArray(Array[Object](null, null, null))
+ val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+ assertThat(sparkArray.numElements()).isEqualTo(3)
+ assertThat(sparkArray.isNullAt(0)).isTrue()
+ assertThat(sparkArray.isNullAt(1)).isTrue()
+ assertThat(sparkArray.isNullAt(2)).isTrue()
+ }
+
+ test("large array: 1000 elements") {
+ val elementType = DataTypes.INT
+ val elements = (0 until 1000).map(i => Integer.valueOf(i)).toArray[Object]
+ val flussArray = new GenericArray(elements)
+ val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+ assertThat(sparkArray.numElements()).isEqualTo(1000)
+ assertThat(sparkArray.getInt(0)).isEqualTo(0)
+ assertThat(sparkArray.getInt(500)).isEqualTo(500)
+ assertThat(sparkArray.getInt(999)).isEqualTo(999)
+ }
+
+ test("mixed nulls and values in array") {
+ val elementType = DataTypes.STRING
+ val elements = Array[Object](
+ BinaryString.fromString("first"),
+ null,
+ BinaryString.fromString("third"),
+ null,
+ BinaryString.fromString("fifth")
+ )
+ val flussArray = new GenericArray(elements)
+ val sparkArray = new FlussAsSparkArray(elementType).replace(flussArray)
+
+ assertThat(sparkArray.numElements()).isEqualTo(5)
+ assertThat(sparkArray.getUTF8String(0).toString).isEqualTo("first")
+ assertThat(sparkArray.isNullAt(1)).isTrue()
+ assertThat(sparkArray.getUTF8String(2).toString).isEqualTo("third")
+ assertThat(sparkArray.isNullAt(3)).isTrue()
+ assertThat(sparkArray.getUTF8String(4).toString).isEqualTo("fifth")
+ }
+}
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkRowTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkRowTest.scala
new file mode 100644
index 000000000..03ccad4ef
--- /dev/null
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/row/FlussAsSparkRowTest.scala
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.row
+
+import org.apache.fluss.row.{BinaryString, Decimal => FlussDecimal, GenericArray, GenericRow, TimestampLtz, TimestampNtz}
+import org.apache.fluss.types.{ArrayType, BinaryType, DataTypes, LocalZonedTimestampType, RowType, TimestampType}
+
+import org.assertj.core.api.Assertions.assertThat
+import org.scalatest.funsuite.AnyFunSuite
+
+class FlussAsSparkRowTest extends AnyFunSuite {
+
+ test("basic row operations: numFields and fieldCount") {
+ val rowType = RowType
+ .builder()
+ .field("id", DataTypes.INT)
+ .field("name", DataTypes.STRING)
+ .field("age", DataTypes.INT)
+ .build()
+
+ val flussRow = new GenericRow(3)
+ flussRow.setField(0, Integer.valueOf(1))
+ flussRow.setField(1, BinaryString.fromString("Alice"))
+ flussRow.setField(2, Integer.valueOf(30))
+
+ val sparkRow = new FlussAsSparkRow(rowType).replace(flussRow)
+
+ assertThat(sparkRow.numFields).isEqualTo(3)
+ assertThat(sparkRow.fieldCount).isEqualTo(3)
+ }
+
+ test("isNullAt: check null fields") {
+ val rowType = RowType
+ .builder()
+ .field("col1", DataTypes.INT)
+ .field("col2", DataTypes.STRING)
+ .field("col3", DataTypes.INT)
+ .build()
+
+ val flussRow = new GenericRow(3)
+ flussRow.setField(0, Integer.valueOf(1))
+ flussRow.setField(1, null)
+ flussRow.setField(2, Integer.valueOf(3))
+
+ val sparkRow = new FlussAsSparkRow(rowType).replace(flussRow)
+
+ assertThat(sparkRow.isNullAt(0)).isFalse()
+ assertThat(sparkRow.isNullAt(1)).isTrue()
+ assertThat(sparkRow.isNullAt(2)).isFalse()
+ }
+
+ test("getBoolean: read boolean values") {
+ val rowType = RowType
+ .builder()
+ .field("flag1", DataTypes.BOOLEAN)
+ .field("flag2", DataTypes.BOOLEAN)
+ .build()
+
+ val flussRow = new GenericRow(2)
+ flussRow.setField(0, java.lang.Boolean.TRUE)
+ flussRow.setField(1, java.lang.Boolean.FALSE)
+
+ val sparkRow = new FlussAsSparkRow(rowType).replace(flussRow)
+
+ assertThat(sparkRow.getBoolean(0)).isTrue()
+ assertThat(sparkRow.getBoolean(1)).isFalse()
+ }
+
+ test("getByte: read byte values") {
+ val rowType = RowType
+ .builder()
+ .field("b1", DataTypes.TINYINT)
+ .field("b2", DataTypes.TINYINT)
+ .build()
+
+ val flussRow = new GenericRow(2)
+ flussRow.setField(0, java.lang.Byte.valueOf(10.toByte))
+ flussRow.setField(1, java.lang.Byte.valueOf((-5).toByte))
+
+ val sparkRow = new FlussAsSparkRow(rowType).replace(flussRow)
+
+ assertThat(sparkRow.getByte(0)).isEqualTo(10.toByte)
+ assertThat(sparkRow.getByte(1)).isEqualTo((-5).toByte)
+ }
+
+ test("getShort: read short values") {
+ val rowType = RowType
+ .builder()
+ .field("s1", DataTypes.SMALLINT)
+ .field("s2", DataTypes.SMALLINT)
+ .build()
+
+ val flussRow = new GenericRow(2)
+ flussRow.setField(0, java.lang.Short.valueOf(100.toShort))
+ flussRow.setField(1, java.lang.Short.valueOf((-200).toShort))
+
+ val sparkRow = new FlussAsSparkRow(rowType).replace(flussRow)
+
+ assertThat(sparkRow.getShort(0)).isEqualTo(100.toShort)
+ assertThat(sparkRow.getShort(1)).isEqualTo((-200).toShort)
+ }
+
+ test("getInt: read integer values") {
+ val rowType = RowType
+ .builder()
+ .field("num1", DataTypes.INT)
+ .field("num2", DataTypes.INT)
+ .build()
+
+ val flussRow = new GenericRow(2)
+ flussRow.setField(0, Integer.valueOf(42))
+ flussRow.setField(1, Integer.valueOf(-999))
+
+ val sparkRow = new FlussAsSparkRow(rowType).replace(flussRow)
+
+ assertThat(sparkRow.getInt(0)).isEqualTo(42)
+ assertThat(sparkRow.getInt(1)).isEqualTo(-999)
+ }
+
+ test("getLong: read BIGINT values") {
+ val rowType = RowType
+ .builder()
+ .field("big1", DataTypes.BIGINT)
+ .field("big2", DataTypes.BIGINT)
+ .build()
+
+ val flussRow = new GenericRow(2)
+ flussRow.setField(0, java.lang.Long.valueOf(123456789L))
+ flussRow.setField(1, java.lang.Long.valueOf(-987654321L))
+
+ val sparkRow = new FlussAsSparkRow(rowType).replace(flussRow)
+
+ assertThat(sparkRow.getLong(0)).isEqualTo(123456789L)
+ assertThat(sparkRow.getLong(1)).isEqualTo(-987654321L)
+ }
+
+ test("getLong: read TIMESTAMP_WITHOUT_TIME_ZONE") {
+ val timestampType = new TimestampType(3)
+ val rowType = RowType
+ .builder()
+ .field("ts", timestampType)
+ .build()
+
+ val timestamp = TimestampNtz.fromMillis(1234567890L)
+ val flussRow = new GenericRow(1)
+ flussRow.setField(0, timestamp)
+
+ val sparkRow = new FlussAsSparkRow(rowType).replace(flussRow)
+
+ // Spark expects microseconds
+ assertThat(sparkRow.getLong(0)).isEqualTo(1234567890000L)
+ }
+
+ test("getLong: read TIMESTAMP_WITH_LOCAL_TIME_ZONE") {
+ val timestampType = new LocalZonedTimestampType(3)
+ val rowType = RowType
+ .builder()
+ .field("ts_ltz", timestampType)
+ .build()
+
+ val timestamp = TimestampLtz.fromEpochMillis(9876543210L)
+ val flussRow = new GenericRow(1)
+ flussRow.setField(0, timestamp)
+
+ val sparkRow = new FlussAsSparkRow(rowType).replace(flussRow)
+
+ // Spark expects microseconds
+ assertThat(sparkRow.getLong(0)).isEqualTo(9876543210000L)
+ }
+
+ test("getFloat: read float values") {
+ val rowType = RowType
+ .builder()
+ .field("f1", DataTypes.FLOAT)
+ .field("f2", DataTypes.FLOAT)
+ .build()
+
+ val flussRow = new GenericRow(2)
+ flussRow.setField(0, java.lang.Float.valueOf(3.14f))
+ flussRow.setField(1, java.lang.Float.valueOf(-2.5f))
+
+ val sparkRow = new FlussAsSparkRow(rowType).replace(flussRow)
+
+ assertThat(sparkRow.getFloat(0)).isEqualTo(3.14f)
+ assertThat(sparkRow.getFloat(1)).isEqualTo(-2.5f)
+ }
+
+ test("getDouble: read double values") {
+ val rowType = RowType
+ .builder()
+ .field("d1", DataTypes.DOUBLE)
+ .field("d2", DataTypes.DOUBLE)
+ .build()
+
+ val flussRow = new GenericRow(2)
+ flussRow.setField(0, java.lang.Double.valueOf(2.718281828))
+ flussRow.setField(1, java.lang.Double.valueOf(-1.414))
+
+ val sparkRow = new FlussAsSparkRow(rowType).replace(flussRow)
+
+ assertThat(sparkRow.getDouble(0)).isEqualTo(2.718281828)
+ assertThat(sparkRow.getDouble(1)).isEqualTo(-1.414)
+ }
+
+ test("getDecimal: read decimal values") {
+ val rowType = RowType
+ .builder()
+ .field("price", DataTypes.DECIMAL(10, 2))
+ .build()
+
+    val decimal = FlussDecimal.fromBigDecimal(new java.math.BigDecimal("123.45"), 10, 2)
+ val flussRow = new GenericRow(1)
+ flussRow.setField(0, decimal)
+
+ val sparkRow = new FlussAsSparkRow(rowType).replace(flussRow)
+
+ val sparkDecimal = sparkRow.getDecimal(0, 10, 2)
+ assertThat(sparkDecimal.toBigDecimal.doubleValue()).isEqualTo(123.45)
+ }
+
+ test("getUTF8String: read string values") {
+ val rowType = RowType
+ .builder()
+ .field("name", DataTypes.STRING)
+ .field("desc", DataTypes.STRING)
+ .build()
+
+ val flussRow = new GenericRow(2)
+ flussRow.setField(0, BinaryString.fromString("Hello"))
+ flussRow.setField(1, BinaryString.fromString("World"))
+
+ val sparkRow = new FlussAsSparkRow(rowType).replace(flussRow)
+
+ assertThat(sparkRow.getUTF8String(0).toString).isEqualTo("Hello")
+ assertThat(sparkRow.getUTF8String(1).toString).isEqualTo("World")
+ }
+
+ test("getUTF8String: read UTF-8 characters") {
+ val rowType = RowType
+ .builder()
+ .field("chinese", DataTypes.STRING)
+ .field("emoji", DataTypes.STRING)
+ .build()
+
+ val flussRow = new GenericRow(2)
+ flussRow.setField(0, BinaryString.fromString("你好世界"))
+ flussRow.setField(1, BinaryString.fromString("😊🎉"))
+
+ val sparkRow = new FlussAsSparkRow(rowType).replace(flussRow)
+
+ assertThat(sparkRow.getUTF8String(0).toString).isEqualTo("你好世界")
+ assertThat(sparkRow.getUTF8String(1).toString).isEqualTo("😊🎉")
+ }
+
+ test("getBinary: read binary values") {
+ val binaryType = new BinaryType(5)
+ val rowType = RowType
+ .builder()
+ .field("data", binaryType)
+ .build()
+
+ val binaryData = Array[Byte](1, 2, 3, 4, 5)
+ val flussRow = new GenericRow(1)
+ flussRow.setField(0, binaryData)
+
+ val sparkRow = new FlussAsSparkRow(rowType).replace(flussRow)
+
+ val result = sparkRow.getBinary(0)
+ assertThat(result).isEqualTo(binaryData)
+ }
+
+ test("getStruct: read nested row") {
+ val innerRowType = RowType
+ .builder()
+ .field("city", DataTypes.STRING)
+ .field("zipcode", DataTypes.INT)
+ .build()
+
+ val outerRowType = RowType
+ .builder()
+ .field("id", DataTypes.INT)
+ .field("address", innerRowType)
+ .build()
+
+ val innerRow = new GenericRow(2)
+ innerRow.setField(0, BinaryString.fromString("Beijing"))
+ innerRow.setField(1, Integer.valueOf(100000))
+
+ val outerRow = new GenericRow(2)
+ outerRow.setField(0, Integer.valueOf(1))
+ outerRow.setField(1, innerRow)
+
+ val sparkRow = new FlussAsSparkRow(outerRowType).replace(outerRow)
+
+ assertThat(sparkRow.getInt(0)).isEqualTo(1)
+
+ val innerSparkRow = sparkRow.getStruct(1, 2)
+ assertThat(innerSparkRow.getUTF8String(0).toString).isEqualTo("Beijing")
+ assertThat(innerSparkRow.getInt(1)).isEqualTo(100000)
+ }
+
+ test("getArray: read array field") {
+ val arrayType = new ArrayType(DataTypes.INT)
+ val rowType = RowType
+ .builder()
+ .field("numbers", arrayType)
+ .build()
+
+ val array = new GenericArray(
+ Array[Object](
+ Integer.valueOf(1),
+ Integer.valueOf(2),
+ Integer.valueOf(3),
+ Integer.valueOf(4),
+ Integer.valueOf(5)))
+ val flussRow = new GenericRow(1)
+ flussRow.setField(0, array)
+
+ val sparkRow = new FlussAsSparkRow(rowType).replace(flussRow)
+
+ val sparkArray = sparkRow.getArray(0)
+ assertThat(sparkArray.numElements()).isEqualTo(5)
+ assertThat(sparkArray.getInt(0)).isEqualTo(1)
+ assertThat(sparkArray.getInt(4)).isEqualTo(5)
+ }
+
+ test("getMap: unsupported operation") {
+ val rowType = RowType
+ .builder()
+ .field("dummy", DataTypes.INT)
+ .build()
+
+ val flussRow = new GenericRow(1)
+ flussRow.setField(0, Integer.valueOf(1))
+
+ val sparkRow = new FlussAsSparkRow(rowType).replace(flussRow)
+
+ assertThrows[UnsupportedOperationException] {
+ sparkRow.getMap(0)
+ }
+ }
+
+ test("getInterval: unsupported operation") {
+ val rowType = RowType
+ .builder()
+ .field("dummy", DataTypes.INT)
+ .build()
+
+ val flussRow = new GenericRow(1)
+ flussRow.setField(0, Integer.valueOf(1))
+
+ val sparkRow = new FlussAsSparkRow(rowType).replace(flussRow)
+
+ assertThrows[UnsupportedOperationException] {
+ sparkRow.getInterval(0)
+ }
+ }
+
+ test("setNullAt: unsupported operation") {
+ val rowType = RowType
+ .builder()
+ .field("col", DataTypes.INT)
+ .build()
+
+ val flussRow = new GenericRow(1)
+ flussRow.setField(0, Integer.valueOf(1))
+
+ val sparkRow = new FlussAsSparkRow(rowType).replace(flussRow)
+
+ assertThrows[UnsupportedOperationException] {
+ sparkRow.setNullAt(0)
+ }
+ }
+
+ test("update: unsupported operation") {
+ val rowType = RowType
+ .builder()
+ .field("col", DataTypes.INT)
+ .build()
+
+ val flussRow = new GenericRow(1)
+ flussRow.setField(0, Integer.valueOf(1))
+
+ val sparkRow = new FlussAsSparkRow(rowType).replace(flussRow)
+
+ assertThrows[UnsupportedOperationException] {
+ sparkRow.update(0, Integer.valueOf(2))
+ }
+ }
+
+ test("copy: creates deep copy") {
+ val rowType = RowType
+ .builder()
+ .field("id", DataTypes.INT)
+ .field("name", DataTypes.STRING)
+ .build()
+
+ val flussRow = new GenericRow(2)
+ flussRow.setField(0, Integer.valueOf(1))
+ flussRow.setField(1, BinaryString.fromString("Original"))
+
+ val sparkRow = new FlussAsSparkRow(rowType).replace(flussRow)
+ val copiedRow = sparkRow.copy()
+
+ assertThat(copiedRow.getInt(0)).isEqualTo(1)
+ assertThat(copiedRow.getUTF8String(1).toString).isEqualTo("Original")
+
+ // Verify it's a different instance
+ assertThat(copiedRow).isNotSameAs(sparkRow)
+ }
+
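+  // Editorial note: copy() matters precisely because of the replace-based
+  // reuse tested below; a caller that buffers rows across iterations must
+  // copy them, or every buffered reference would observe the last row read.
+  // `buffer` is a hypothetical stand-in:
+  //   buffer += sparkRow.copy()
+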
+ test("replace: reuses wrapper instance") {
+ val rowType = RowType
+ .builder()
+ .field("val", DataTypes.INT)
+ .build()
+
+ val row1 = new GenericRow(1)
+ row1.setField(0, Integer.valueOf(1))
+
+ val row2 = new GenericRow(1)
+ row2.setField(0, Integer.valueOf(2))
+
+ val sparkRow = new FlussAsSparkRow(rowType)
+ sparkRow.replace(row1)
+ assertThat(sparkRow.getInt(0)).isEqualTo(1)
+
+ sparkRow.replace(row2)
+ assertThat(sparkRow.getInt(0)).isEqualTo(2)
+ }
+
+ test("complex row: all data types") {
+ val rowType = RowType
+ .builder()
+ .field("bool_col", DataTypes.BOOLEAN)
+ .field("byte_col", DataTypes.TINYINT)
+ .field("short_col", DataTypes.SMALLINT)
+ .field("int_col", DataTypes.INT)
+ .field("long_col", DataTypes.BIGINT)
+ .field("float_col", DataTypes.FLOAT)
+ .field("double_col", DataTypes.DOUBLE)
+ .field("string_col", DataTypes.STRING)
+ .field("decimal_col", DataTypes.DECIMAL(10, 2))
+ .build()
+
+ val flussRow = new GenericRow(9)
+ flussRow.setField(0, java.lang.Boolean.TRUE)
+ flussRow.setField(1, java.lang.Byte.valueOf(10.toByte))
+ flussRow.setField(2, java.lang.Short.valueOf(100.toShort))
+ flussRow.setField(3, Integer.valueOf(1000))
+ flussRow.setField(4, java.lang.Long.valueOf(10000L))
+ flussRow.setField(5, java.lang.Float.valueOf(3.14f))
+ flussRow.setField(6, java.lang.Double.valueOf(2.718))
+ flussRow.setField(7, BinaryString.fromString("test"))
+    flussRow.setField(8, FlussDecimal.fromBigDecimal(new java.math.BigDecimal("99.99"), 10, 2))
+
+ val sparkRow = new FlussAsSparkRow(rowType).replace(flussRow)
+
+ assertThat(sparkRow.getBoolean(0)).isTrue()
+ assertThat(sparkRow.getByte(1)).isEqualTo(10.toByte)
+ assertThat(sparkRow.getShort(2)).isEqualTo(100.toShort)
+ assertThat(sparkRow.getInt(3)).isEqualTo(1000)
+ assertThat(sparkRow.getLong(4)).isEqualTo(10000L)
+ assertThat(sparkRow.getFloat(5)).isEqualTo(3.14f)
+ assertThat(sparkRow.getDouble(6)).isEqualTo(2.718)
+ assertThat(sparkRow.getUTF8String(7).toString).isEqualTo("test")
+    assertThat(sparkRow.getDecimal(8, 10, 2).toBigDecimal.doubleValue()).isEqualTo(99.99)
+ }
+}