wuchong commented on code in PR #2377:
URL: https://github.com/apache/fluss/pull/2377#discussion_r2707572177


##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussPartitionReader.scala:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.client.{Connection, ConnectionFactory}
+import org.apache.fluss.client.table.Table
+import org.apache.fluss.client.table.scanner.ScanRecord
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.metadata.{TableInfo, TablePath}
+import org.apache.fluss.row.{InternalRow => FlussInternalRow}
+import org.apache.fluss.spark.row.DataConverter
+import org.apache.fluss.types.RowType
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.read.PartitionReader
+
+import java.time.Duration
+
+abstract class FlussPartitionReader(tablePath: TablePath, flussConfig: 
Configuration)
+  extends PartitionReader[InternalRow] {
+
+  protected val POLL_TIMEOUT: Duration = Duration.ofMillis(100)
+  protected lazy val conn: Connection = 
ConnectionFactory.createConnection(flussConfig)
+  protected lazy val table: Table = conn.getTable(tablePath)
+  protected lazy val tableInfo: TableInfo = 
conn.getAdmin.getTableInfo(tablePath).get()

Review Comment:
   The `tableInfo` can be got from `table`: `table.getTableInfo`.



##########
fluss-common/src/main/java/org/apache/fluss/utils/InternalRowUtils.java:
##########
@@ -22,15 +22,117 @@
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
  * additional information regarding copyright ownership. */
 
+import org.apache.fluss.row.BinaryMap;
+import org.apache.fluss.row.BinaryRow;
 import org.apache.fluss.row.BinaryString;
 import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.GenericArray;
+import org.apache.fluss.row.GenericMap;
+import org.apache.fluss.row.GenericRow;
+import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.row.InternalMap;
+import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.types.ArrayType;
+import org.apache.fluss.types.DataType;
 import org.apache.fluss.types.DataTypeRoot;
+import org.apache.fluss.types.MapType;
+import org.apache.fluss.types.RowType;
+
+import java.util.HashMap;
+import java.util.Map;
 
 /** Utility class for {@link org.apache.fluss.row.InternalRow} related 
operations. */
 public class InternalRowUtils {
 
+    public static InternalRow copyRow(InternalRow row, RowType rowType) {
+        if (row instanceof BinaryRow) {
+            return ((BinaryRow) row).copy();
+        } else {
+            InternalRow.FieldGetter[] fieldGetters = 
InternalRow.createFieldGetters(rowType);
+            GenericRow genericRow = new GenericRow(row.getFieldCount());
+            for (int i = 0; i < row.getFieldCount(); i++) {
+                genericRow.setField(
+                        i, copyRow(fieldGetters[i].getFieldOrNull(row), 
rowType.getTypeAt(i)));
+            }
+            return genericRow;
+        }
+    }
+
+    public static InternalArray copyArray(InternalArray from, DataType 
eleType) {
+        if (!eleType.isNullable()) {
+            switch (eleType.getTypeRoot()) {
+                case BOOLEAN:
+                    return new GenericArray(from.toBooleanArray());
+                case TINYINT:
+                    return new GenericArray(from.toByteArray());
+                case SMALLINT:
+                    return new GenericArray(from.toShortArray());
+                case INTEGER:
+                case DATE:
+                case TIME_WITHOUT_TIME_ZONE:
+                    return new GenericArray(from.toIntArray());
+                case BIGINT:
+                    return new GenericArray(from.toLongArray());
+                case FLOAT:
+                    return new GenericArray(from.toFloatArray());
+                case DOUBLE:
+                    return new GenericArray(from.toDoubleArray());
+            }
+        }
+
+        InternalArray.ElementGetter elementGetter = 
InternalArray.createElementGetter(eleType);
+        Object[] newArray = new Object[from.size()];
+        for (int i = 0; i < newArray.length; ++i) {
+            if (!from.isNullAt(i)) {
+                newArray[i] = copyRow(elementGetter.getElementOrNull(from, i), 
eleType);
+            } else {
+                newArray[i] = null;
+            }
+        }
+        return new GenericArray(newArray);
+    }
+
+    public static InternalMap copyMap(InternalMap map, DataType keyType, 
DataType valueType) {
+        if (map instanceof BinaryMap) {
+            return ((BinaryMap) map).copy();
+        }
+        InternalArray.ElementGetter keyGetter = 
InternalArray.createElementGetter(keyType);
+        InternalArray.ElementGetter valueGetter = 
InternalArray.createElementGetter(valueType);
+        Map<Object, Object> newMap = new HashMap<>();
+        InternalArray keys = map.keyArray();
+        InternalArray values = map.valueArray();
+        for (int i = 0; i < keys.size(); i++) {
+            newMap.put(
+                    copyRow(keyGetter.getElementOrNull(keys, i), keyType),
+                    copyRow(valueGetter.getElementOrNull(values, i), 
valueType));
+        }
+        return new GenericMap(newMap);
+    }
+
+    public static Object copyRow(Object o, DataType type) {

Review Comment:
   This method name conflicts with `copyRow(InternalRow row, RowType rowType)` 
and may be confusing. Can we rename it to `copyValue` to distinguish with 
`copyRow`?
   
   Besides, since it is only called `InternalRowUtils`, we can change the 
visibility of this method and `copyArray`, `copyMap` to `private`. 



##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.client.table.scanner.ScanRecord
+import org.apache.fluss.client.table.scanner.batch.BatchScanner
+import org.apache.fluss.client.table.scanner.log.{LogScanner, ScanRecords}
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.metadata.{TableBucket, TablePath}
+
+import java.util
+
+/** Partition reader that reads primary key table data. */
+class FlussUpsertPartitionReader(
+    tablePath: TablePath,
+    projection: Array[Int],
+    flussPartition: FlussUpsertInputPartition,
+    flussConfig: Configuration)
+  extends FlussPartitionReader(tablePath, flussConfig) {
+
+  private val tableBucket: TableBucket = flussPartition.tableBucket
+  private val partitionId = tableBucket.getPartitionId
+  private val bucketId = tableBucket.getBucket
+  private val snapshotId: Long = flussPartition.snapshotId
+  private val logStartingOffset: Long = flussPartition.logStartingOffset
+
+  // KV Snapshot Reader (if snapshot exists)
+  private var snapshotScanner: BatchScanner = _
+  private var snapshotIterator: 
util.Iterator[org.apache.fluss.row.InternalRow] = _
+  private var snapshotFinished = false
+
+  // Log Scanner for incremental data
+  private var logScanner: LogScanner = _
+  private var logRecords: util.Iterator[ScanRecord] = _
+
+  // initialize scanners
+  initialize()
+
+  override def next(): Boolean = {
+    if (closed) {
+      return false
+    }
+
+    // Phase 1: Read snapshot if not finished
+    if (!snapshotFinished) {
+      if (snapshotIterator == null || !snapshotIterator.hasNext) {
+        // Try to get next batch from snapshot scanner
+        val batch = snapshotScanner.pollBatch(POLL_TIMEOUT)
+        if (batch == null) {
+          // Snapshot reading finished
+          snapshotFinished = true
+          if (snapshotScanner != null) {
+            snapshotScanner.close()
+            snapshotScanner = null
+          }
+
+          // Subscribe to log scanner for incremental data
+          subscribeLogScanner()
+          return next()
+        } else {
+          snapshotIterator = batch
+          if (snapshotIterator.hasNext) {
+            currentRow = convertToSparkRow(snapshotIterator.next())
+            return true
+          } else {
+            return next()
+          }
+        }
+      } else {
+        // get data from current snapshot batch
+        currentRow = convertToSparkRow(snapshotIterator.next())
+        return true
+      }
+    }
+
+    // Phase 2: Read incremental log
+    if (logRecords != null && logRecords.hasNext) {
+      val scanRecord = logRecords.next()
+      currentRow = convertToSparkRow(scanRecord)
+      return true
+    }
+
+    // Poll for more log records
+    val scanRecords: ScanRecords = logScanner.poll(POLL_TIMEOUT)

Review Comment:
   `logScanner.poll()` is a best-effort API: it may return an empty result due 
to transient issues (e.g., network glitches) even when unread log records 
remain on the server. Therefore, we should poll in a loop until we reach the 
known end offset.
   
   The end offset should be determined at job startup using 
`OffsetsInitializer.latest().getBucketOffsets(...)`, which gives us the 
high-watermark for each bucket at the beginning of the batch job.
   
   Since there’s no built-in API to read a bounded log split, we must manually:
   
   - Skip any records with offsets beyond the precomputed end offset, and
   - Signal there is no `next` once all buckets have reached their respective 
end offsets.



##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.client.{Connection, ConnectionFactory}
+import org.apache.fluss.client.admin.Admin
+import org.apache.fluss.client.metadata.KvSnapshots
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.metadata.{PartitionInfo, TableBucket, TableInfo, 
TablePath}
+
+import org.apache.spark.sql.connector.read.{Batch, InputPartition, 
PartitionReaderFactory}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+abstract class FlussBatch(
+    tablePath: TablePath,
+    tableInfo: TableInfo,
+    readSchema: StructType,
+    flussConfig: Configuration)
+  extends Batch
+  with AutoCloseable {
+
+  lazy val conn: Connection = ConnectionFactory.createConnection(flussConfig)
+
+  lazy val admin: Admin = conn.getAdmin
+
+  lazy val partitionInfos: util.List[PartitionInfo] = 
admin.listPartitionInfos(tablePath).get()
+
+  protected def projection: Array[Int] = {
+    val columnNameToIndex = 
tableInfo.getSchema.getColumnNames.asScala.zipWithIndex.toMap
+    readSchema.fields.map {
+      field =>
+        columnNameToIndex.getOrElse(
+          field.name,
+          throw new IllegalArgumentException(s"Invalid field name: 
${field.name}"))
+    }
+  }
+
+  override def close(): Unit = {
+    if (admin != null) {
+      admin.close()
+    }
+    if (conn != null) {
+      conn.close()
+    }
+  }
+}
+
+/** Batch for reading log table (append-only table). */
+class FlussAppendBatch(
+    tablePath: TablePath,
+    tableInfo: TableInfo,
+    readSchema: StructType,
+    options: CaseInsensitiveStringMap,
+    flussConfig: Configuration)
+  extends FlussBatch(tablePath, tableInfo, readSchema, flussConfig) {
+
+  override def planInputPartitions(): Array[InputPartition] = {
+    def createPartitions(partitionId: Option[Long]): Array[InputPartition] = {
+      (0 until tableInfo.getNumBuckets).map {
+        bucketId =>
+          val tableBucket = partitionId match {
+            case Some(partitionId) =>
+              new TableBucket(tableInfo.getTableId, partitionId, bucketId)
+            case None =>
+              new TableBucket(tableInfo.getTableId, bucketId)
+          }
+          FlussAppendInputPartition(tableBucket).asInstanceOf[InputPartition]
+      }.toArray
+    }
+
+    if (tableInfo.isPartitioned) {
+      partitionInfos.asScala.flatMap {
+        partitionInfo => createPartitions(Some(partitionInfo.getPartitionId))
+      }.toArray
+    } else {
+      createPartitions(None)
+    }
+  }
+
+  override def createReaderFactory(): PartitionReaderFactory = {
+    new FlussAppendPartitionReaderFactory(tablePath, projection, options, 
flussConfig)
+  }
+
+}
+
+/** Batch for reading primary key table (upsert table). */
+class FlussUpsertBatch(
+    tablePath: TablePath,
+    tableInfo: TableInfo,
+    readSchema: StructType,
+    options: CaseInsensitiveStringMap,
+    flussConfig: Configuration)
+  extends FlussBatch(tablePath, tableInfo, readSchema, flussConfig) {
+
+  override def planInputPartitions(): Array[InputPartition] = {
+    def createPartitions(kvSnapshots: KvSnapshots): Array[InputPartition] = {
+      val tableId = kvSnapshots.getTableId
+      val partitionId = kvSnapshots.getPartitionId
+      kvSnapshots.getBucketIds.asScala
+        .map {
+          bucketId =>
+            val tableBucket = new TableBucket(tableId, partitionId, bucketId)
+            val snapshotIdOpt = kvSnapshots.getSnapshotId(bucketId)
+            val logOffsetOpt = kvSnapshots.getLogOffset(bucketId)
+
+            if (snapshotIdOpt.isPresent) {
+              assert(
+                logOffsetOpt.isPresent,
+                "Log offset must be present when snapshot id is present")
+
+              // Create hybrid partition
+              FlussUpsertInputPartition(
+                tableBucket,
+                snapshotIdOpt.getAsLong,
+                logOffsetOpt.getAsLong
+              )
+            } else {
+              // No snapshot yet, only read log from beginning
+              FlussUpsertInputPartition(tableBucket, -1L, 0L)

Review Comment:
   We should use 
`org.apache.fluss.client.table.scanner.log.LogScanner#EARLIEST_OFFSET` instead 
of `0` to indicate reading log from beginning. Because the `0L` offset maybe 
TTLed, and be thrown `LogOffsetOutOfRangeException`. 



##########
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkReadTest.scala:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark
+
+import org.apache.spark.sql.Row
+
+class SparkReadTest extends FlussSparkTestBase {
+
+  test("Spark Read: log table") {
+    withTable("t") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t (orderId BIGINT, itemId BIGINT, 
amount INT, address STRING)
+             |""".stripMargin)
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t
+             |VALUES
+             |(600L, 21L, 601, "addr1"), (700L, 22L, 602, "addr2"),
+             |(800L, 23L, 603, "addr3"), (900L, 24L, 604, "addr4"),
+             |(1000L, 25L, 605, "addr5")
+             |""".stripMargin)
+
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY orderId"),
+        Row(600L, 21L, 601, "addr1") ::
+          Row(700L, 22L, 602, "addr2") ::
+          Row(800L, 23L, 603, "addr3") ::
+          Row(900L, 24L, 604, "addr4") ::
+          Row(1000L, 25L, 605, "addr5") :: Nil
+      )
+
+      // projection
+      checkAnswer(
+        sql(s"SELECT address, itemId FROM $DEFAULT_DATABASE.t ORDER BY 
orderId"),
+        Row("addr1", 21L) ::
+          Row("addr2", 22L) ::
+          Row("addr3", 23L) ::
+          Row("addr4", 24L) ::
+          Row("addr5", 25L) :: Nil
+      )
+
+      // filter
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t WHERE amount % 2 = 0 ORDER BY 
orderId"),
+        Row(700L, 22L, 602, "addr2") ::
+          Row(900L, 24L, 604, "addr4") :: Nil
+      )
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t VALUES
+             |(700L, 220L, 602, "addr2"),
+             |(900L, 240L, 604, "addr4"),
+             |(1100L, 260L, 606, "addr6")
+             |""".stripMargin)
+      // projection + filter
+      checkAnswer(
+        sql(s"""
+               |SELECT orderId, itemId FROM $DEFAULT_DATABASE.t
+               |WHERE orderId >= 900 ORDER BY orderId, itemId""".stripMargin),
+        Row(900L, 24L) ::
+          Row(900L, 240L) ::
+          Row(1000L, 25L) ::
+          Row(1100L, 260L) :: Nil
+      )
+    }
+  }
+
+  test("Spark Read: primary key table") {
+    withTable("t") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t (orderId BIGINT, itemId BIGINT, 
amount INT, address STRING)
+             |TBLPROPERTIES("primary.key" = "orderId")
+             |""".stripMargin)
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t VALUES
+             |(600L, 21L, 601, "addr1"), (700L, 22L, 602, "addr2"),
+             |(800L, 23L, 603, "addr3"), (900L, 24L, 604, "addr4"),
+             |(1000L, 25L, 605, "addr5")
+             |""".stripMargin)
+
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY orderId"),
+        Row(600L, 21L, 601, "addr1") ::
+          Row(700L, 22L, 602, "addr2") ::
+          Row(800L, 23L, 603, "addr3") ::
+          Row(900L, 24L, 604, "addr4") ::
+          Row(1000L, 25L, 605, "addr5") :: Nil
+      )
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t VALUES
+             |(700L, 220L, 602, "addr2"),
+             |(900L, 240L, 604, "addr4"),
+             |(1100L, 260L, 606, "addr6")
+             |""".stripMargin)
+
+      checkAnswer(
+        sql(s"""
+               |SELECT orderId, itemId, address FROM $DEFAULT_DATABASE.t
+               |WHERE amount <= 603 ORDER BY orderId""".stripMargin),
+        Row(600L, 21L, "addr1") ::
+          Row(700L, 220L, "addr2") ::
+          Row(800L, 23L, "addr3") ::
+          Nil
+      )
+    }
+  }
+
+  test("Spark Read: partitioned log table") {
+    withTable("t") {
+      sql(
+        s"""
+           |CREATE TABLE $DEFAULT_DATABASE.t (orderId BIGINT, itemId BIGINT, 
amount INT, address STRING, dt STRING)
+           |PARTITIONED BY (dt)
+           |""".stripMargin
+      )
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t VALUES
+             |(600L, 21L, 601, "addr1", "2026-01-01"), (700L, 22L, 602, 
"addr2", "2026-01-01"),
+             |(800L, 23L, 603, "addr3", "2026-01-02"), (900L, 24L, 604, 
"addr4", "2026-01-02"),
+             |(1000L, 25L, 605, "addr5", "2026-01-03")
+             |""".stripMargin)
+
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY orderId"),
+        Row(600L, 21L, 601, "addr1", "2026-01-01") ::
+          Row(700L, 22L, 602, "addr2", "2026-01-01") ::
+          Row(800L, 23L, 603, "addr3", "2026-01-02") ::
+          Row(900L, 24L, 604, "addr4", "2026-01-02") ::
+          Row(1000L, 25L, 605, "addr5", "2026-01-03") :: Nil
+      )
+
+      // Read with partition filter
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t WHERE dt = '2026-01-01' ORDER 
BY orderId"),
+        Row(600L, 21L, 601, "addr1", "2026-01-01") ::
+          Row(700L, 22L, 602, "addr2", "2026-01-01") :: Nil
+      )
+
+      // Read with multiple partitions filter
+      checkAnswer(
+        sql(s"""
+               |SELECT orderId, address, dt FROM $DEFAULT_DATABASE.t
+               |WHERE dt IN ('2026-01-01', '2026-01-02')
+               |ORDER BY orderId""".stripMargin),
+        Row(600L, "addr1", "2026-01-01") ::
+          Row(700L, "addr2", "2026-01-01") ::
+          Row(800L, "addr3", "2026-01-02") ::
+          Row(900L, "addr4", "2026-01-02") :: Nil
+      )
+    }
+  }
+
+  test("Spark Read: partitioned primary key table") {
+    withTable("t") {
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t (orderId BIGINT, itemId BIGINT, 
amount INT, address STRING, dt STRING)
+             |PARTITIONED BY (dt)
+             |TBLPROPERTIES("primary.key" = "orderId,dt")
+             |""".stripMargin)
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t VALUES
+             |(600L, 21L, 601, "addr1", "2026-01-01"), (700L, 22L, 602, 
"addr2", "2026-01-01"),
+             |(800L, 23L, 603, "addr3", "2026-01-02"), (900L, 24L, 604, 
"addr4", "2026-01-02"),
+             |(1000L, 25L, 605, "addr5", "2026-01-03")
+             |""".stripMargin)
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t VALUES
+             |(700L, 220L, 602, "addr2_updated", "2026-01-01"),
+             |(900L, 240L, 604, "addr4_updated", "2026-01-02"),
+             |(1100L, 260L, 606, "addr6", "2026-01-03")
+             |""".stripMargin)
+
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY orderId"),
+        Row(600L, 21L, 601, "addr1", "2026-01-01") ::
+          Row(700L, 220L, 602, "addr2_updated", "2026-01-01") ::
+          Row(800L, 23L, 603, "addr3", "2026-01-02") ::
+          Row(900L, 240L, 604, "addr4_updated", "2026-01-02") ::
+          Row(1000L, 25L, 605, "addr5", "2026-01-03") ::
+          Row(1100L, 260L, 606, "addr6", "2026-01-03") ::

Review Comment:
   Currently, the test passes even if changelog reading is not properly 
implemented. This is because the test base uses a very short KV snapshot 
interval (1 second), so the reader always falls back to the KV snapshot and 
never actually consumes the changelog.
   
   I think it’s acceptable to keep this as-is for now, since we plan to 
refactor the changelog merge-read logic in upcoming PRs, as discussed offline. 
But please create an issue to track this. 



##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussBatch.scala:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.client.{Connection, ConnectionFactory}
+import org.apache.fluss.client.admin.Admin
+import org.apache.fluss.client.metadata.KvSnapshots
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.metadata.{PartitionInfo, TableBucket, TableInfo, 
TablePath}
+
+import org.apache.spark.sql.connector.read.{Batch, InputPartition, 
PartitionReaderFactory}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+abstract class FlussBatch(
+    tablePath: TablePath,
+    tableInfo: TableInfo,
+    readSchema: StructType,
+    flussConfig: Configuration)
+  extends Batch
+  with AutoCloseable {
+
+  lazy val conn: Connection = ConnectionFactory.createConnection(flussConfig)
+
+  lazy val admin: Admin = conn.getAdmin
+
+  lazy val partitionInfos: util.List[PartitionInfo] = 
admin.listPartitionInfos(tablePath).get()
+
+  protected def projection: Array[Int] = {
+    val columnNameToIndex = 
tableInfo.getSchema.getColumnNames.asScala.zipWithIndex.toMap
+    readSchema.fields.map {
+      field =>
+        columnNameToIndex.getOrElse(
+          field.name,
+          throw new IllegalArgumentException(s"Invalid field name: 
${field.name}"))
+    }
+  }
+
+  override def close(): Unit = {
+    if (admin != null) {
+      admin.close()
+    }
+    if (conn != null) {
+      conn.close()
+    }
+  }
+}
+
+/** Batch for reading log table (append-only table). */
+class FlussAppendBatch(
+    tablePath: TablePath,
+    tableInfo: TableInfo,
+    readSchema: StructType,
+    options: CaseInsensitiveStringMap,
+    flussConfig: Configuration)
+  extends FlussBatch(tablePath, tableInfo, readSchema, flussConfig) {
+
+  override def planInputPartitions(): Array[InputPartition] = {
+    def createPartitions(partitionId: Option[Long]): Array[InputPartition] = {
+      (0 until tableInfo.getNumBuckets).map {
+        bucketId =>
+          val tableBucket = partitionId match {
+            case Some(partitionId) =>
+              new TableBucket(tableInfo.getTableId, partitionId, bucketId)
+            case None =>
+              new TableBucket(tableInfo.getTableId, bucketId)
+          }
+          FlussAppendInputPartition(tableBucket).asInstanceOf[InputPartition]
+      }.toArray
+    }
+
+    if (tableInfo.isPartitioned) {
+      partitionInfos.asScala.flatMap {
+        partitionInfo => createPartitions(Some(partitionInfo.getPartitionId))
+      }.toArray
+    } else {
+      createPartitions(None)
+    }
+  }
+
+  override def createReaderFactory(): PartitionReaderFactory = {
+    new FlussAppendPartitionReaderFactory(tablePath, projection, options, 
flussConfig)
+  }
+
+}
+
+/** Batch for reading primary key table (upsert table). */
+class FlussUpsertBatch(
+    tablePath: TablePath,
+    tableInfo: TableInfo,
+    readSchema: StructType,
+    options: CaseInsensitiveStringMap,
+    flussConfig: Configuration)
+  extends FlussBatch(tablePath, tableInfo, readSchema, flussConfig) {
+
+  override def planInputPartitions(): Array[InputPartition] = {
+    def createPartitions(kvSnapshots: KvSnapshots): Array[InputPartition] = {
+      val tableId = kvSnapshots.getTableId
+      val partitionId = kvSnapshots.getPartitionId
+      kvSnapshots.getBucketIds.asScala
+        .map {
+          bucketId =>
+            val tableBucket = new TableBucket(tableId, partitionId, bucketId)
+            val snapshotIdOpt = kvSnapshots.getSnapshotId(bucketId)
+            val logOffsetOpt = kvSnapshots.getLogOffset(bucketId)
+
+            if (snapshotIdOpt.isPresent) {
+              assert(
+                logOffsetOpt.isPresent,
+                "Log offset must be present when snapshot id is present")
+
+              // Create hybrid partition
+              FlussUpsertInputPartition(
+                tableBucket,
+                snapshotIdOpt.getAsLong,
+                logOffsetOpt.getAsLong

Review Comment:
   Since this is a batch InputPartition, we should add an end offset to make 
the log split bounded. The latest end offset can be got from 
`OffsetsInitializer.latest().getBucketOffsets(..)` method. 
   
   We should:
   1. fetch the latest `kvSnapshots`, it is a `map<bucket, 
snapshot_id&log_start_offset>`.
   2. fetch the latest offset from `OffsetsInitializer.latest`, it is a 
`map<bucket, log_end_offset>`.
   3. Join the `kvSnapshots` and `OffsetsInitializer.latest`, to generate a 
input partition list for each bucket. 



##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.read
+
+import org.apache.fluss.client.table.scanner.ScanRecord
+import org.apache.fluss.client.table.scanner.batch.BatchScanner
+import org.apache.fluss.client.table.scanner.log.{LogScanner, ScanRecords}
+import org.apache.fluss.config.Configuration
+import org.apache.fluss.metadata.{TableBucket, TablePath}
+
+import java.util
+
+/** Partition reader that reads primary key table data. */
+class FlussUpsertPartitionReader(
+    tablePath: TablePath,
+    projection: Array[Int],
+    flussPartition: FlussUpsertInputPartition,
+    flussConfig: Configuration)
+  extends FlussPartitionReader(tablePath, flussConfig) {
+
+  private val tableBucket: TableBucket = flussPartition.tableBucket
+  private val partitionId = tableBucket.getPartitionId
+  private val bucketId = tableBucket.getBucket
+  private val snapshotId: Long = flussPartition.snapshotId
+  private val logStartingOffset: Long = flussPartition.logStartingOffset
+
+  // KV Snapshot Reader (if snapshot exists)
+  private var snapshotScanner: BatchScanner = _
+  private var snapshotIterator: 
util.Iterator[org.apache.fluss.row.InternalRow] = _
+  private var snapshotFinished = false
+
+  // Log Scanner for incremental data
+  private var logScanner: LogScanner = _
+  private var logRecords: util.Iterator[ScanRecord] = _
+
+  // initialize scanners
+  initialize()
+
+  override def next(): Boolean = {
+    if (closed) {
+      return false
+    }
+
+    // Phase 1: Read snapshot if not finished
+    if (!snapshotFinished) {
+      if (snapshotIterator == null || !snapshotIterator.hasNext) {
+        // Try to get next batch from snapshot scanner
+        val batch = snapshotScanner.pollBatch(POLL_TIMEOUT)
+        if (batch == null) {
+          // Snapshot reading finished
+          snapshotFinished = true
+          if (snapshotScanner != null) {
+            snapshotScanner.close()
+            snapshotScanner = null
+          }
+
+          // Subscribe to log scanner for incremental data
+          subscribeLogScanner()
+          return next()
+        } else {
+          snapshotIterator = batch
+          if (snapshotIterator.hasNext) {
+            currentRow = convertToSparkRow(snapshotIterator.next())
+            return true
+          } else {
+            return next()
+          }
+        }
+      } else {
+        // get data from current snapshot batch
+        currentRow = convertToSparkRow(snapshotIterator.next())
+        return true
+      }
+    }
+
+    // Phase 2: Read incremental log
+    if (logRecords != null && logRecords.hasNext) {
+      val scanRecord = logRecords.next()
+      currentRow = convertToSparkRow(scanRecord)
+      return true
+    }
+
+    // Poll for more log records
+    val scanRecords: ScanRecords = logScanner.poll(POLL_TIMEOUT)
+
+    if (scanRecords == null || scanRecords.isEmpty) {
+      return false
+    }
+
+    // Get records for our bucket
+    val bucketRecords = scanRecords.records(tableBucket)
+    if (bucketRecords.isEmpty) {
+      return false
+    }
+
+    logRecords = bucketRecords.iterator()
+    if (logRecords.hasNext) {
+      val scanRecord = logRecords.next()
+      currentRow = convertToSparkRow(scanRecord)

Review Comment:
   The LogRecord is a changelog that contains `-D` (delete) and `-U` 
(update-before) records. To produce a consistent view, we need to merge these 
changes with the KV snapshot data in a union-read fashion—just like how we 
combine data lake snapshots with changelogs.
   
   Fortunately, the KV snapshot scan is already sorted by primary key. We can 
leverage this by:
   
   1. Materializing the delta changes into a temporary `delta` table;
   2. Sorting the `delta` table by primary key using 
`org.apache.fluss.row.encode.KeyEncoder#of(...)`;
   3. Performing a sort-merge between the sorted KV snapshot reader and the 
sorted delta table reader.
   
   This enables an efficient and correct merge without requiring random lookups 
or hash-based joins.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to