This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new d3a935f3b [spark] support merge-read between kv snapshot and log for 
primary-key table (#2523)
d3a935f3b is described below

commit d3a935f3b148a5de3adc6453207ab252bbf68aae
Author: Yann Byron <[email protected]>
AuthorDate: Sun Feb 1 15:47:43 2026 +0800

    [spark] support merge-read between kv snapshot and log for primary-key 
table (#2523)
---
 .../client/table/scanner}/SortMergeReader.java     |  97 +++-----
 .../client/utils/SingleElementHeadIterator.java    |  88 +++++++
 .../client/table/scanner}/SortMergeReaderTest.java |   4 +-
 .../java/org/apache/fluss/row}/KeyValueRow.java    |   5 +-
 .../reader/LakeSnapshotAndLogSplitScanner.java     |   2 +
 .../org/apache/fluss/spark/SparkFlussConf.scala    |   9 +
 .../scala/org/apache/fluss/spark/SparkTable.scala  |  14 +-
 .../spark/read/FlussUpsertPartitionReader.scala    | 206 ++++++++++++++--
 .../fluss/spark/utils/LogChangesIterator.scala     | 146 ++++++++++++
 .../apache/fluss/spark/FlussSparkTestBase.scala    |  23 +-
 ...kReadTest.scala => SparkLogTableReadTest.scala} | 100 +-------
 .../fluss/spark/SparkPrimaryKeyTableReadTest.scala | 262 +++++++++++++++++++++
 12 files changed, 740 insertions(+), 216 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/SortMergeReader.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/SortMergeReader.java
similarity index 85%
rename from 
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/SortMergeReader.java
rename to 
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/SortMergeReader.java
index cf9d22925..0be36b584 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/SortMergeReader.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/SortMergeReader.java
@@ -15,10 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.fluss.flink.lake.reader;
+package org.apache.fluss.client.table.scanner;
 
+import org.apache.fluss.client.utils.SingleElementHeadIterator;
 import org.apache.fluss.record.LogRecord;
 import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.KeyValueRow;
 import org.apache.fluss.row.ProjectedRow;
 import org.apache.fluss.utils.CloseableIterator;
 
@@ -33,11 +35,14 @@ import java.util.NoSuchElementException;
 import java.util.PriorityQueue;
 import java.util.function.Function;
 
-/** A sort merge reader to merge lakehouse snapshot record and fluss change 
log. */
-class SortMergeReader {
+/**
+ * A sort merge reader to merge snapshot record and change log. Both of them 
should have the same
+ * primary key encoding when comparing the keys.
+ */
+public class SortMergeReader {
 
     private final ProjectedRow snapshotProjectedPkRow;
-    private final CloseableIterator<LogRecord> lakeRecordIterator;
+    private final CloseableIterator<LogRecord> snapshotRecordIterator;
     private final Comparator<InternalRow> userKeyComparator;
     private CloseableIterator<KeyValueRow> changeLogIterator;
 
@@ -49,13 +54,29 @@ class SortMergeReader {
     public SortMergeReader(
             @Nullable int[] projectedFields,
             int[] pkIndexes,
-            List<CloseableIterator<LogRecord>> lakeRecordIterators,
+            @Nullable CloseableIterator<LogRecord> snapshotRecordIterator,
+            Comparator<InternalRow> userKeyComparator,
+            CloseableIterator<KeyValueRow> changeLogIterator) {
+        this(
+                projectedFields,
+                pkIndexes,
+                snapshotRecordIterator == null
+                        ? Collections.emptyList()
+                        : Collections.singletonList(snapshotRecordIterator),
+                userKeyComparator,
+                changeLogIterator);
+    }
+
+    public SortMergeReader(
+            @Nullable int[] projectedFields,
+            int[] pkIndexes,
+            List<CloseableIterator<LogRecord>> snapshotRecordIterators,
             Comparator<InternalRow> userKeyComparator,
             CloseableIterator<KeyValueRow> changeLogIterator) {
         this.userKeyComparator = userKeyComparator;
         this.snapshotProjectedPkRow = ProjectedRow.from(pkIndexes);
-        this.lakeRecordIterator =
-                ConcatRecordIterator.wrap(lakeRecordIterators, 
userKeyComparator, pkIndexes);
+        this.snapshotRecordIterator =
+                ConcatRecordIterator.wrap(snapshotRecordIterators, 
userKeyComparator, pkIndexes);
         this.changeLogIterator = changeLogIterator;
         this.changeLogIteratorWrapper = new ChangeLogIteratorWrapper();
         this.snapshotMergedRowIteratorWrapper = new 
SnapshotMergedRowIteratorWrapper();
@@ -65,13 +86,13 @@ class SortMergeReader {
 
     @Nullable
     public CloseableIterator<InternalRow> readBatch() {
-        if (!lakeRecordIterator.hasNext()) {
+        if (!snapshotRecordIterator.hasNext()) {
             return changeLogIterator.hasNext()
                     ? changeLogIteratorWrapper.replace(changeLogIterator)
                     : null;
         } else {
             CloseableIterator<SortMergeRows> mergedRecordIterator =
-                    transform(lakeRecordIterator, 
this::sortMergeWithChangeLog);
+                    transform(snapshotRecordIterator, 
this::sortMergeWithChangeLog);
 
             return 
snapshotMergedRowIteratorWrapper.replace(mergedRecordIterator);
         }
@@ -227,64 +248,6 @@ class SortMergeReader {
         }
     }
 
-    private static class SingleElementHeadIterator<T> implements 
CloseableIterator<T> {
-        private T singleElement;
-        private CloseableIterator<T> inner;
-        private boolean singleElementReturned;
-
-        public SingleElementHeadIterator(T element, CloseableIterator<T> 
inner) {
-            this.singleElement = element;
-            this.inner = inner;
-            this.singleElementReturned = false;
-        }
-
-        public static <T> SingleElementHeadIterator<T> addElementToHead(
-                T firstElement, CloseableIterator<T> originElementIterator) {
-            if (originElementIterator instanceof SingleElementHeadIterator) {
-                SingleElementHeadIterator<T> singleElementHeadIterator =
-                        (SingleElementHeadIterator<T>) originElementIterator;
-                singleElementHeadIterator.set(firstElement, 
singleElementHeadIterator.inner);
-                return singleElementHeadIterator;
-            } else {
-                return new SingleElementHeadIterator<>(firstElement, 
originElementIterator);
-            }
-        }
-
-        public void set(T element, CloseableIterator<T> inner) {
-            this.singleElement = element;
-            this.inner = inner;
-            this.singleElementReturned = false;
-        }
-
-        @Override
-        public boolean hasNext() {
-            return !singleElementReturned || inner.hasNext();
-        }
-
-        @Override
-        public T next() {
-            if (singleElementReturned) {
-                return inner.next();
-            }
-            singleElementReturned = true;
-            return singleElement;
-        }
-
-        public T peek() {
-            if (singleElementReturned) {
-                this.singleElement = inner.next();
-                this.singleElementReturned = false;
-                return this.singleElement;
-            }
-            return singleElement;
-        }
-
-        @Override
-        public void close() {
-            inner.close();
-        }
-    }
-
     private static class ChangeLogIteratorWrapper implements 
CloseableIterator<InternalRow> {
         private CloseableIterator<KeyValueRow> changeLogRecordIterator;
         private KeyValueRow nextReturnRow;
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/utils/SingleElementHeadIterator.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/utils/SingleElementHeadIterator.java
new file mode 100644
index 000000000..876e0f1fb
--- /dev/null
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/utils/SingleElementHeadIterator.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.utils;
+
+import org.apache.fluss.utils.CloseableIterator;
+
+/**
+ * A {@link CloseableIterator} that wraps another iterator with a single 
element prepended at the
+ * head.
+ *
+ * <p>This iterator first returns the single element, then delegates to the 
inner iterator for
+ * subsequent elements. It can be reused by calling {@link #set(Object, 
CloseableIterator)} to
+ * replace the head element and inner iterator.
+ */
+public class SingleElementHeadIterator<T> implements CloseableIterator<T> {
+    private T singleElement;
+
+    private CloseableIterator<T> inner;
+
+    private boolean singleElementReturned;
+
+    public SingleElementHeadIterator(T element, CloseableIterator<T> inner) {
+        this.singleElement = element;
+        this.inner = inner;
+        this.singleElementReturned = false;
+    }
+
+    public static <T> SingleElementHeadIterator<T> addElementToHead(
+            T firstElement, CloseableIterator<T> originElementIterator) {
+        if (originElementIterator instanceof SingleElementHeadIterator) {
+            SingleElementHeadIterator<T> singleElementHeadIterator =
+                    (SingleElementHeadIterator<T>) originElementIterator;
+            singleElementHeadIterator.set(firstElement, 
singleElementHeadIterator.inner);
+            return singleElementHeadIterator;
+        } else {
+            return new SingleElementHeadIterator<>(firstElement, 
originElementIterator);
+        }
+    }
+
+    public void set(T element, CloseableIterator<T> inner) {
+        this.singleElement = element;
+        this.inner = inner;
+        this.singleElementReturned = false;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return !singleElementReturned || inner.hasNext();
+    }
+
+    @Override
+    public T next() {
+        if (singleElementReturned) {
+            return inner.next();
+        }
+        singleElementReturned = true;
+        return singleElement;
+    }
+
+    public T peek() {
+        if (singleElementReturned) {
+            this.singleElement = inner.next();
+            this.singleElementReturned = false;
+            return this.singleElement;
+        }
+        return singleElement;
+    }
+
+    @Override
+    public void close() {
+        inner.close();
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/reader/SortMergeReaderTest.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/SortMergeReaderTest.java
similarity index 98%
rename from 
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/reader/SortMergeReaderTest.java
rename to 
fluss-client/src/test/java/org/apache/fluss/client/table/scanner/SortMergeReaderTest.java
index edeee5829..64bb958e5 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/reader/SortMergeReaderTest.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/SortMergeReaderTest.java
@@ -15,14 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.fluss.flink.lake.reader;
+package org.apache.fluss.client.table.scanner;
 
-import org.apache.fluss.client.table.scanner.ScanRecord;
 import org.apache.fluss.record.ChangeType;
 import org.apache.fluss.record.LogRecord;
 import org.apache.fluss.row.BinaryString;
 import org.apache.fluss.row.GenericRow;
 import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.KeyValueRow;
 import org.apache.fluss.row.ProjectedRow;
 import org.apache.fluss.types.IntType;
 import org.apache.fluss.types.RowType;
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/KeyValueRow.java
 b/fluss-common/src/main/java/org/apache/fluss/row/KeyValueRow.java
similarity index 91%
rename from 
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/KeyValueRow.java
rename to fluss-common/src/main/java/org/apache/fluss/row/KeyValueRow.java
index f8e7bd479..4c7ed024e 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/KeyValueRow.java
+++ b/fluss-common/src/main/java/org/apache/fluss/row/KeyValueRow.java
@@ -16,10 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.fluss.flink.lake.reader;
-
-import org.apache.fluss.row.InternalRow;
-import org.apache.fluss.row.ProjectedRow;
+package org.apache.fluss.row;
 
 /** An {@link InternalRow} with the key part. */
 public class KeyValueRow {
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java
index 9d3ce7379..0caf10d55 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java
@@ -20,6 +20,7 @@ package org.apache.fluss.flink.lake.reader;
 
 import org.apache.fluss.client.table.Table;
 import org.apache.fluss.client.table.scanner.ScanRecord;
+import org.apache.fluss.client.table.scanner.SortMergeReader;
 import org.apache.fluss.client.table.scanner.batch.BatchScanner;
 import org.apache.fluss.client.table.scanner.log.LogScanner;
 import org.apache.fluss.client.table.scanner.log.ScanRecords;
@@ -32,6 +33,7 @@ import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.record.ChangeType;
 import org.apache.fluss.record.LogRecord;
 import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.KeyValueRow;
 import org.apache.fluss.utils.CloseableIterator;
 
 import javax.annotation.Nullable;
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala
index 5148e1e1c..f71e2c229 100644
--- 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkFlussConf.scala
@@ -17,6 +17,9 @@
 
 package org.apache.fluss.spark
 
+import org.apache.fluss.config.ConfigBuilder.key
+import org.apache.fluss.config.ConfigOption
+
 import org.apache.spark.sql.internal.SQLConf.buildConf
 
 object SparkFlussConf {
@@ -27,4 +30,10 @@ object SparkFlussConf {
     .booleanConf
     .createWithDefault(false)
 
+  val READ_OPTIMIZED_OPTION: ConfigOption[java.lang.Boolean] =
+    key(READ_OPTIMIZED.key)
+      .booleanType()
+      .defaultValue(READ_OPTIMIZED.defaultValue.get)
+      .withDescription(READ_OPTIMIZED.doc)
+
 }
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
index 028af2815..53ed8a091 100644
--- 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/SparkTable.scala
@@ -20,6 +20,7 @@ package org.apache.fluss.spark
 import org.apache.fluss.client.admin.Admin
 import org.apache.fluss.config.{Configuration => FlussConfiguration}
 import org.apache.fluss.metadata.{TableInfo, TablePath}
+import org.apache.fluss.spark.SparkFlussConf.READ_OPTIMIZED
 import org.apache.fluss.spark.catalog.{AbstractSparkTable, 
SupportsFlussPartitionManagement}
 import org.apache.fluss.spark.read.{FlussAppendScanBuilder, 
FlussUpsertScanBuilder}
 import org.apache.fluss.spark.write.{FlussAppendWriteBuilder, 
FlussUpsertWriteBuilder}
@@ -28,7 +29,6 @@ import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite}
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 class SparkTable(
@@ -54,12 +54,14 @@ class SparkTable(
     if (tableInfo.getPrimaryKeys.isEmpty) {
       new FlussAppendScanBuilder(tablePath, tableInfo, options, flussConfig)
     } else {
-      if (!conf.getConf(SparkFlussConf.READ_OPTIMIZED, false)) {
-        throw new UnsupportedOperationException(
-          "For now, only data in snapshot can be read, without merging them 
with changes. " +
-            "If you can accept it, please set `spark.sql.fluss.readOptimized` 
true, and execute query again.")
+      val newFlussConfig = if (conf.getConf(SparkFlussConf.READ_OPTIMIZED, 
false)) {
+        val newFlussConfig_ = new FlussConfiguration(flussConfig)
+        newFlussConfig_.setBoolean(SparkFlussConf.READ_OPTIMIZED_OPTION.key(), 
true)
+        newFlussConfig_
+      } else {
+        flussConfig
       }
-      new FlussUpsertScanBuilder(tablePath, tableInfo, options, flussConfig)
+      new FlussUpsertScanBuilder(tablePath, tableInfo, options, newFlussConfig)
     }
   }
 }
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala
index 52cd46652..c48a82dbd 100644
--- 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala
@@ -17,30 +17,70 @@
 
 package org.apache.fluss.spark.read
 
+import org.apache.fluss.client.table.scanner.{ScanRecord, SortMergeReader}
 import org.apache.fluss.client.table.scanner.batch.BatchScanner
+import org.apache.fluss.client.table.scanner.log.LogScanner
 import org.apache.fluss.config.Configuration
+import org.apache.fluss.memory.MemorySegment
 import org.apache.fluss.metadata.{TableBucket, TablePath}
+import org.apache.fluss.record.LogRecord
+import org.apache.fluss.row.{encode, InternalRow, KeyValueRow}
+import org.apache.fluss.spark.SparkFlussConf
+import org.apache.fluss.spark.utils.LogChangesIterator
+import org.apache.fluss.utils.CloseableIterator
 
-import java.util
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import java.util.Comparator
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 /**
  * Partition reader that reads primary key table data.
  *
- * For now, only data in snapshot can be read, without merging them with 
changes.
+ * Reads both snapshot and log data, merging them using sort-merge algorithm. 
For rows with the same
+ * primary key, log data takes precedence over snapshot data.
  */
 class FlussUpsertPartitionReader(
     tablePath: TablePath,
     projection: Array[Int],
     flussPartition: FlussUpsertInputPartition,
     flussConfig: Configuration)
-  extends FlussPartitionReader(tablePath, flussConfig) {
+  extends FlussPartitionReader(tablePath, flussConfig)
+  with Logging {
 
+  private val readOptimized = 
flussConfig.get(SparkFlussConf.READ_OPTIMIZED_OPTION)
   private val tableBucket: TableBucket = flussPartition.tableBucket
   private val snapshotId: Long = flussPartition.snapshotId
+  private val logStartingOffset: Long = flussPartition.logStartingOffset
+  private val logStoppingOffset: Long = flussPartition.logStoppingOffset
+  private val logScanFinished = logStartingOffset >= logStoppingOffset || 
logStoppingOffset <= 0
+
+  private val (projectionWithPks, pkProjection) = {
+    val pkIndexes = tableInfo.getSchema.getPrimaryKeyIndexes
+    var i = 0
+    val _pkProjection = mutable.ArrayBuffer.empty[Int]
+    val extraProjections = mutable.ArrayBuffer.empty[Int]
+    extraProjections ++= projection
+    pkIndexes.foreach {
+      pkIndex =>
+        projection.find(p => p == pkIndex) match {
+          case Some(index) =>
+            _pkProjection += index
+          case _ =>
+            extraProjections += pkIndex
+            _pkProjection += projection.length + i
+            i += 1
+        }
+    }
+    (extraProjections.toArray, _pkProjection.toArray)
+  }
 
-  // KV Snapshot Reader (if snapshot exists)
   private var snapshotScanner: BatchScanner = _
-  private var snapshotIterator: 
util.Iterator[org.apache.fluss.row.InternalRow] = _
+  private var logScanner: LogScanner = _
+  private var mergedIterator: Iterator[InternalRow] = _
 
   // initialize scanners
   initialize()
@@ -50,39 +90,153 @@ class FlussUpsertPartitionReader(
       return false
     }
 
-    if (snapshotIterator == null || !snapshotIterator.hasNext) {
-      // Try to get next batch from snapshot scanner
-      val batch = snapshotScanner.pollBatch(POLL_TIMEOUT)
-      if (batch == null) {
-        // No more data fetched.
-        false
+    if (mergedIterator.hasNext) {
+      currentRow = convertToSparkRow(mergedIterator.next())
+      true
+    } else {
+      false
+    }
+  }
+
+  private def createSortMergeReader(): SortMergeReader = {
+    // Create key encoder for primary keys
+    val keyEncoder =
+      encode.KeyEncoder.ofPrimaryKeyEncoder(
+        rowType,
+        tableInfo.getPhysicalPrimaryKeys,
+        tableInfo.getTableConfig,
+        tableInfo.isDefaultBucketKey)
+
+    // Create comparators based on primary key
+    val comparator = new Comparator[InternalRow] {
+      override def compare(o1: InternalRow, o2: InternalRow): Int = {
+        val key1 = keyEncoder.encodeKey(o1)
+        val key2 = keyEncoder.encodeKey(o2)
+        MemorySegment.wrap(key1).compare(MemorySegment.wrap(key2), 0, 0, 
key1.length)
+      }
+    }
+
+    def createLogChangesIterator(): LogChangesIterator = {
+      // Initialize the log scanner
+      logScanner = 
table.newScan().project(projectionWithPks).createLogScanner()
+      if (tableBucket.getPartitionId == null) {
+        logScanner.subscribe(tableBucket.getBucket, logStartingOffset)
       } else {
-        snapshotIterator = batch
-        if (snapshotIterator.hasNext) {
-          currentRow = convertToSparkRow(snapshotIterator.next())
-          true
-        } else {
-          // Poll a new batch
-          next()
+        logScanner.subscribe(tableBucket.getPartitionId, 
tableBucket.getBucket, logStartingOffset)
+      }
+
+      // Collect all log records until logStoppingOffset
+      val allLogRecords = mutable.ArrayBuffer[ScanRecord]()
+      var continue = true
+
+      while (continue) {
+        val records = logScanner.poll(POLL_TIMEOUT)
+        if (!records.isEmpty) {
+          val flatRecords = records.asScala
+          for (scanRecord <- flatRecords) {
+            // Maybe data with logStoppingOffset doesn't exist.
+            if (scanRecord.logOffset() < logStoppingOffset - 1) {
+              allLogRecords += scanRecord
+            } else if (scanRecord.logOffset() == logStoppingOffset - 1) {
+              allLogRecords += scanRecord
+              continue = false
+            } else {
+              continue = false // Stop if we reach the stopping offset
+            }
+          }
         }
       }
+
+      LogChangesIterator(allLogRecords.toArray, pkProjection, comparator)
+    }
+
+    def createSnapshotIterator(): CloseableIterator[LogRecord] = {
+      // Initialize the snapshot scanner
+      snapshotScanner =
+        
table.newScan().project(projectionWithPks).createBatchScanner(tableBucket, 
snapshotId)
+
+      // Convert snapshot iterator to LogRecord iterator for SortMergeReader
+      new CloseableIterator[LogRecord] {
+        private var currentBatch: java.util.Iterator[InternalRow] = _
+        private var hasMoreBatches = true
+
+        override def hasNext: Boolean = {
+          while ((currentBatch == null || !currentBatch.hasNext) && 
hasMoreBatches) {
+            val batch = snapshotScanner.pollBatch(POLL_TIMEOUT)
+            if (batch == null) {
+              hasMoreBatches = false
+            } else {
+              currentBatch = batch
+            }
+          }
+          currentBatch != null && currentBatch.hasNext
+        }
+
+        override def next(): LogRecord = {
+          // Convert InternalRow to LogRecord using GenericRecord
+          new ScanRecord(currentBatch.next())
+        }
+
+        override def close(): Unit = {
+          snapshotScanner.close()
+        }
+      }
+
+    }
+
+    // Get log iterator with proper filtering and sorting
+    val logIterator = if (logScanFinished || readOptimized) {
+      CloseableIterator.emptyIterator[KeyValueRow]()
     } else {
-      // Get data from current snapshot batch
-      currentRow = convertToSparkRow(snapshotIterator.next())
-      true
+      createLogChangesIterator()
     }
+
+    val snapshotIterators = if (snapshotId == -1) {
+      null
+    } else {
+      createSnapshotIterator()
+    }
+
+    // Create the SortMergeReader
+    val sortMergeReader = new SortMergeReader(
+      projectionWithPks,
+      pkProjection,
+      snapshotIterators,
+      comparator,
+      logIterator
+    )
+    sortMergeReader
   }
 
   private def initialize(): Unit = {
-    // Initialize Scanners
-    if (snapshotId >= 0) {
-      // Create batch scanner
-      snapshotScanner =
-        table.newScan().project(projection).createBatchScanner(tableBucket, 
snapshotId)
+    val currentTs = System.currentTimeMillis()
+
+    val sortMergeReader = createSortMergeReader()
+
+    // Get the merged result iterator
+    val mergedResult = sortMergeReader.readBatch()
+
+    // If merged result is null, return an empty iterator
+    mergedIterator = if (mergedResult == null) {
+      Iterator.empty
+    } else {
+      mergedResult.asScala
     }
+    val spend = (System.currentTimeMillis() - currentTs) / 1000
+    logInfo(s"Initialize FlussUpsertPartitionReader cost $spend(s)")
   }
 
   override def close0(): Unit = {
+    if (mergedIterator != null) {
+      mergedIterator match {
+        case closeable: AutoCloseable => closeable.close()
+        case _ => // Do nothing
+      }
+    }
+
+    if (logScanner != null) {
+      logScanner.close()
+    }
     if (snapshotScanner != null) {
       snapshotScanner.close()
     }
diff --git 
a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/utils/LogChangesIterator.scala
 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/utils/LogChangesIterator.scala
new file mode 100644
index 000000000..2c5871c84
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/utils/LogChangesIterator.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark.utils
+
+import org.apache.fluss.client.table.scanner.ScanRecord
+import org.apache.fluss.client.utils.SingleElementHeadIterator
+import org.apache.fluss.record.ChangeType
+import org.apache.fluss.row.{InternalRow, KeyValueRow, ProjectedRow}
+import org.apache.fluss.utils.CloseableIterator
+
+import java.util.Comparator
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+/**
+ * An iterator that processes log records and merges changes for rows with the 
same primary key.
+ *
+ * This iterator sorts log records by primary key and then by offset, groups 
records with the same
+ * primary key together, and merges them according to change type semantics. 
The merging logic
+ * follows these rules:
+ *   - INSERT: replaces previous state
+ *   - DELETE: removes the record
+ *   - UPDATE_BEFORE: ignored during merge
+ *   - UPDATE_AFTER: replaces previous state
+ *
+ * The iterator outputs [[KeyValueRow]] instances representing the final state 
after merging all
+ * changes for each primary key.
+ */
+case class LogChangesIterator(
+    logRecords: Array[ScanRecord],
+    pkProjection: Array[Int],
+    comparator: Comparator[InternalRow]
+) extends CloseableIterator[KeyValueRow] {
+
+  // Sort the records by primary key and then by offset
+  private val sortedLogRecords = logRecords.sortWith {
+    case (record1, record2) =>
+      val keyComparison = comparator.compare(record1.getRow, record2.getRow)
+      if (keyComparison == 0) {
+        record1.logOffset() < record2.logOffset() // For same key, lower 
offset comes first
+      } else {
+        keyComparison < 0
+      }
+  }
+
+  private var recordsIterator = SingleElementHeadIterator.addElementToHead(
+    sortedLogRecords.head,
+    CloseableIterator.wrap(sortedLogRecords.tail.toIterator.asJava))
+
+  private val projectRow1 = ProjectedRow.from(pkProjection)
+  private val projectRow2 = ProjectedRow.from(pkProjection)
+
+  private var currentScanRecord: ScanRecord = _
+
+  override def hasNext: Boolean = {
+    if (currentScanRecord != null) {
+      return true
+    }
+    if (!recordsIterator.hasNext) {
+      return false
+    }
+
+    val recordsWithSamePK = mutable.ArrayBuffer(recordsIterator.next())
+    val current = recordsWithSamePK.head
+
+    var continue = true
+    while (continue && recordsIterator.hasNext) {
+      val next = recordsIterator.next()
+      if (hasSamePrimaryKey(current, next)) {
+        recordsWithSamePK.append(next)
+      } else {
+        recordsIterator = SingleElementHeadIterator.addElementToHead(next, 
recordsIterator)
+        continue = false
+      }
+    }
+
+    mergeLogRecordsWithSamePK(recordsWithSamePK.toArray) match {
+      case Some(record) =>
+        currentScanRecord = record
+        true
+      case None =>
+        hasNext
+    }
+  }
+
+  override def next(): KeyValueRow = {
+    assert(currentScanRecord != null)
+
+    val result = new KeyValueRow(
+      pkProjection,
+      currentScanRecord.getRow,
+      isDelete(currentScanRecord.getChangeType)
+    )
+    currentScanRecord = null
+    result
+  }
+
+  private def hasSamePrimaryKey(s1: ScanRecord, s2: ScanRecord): Boolean = {
+    comparator.compare(
+      projectRow1.replaceRow(s1.getRow),
+      projectRow2.replaceRow(s2.getRow)
+    ) == 0
+  }
+
+  private def mergeLogRecordsWithSamePK(logRecords: Array[ScanRecord]): 
Option[ScanRecord] = {
+    var result: Option[ScanRecord] = Some(logRecords.head)
+    logRecords.tail.foreach {
+      record =>
+        record.getChangeType match {
+          case ChangeType.INSERT =>
+            result = Some(record)
+          case ChangeType.DELETE =>
+            result = None
+          case ChangeType.UPDATE_BEFORE =>
+          // Ignore
+          case ChangeType.UPDATE_AFTER =>
+            result = Some(record)
+          case _ =>
+            throw new IllegalArgumentException(s"Unknown change type: 
${record.getChangeType}")
+        }
+    }
+    result
+  }
+
+  private def isDelete(changeType: ChangeType): Boolean = {
+    changeType == ChangeType.DELETE || changeType == ChangeType.UPDATE_BEFORE
+  }
+
+  override def close(): Unit = {}
+}
diff --git 
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
index 4bdf5e84d..688ae872c 100644
--- 
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
+++ 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/FlussSparkTestBase.scala
@@ -22,10 +22,11 @@ import org.apache.fluss.client.admin.Admin
 import org.apache.fluss.client.table.Table
 import org.apache.fluss.client.table.scanner.log.LogScanner
 import org.apache.fluss.config.{ConfigOptions, Configuration}
-import org.apache.fluss.metadata.{DataLakeFormat, TableDescriptor, TablePath}
+import org.apache.fluss.metadata.{TableDescriptor, TablePath}
 import org.apache.fluss.row.InternalRow
 import org.apache.fluss.server.testutils.FlussClusterExtension
 
+import org.apache.spark.SparkConf
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.test.SharedSparkSession
 
@@ -41,27 +42,25 @@ class FlussSparkTestBase extends QueryTest with 
SharedSparkSession {
   protected var conn: Connection = _
   protected var admin: Admin = _
 
-  val flussServer: FlussClusterExtension =
+  protected val flussServer: FlussClusterExtension =
     FlussClusterExtension.builder
       .setClusterConf(flussConf)
       .setNumOfTabletServers(3)
       .build
 
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set(s"spark.sql.catalog.$DEFAULT_CATALOG", 
classOf[SparkCatalog].getName)
+      .set(s"spark.sql.catalog.$DEFAULT_CATALOG.bootstrap.servers", 
flussServer.getBootstrapServers)
+      .set("spark.sql.defaultCatalog", DEFAULT_CATALOG)
+  }
+
   override protected def beforeAll(): Unit = {
-    super.beforeAll()
     flussServer.start()
     conn = ConnectionFactory.createConnection(flussServer.getClientConfig)
     admin = conn.getAdmin
 
-    spark.conf.set(s"spark.sql.catalog.$DEFAULT_CATALOG", 
classOf[SparkCatalog].getName)
-    spark.conf.set(
-      s"spark.sql.catalog.$DEFAULT_CATALOG.bootstrap.servers",
-      flussServer.getBootstrapServers)
-    spark.conf.set("spark.sql.defaultCatalog", DEFAULT_CATALOG)
-    // Enable read optimized by default temporarily.
-    // TODO: remove this when https://github.com/apache/fluss/issues/2427 is 
done.
-    spark.conf.set("spark.sql.fluss.readOptimized", "true")
-
+    super.beforeAll()
     sql(s"USE $DEFAULT_DATABASE")
   }
 
diff --git 
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkReadTest.scala
 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala
similarity index 68%
rename from 
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkReadTest.scala
rename to 
fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala
index 834cec7ad..5f9613cbc 100644
--- 
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkReadTest.scala
+++ 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkLogTableReadTest.scala
@@ -19,7 +19,7 @@ package org.apache.fluss.spark
 
 import org.apache.spark.sql.Row
 
-class SparkReadTest extends FlussSparkTestBase {
+class SparkLogTableReadTest extends FlussSparkTestBase {
 
   test("Spark Read: log table") {
     withTable("t") {
@@ -80,48 +80,6 @@ class SparkReadTest extends FlussSparkTestBase {
     }
   }
 
-  test("Spark Read: primary key table") {
-    withTable("t") {
-      sql(s"""
-             |CREATE TABLE $DEFAULT_DATABASE.t (orderId BIGINT, itemId BIGINT, 
amount INT, address STRING)
-             |TBLPROPERTIES("primary.key" = "orderId")
-             |""".stripMargin)
-
-      sql(s"""
-             |INSERT INTO $DEFAULT_DATABASE.t VALUES
-             |(600L, 21L, 601, "addr1"), (700L, 22L, 602, "addr2"),
-             |(800L, 23L, 603, "addr3"), (900L, 24L, 604, "addr4"),
-             |(1000L, 25L, 605, "addr5")
-             |""".stripMargin)
-
-      checkAnswer(
-        sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY orderId"),
-        Row(600L, 21L, 601, "addr1") ::
-          Row(700L, 22L, 602, "addr2") ::
-          Row(800L, 23L, 603, "addr3") ::
-          Row(900L, 24L, 604, "addr4") ::
-          Row(1000L, 25L, 605, "addr5") :: Nil
-      )
-
-      sql(s"""
-             |INSERT INTO $DEFAULT_DATABASE.t VALUES
-             |(700L, 220L, 602, "addr2"),
-             |(900L, 240L, 604, "addr4"),
-             |(1100L, 260L, 606, "addr6")
-             |""".stripMargin)
-
-      checkAnswer(
-        sql(s"""
-               |SELECT orderId, itemId, address FROM $DEFAULT_DATABASE.t
-               |WHERE amount <= 603 ORDER BY orderId""".stripMargin),
-        Row(600L, 21L, "addr1") ::
-          Row(700L, 220L, "addr2") ::
-          Row(800L, 23L, "addr3") ::
-          Nil
-      )
-    }
-  }
-
   test("Spark Read: partitioned log table") {
     withTable("t") {
       sql(
@@ -168,62 +126,6 @@ class SparkReadTest extends FlussSparkTestBase {
     }
   }
 
-  test("Spark Read: partitioned primary key table") {
-    withTable("t") {
-      sql(s"""
-             |CREATE TABLE $DEFAULT_DATABASE.t (orderId BIGINT, itemId BIGINT, 
amount INT, address STRING, dt STRING)
-             |PARTITIONED BY (dt)
-             |TBLPROPERTIES("primary.key" = "orderId,dt")
-             |""".stripMargin)
-
-      sql(s"""
-             |INSERT INTO $DEFAULT_DATABASE.t VALUES
-             |(600L, 21L, 601, "addr1", "2026-01-01"), (700L, 22L, 602, 
"addr2", "2026-01-01"),
-             |(800L, 23L, 603, "addr3", "2026-01-02"), (900L, 24L, 604, 
"addr4", "2026-01-02"),
-             |(1000L, 25L, 605, "addr5", "2026-01-03")
-             |""".stripMargin)
-      sql(s"""
-             |INSERT INTO $DEFAULT_DATABASE.t VALUES
-             |(700L, 220L, 602, "addr2_updated", "2026-01-01"),
-             |(900L, 240L, 604, "addr4_updated", "2026-01-02"),
-             |(1100L, 260L, 606, "addr6", "2026-01-03")
-             |""".stripMargin)
-
-      checkAnswer(
-        sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY orderId"),
-        Row(600L, 21L, 601, "addr1", "2026-01-01") ::
-          Row(700L, 220L, 602, "addr2_updated", "2026-01-01") ::
-          Row(800L, 23L, 603, "addr3", "2026-01-02") ::
-          Row(900L, 240L, 604, "addr4_updated", "2026-01-02") ::
-          Row(1000L, 25L, 605, "addr5", "2026-01-03") ::
-          Row(1100L, 260L, 606, "addr6", "2026-01-03") ::
-          Nil
-      )
-
-      // Read with partition filter
-      checkAnswer(
-        sql(s"""
-               |SELECT * FROM $DEFAULT_DATABASE.t
-               |WHERE dt = '2026-01-01'
-               |ORDER BY orderId""".stripMargin),
-        Row(600L, 21L, 601, "addr1", "2026-01-01") ::
-          Row(700L, 220L, 602, "addr2_updated", "2026-01-01") ::
-          Nil
-      )
-
-      // Read with multiple partition filters
-      checkAnswer(
-        sql(
-          s"SELECT * FROM $DEFAULT_DATABASE.t WHERE dt IN ('2026-01-01', 
'2026-01-02') ORDER BY orderId"),
-        Row(600L, 21L, 601, "addr1", "2026-01-01") ::
-          Row(700L, 220L, 602, "addr2_updated", "2026-01-01") ::
-          Row(800L, 23L, 603, "addr3", "2026-01-02") ::
-          Row(900L, 240L, 604, "addr4_updated", "2026-01-02") ::
-          Nil
-      )
-    }
-  }
-
   test("Spark: all data types") {
     withTable("t") {
       sql(s"""
diff --git 
a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkPrimaryKeyTableReadTest.scala
 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkPrimaryKeyTableReadTest.scala
new file mode 100644
index 000000000..c1cb02236
--- /dev/null
+++ 
b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkPrimaryKeyTableReadTest.scala
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.spark
+
+import org.apache.fluss.client.initializer.{BucketOffsetsRetrieverImpl, 
OffsetsInitializer}
+import org.apache.fluss.config.{ConfigOptions, Configuration}
+import org.apache.fluss.metadata.{TableBucket, TablePath}
+import org.apache.fluss.spark.read.FlussUpsertInputPartition
+
+import org.apache.spark.sql.Row
+import org.assertj.core.api.Assertions.assertThat
+
+import scala.collection.JavaConverters._
+
+/** This test case is used to verify the correctness of primary key table 
read. */
+class SparkPrimaryKeyTableReadTest extends FlussSparkTestBase {
+
+  /**
+   * Do not set [[ConfigOptions.KV_SNAPSHOT_INTERVAL]] here, we want to 
control the snapshot trigger
+   * by manually.
+   */
+  override def flussConf: Configuration = {
+    new Configuration()
+  }
+
+  test("Spark Read: primary key table") {
+    withTable("t") {
+      val tablePath = createTablePath("t")
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t (orderId BIGINT, itemId BIGINT, 
amount INT, address STRING)
+             |TBLPROPERTIES("primary.key" = "orderId", "bucket.num" = 1)
+             |""".stripMargin)
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t VALUES
+             |(600L, 21L, 601, "addr1"), (700L, 22L, 602, "addr2"),
+             |(800L, 23L, 603, "addr3"), (900L, 24L, 604, "addr4"),
+             |(1000L, 25L, 605, "addr5")
+             |""".stripMargin)
+
+      var inputPartitions = genInputPartition(tablePath, null)
+      // Data is only stored in log.
+      assertThat(inputPartitions.exists(hasSnapshotData)).isEqualTo(false)
+      assertThat(inputPartitions.forall(hasLogChanges)).isEqualTo(true)
+      // Read data from log scanner.
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY orderId"),
+        Row(600L, 21L, 601, "addr1") ::
+          Row(700L, 22L, 602, "addr2") ::
+          Row(800L, 23L, 603, "addr3") ::
+          Row(900L, 24L, 604, "addr4") ::
+          Row(1000L, 25L, 605, "addr5") :: Nil
+      )
+
+      // Trigger snapshot.
+      flussServer.triggerAndWaitSnapshot(tablePath)
+      inputPartitions = genInputPartition(tablePath, null)
+      assertThat(inputPartitions.forall(hasSnapshotData)).isEqualTo(true)
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY orderId"),
+        Row(600L, 21L, 601, "addr1") ::
+          Row(700L, 22L, 602, "addr2") ::
+          Row(800L, 23L, 603, "addr3") ::
+          Row(900L, 24L, 604, "addr4") ::
+          Row(1000L, 25L, 605, "addr5") :: Nil
+      )
+
+      // Upsert.
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t VALUES
+             |(700L, 220L, 602, "addr2"),
+             |(900L, 240L, 604, "addr4"),
+             |(1100L, 260L, 606, "addr6")
+             |""".stripMargin)
+
+      inputPartitions = genInputPartition(tablePath, null)
+      // Data is stored in both snapshot and log.
+      assertThat(inputPartitions.exists(hasSnapshotData)).isEqualTo(true)
+      assertThat(inputPartitions.exists(hasLogChanges)).isEqualTo(true)
+      checkAnswer(
+        sql(s"""
+               |SELECT orderId, itemId, address FROM $DEFAULT_DATABASE.t
+               |WHERE amount <= 603 ORDER BY orderId""".stripMargin),
+        Row(600L, 21L, "addr1") ::
+          Row(700L, 220L, "addr2") ::
+          Row(800L, 23L, "addr3") ::
+          Nil
+      )
+      withSQLConf(SparkFlussConf.READ_OPTIMIZED.key -> "true") {
+        checkAnswer(
+          sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY orderId"),
+          Row(600L, 21L, 601, "addr1") ::
+            Row(700L, 22L, 602, "addr2") ::
+            Row(800L, 23L, 603, "addr3") ::
+            Row(900L, 24L, 604, "addr4") ::
+            Row(1000L, 25L, 605, "addr5") :: Nil
+        )
+      }
+
+      // Trigger snapshot.
+      flussServer.triggerAndWaitSnapshot(tablePath)
+      checkAnswer(
+        sql(s"""
+               |SELECT orderId, itemId, address FROM $DEFAULT_DATABASE.t
+               |WHERE amount <= 603 ORDER BY orderId""".stripMargin),
+        Row(600L, 21L, "addr1") ::
+          Row(700L, 220L, "addr2") ::
+          Row(800L, 23L, "addr3") ::
+          Nil
+      )
+    }
+  }
+
+  test("Spark Read: partitioned primary key table") {
+    withTable("t") {
+      val tablePath = createTablePath("t")
+      sql(s"""
+             |CREATE TABLE $DEFAULT_DATABASE.t (orderId BIGINT, itemId BIGINT, 
amount INT, address STRING, dt STRING)
+             |PARTITIONED BY (dt)
+             |TBLPROPERTIES("primary.key" = "orderId,dt", "bucket.num" = 1)
+             |""".stripMargin)
+
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t VALUES
+             |(600L, 21L, 601, "addr1", "2026-01-01"), (700L, 22L, 602, 
"addr2", "2026-01-01"),
+             |(800L, 23L, 603, "addr3", "2026-01-02"), (900L, 24L, 604, 
"addr4", "2026-01-02"),
+             |(1000L, 25L, 605, "addr5", "2026-01-03")
+             |""".stripMargin)
+
+      var inputPartitions = 
admin.listPartitionInfos(tablePath).get().asScala.flatMap {
+        p => genInputPartition(tablePath, p.getPartitionName)
+      }
+      // Data is only in log.
+      assertThat(inputPartitions.exists(hasSnapshotData)).isEqualTo(false)
+      assertThat(inputPartitions.forall(hasLogChanges)).isEqualTo(true)
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY orderId"),
+        Row(600L, 21L, 601, "addr1", "2026-01-01") ::
+          Row(700L, 22L, 602, "addr2", "2026-01-01") ::
+          Row(800L, 23L, 603, "addr3", "2026-01-02") ::
+          Row(900L, 24L, 604, "addr4", "2026-01-02") ::
+          Row(1000L, 25L, 605, "addr5", "2026-01-03") ::
+          Nil
+      )
+
+      // Trigger snapshot.
+      flussServer.triggerAndWaitSnapshot(tablePath)
+      var inputPartition0 = genInputPartition(tablePath, "2026-01-01").head
+      assertThat(hasSnapshotData(inputPartition0)).isEqualTo(true)
+      checkAnswer(
+        sql(s"SELECT * FROM $DEFAULT_DATABASE.t ORDER BY orderId"),
+        Row(600L, 21L, 601, "addr1", "2026-01-01") ::
+          Row(700L, 22L, 602, "addr2", "2026-01-01") ::
+          Row(800L, 23L, 603, "addr3", "2026-01-02") ::
+          Row(900L, 24L, 604, "addr4", "2026-01-02") ::
+          Row(1000L, 25L, 605, "addr5", "2026-01-03") ::
+          Nil
+      )
+
+      // Upsert.
+      sql(s"""
+             |INSERT INTO $DEFAULT_DATABASE.t VALUES
+             |(700L, 220L, 602, "addr2_updated", "2026-01-01"),
+             |(900L, 240L, 604, "addr4_updated", "2026-01-02"),
+             |(1100L, 260L, 606, "addr6", "2026-01-03")
+             |""".stripMargin)
+
+      inputPartition0 = genInputPartition(tablePath, "2026-01-01").head
+      // Data(2026-01-01, bucketId=0) is stored in both snapshot and log.
+      assertThat(hasSnapshotData(inputPartition0)).isEqualTo(true)
+      assertThat(hasLogChanges(inputPartition0)).isEqualTo(true)
+      // Read with partition filter
+      checkAnswer(
+        sql(s"""
+               |SELECT * FROM $DEFAULT_DATABASE.t
+               |WHERE dt = '2026-01-01'
+               |ORDER BY orderId""".stripMargin),
+        Row(600L, 21L, 601, "addr1", "2026-01-01") ::
+          Row(700L, 220L, 602, "addr2_updated", "2026-01-01") ::
+          Nil
+      )
+
+      // Trigger a bucket snapshot.
+      flussServer.triggerAndWaitSnapshot(inputPartition0.tableBucket)
+      checkAnswer(
+        sql(s"""
+               |SELECT * FROM $DEFAULT_DATABASE.t
+               |WHERE dt = '2026-01-01'
+               |ORDER BY orderId""".stripMargin),
+        Row(600L, 21L, 601, "addr1", "2026-01-01") ::
+          Row(700L, 220L, 602, "addr2_updated", "2026-01-01") ::
+          Nil
+      )
+
+      // Trigger snapshot.
+      flussServer.triggerAndWaitSnapshot(tablePath)
+      inputPartitions = 
admin.listPartitionInfos(tablePath).get().asScala.flatMap {
+        p => genInputPartition(tablePath, p.getPartitionName)
+      }
+      assertThat(inputPartitions.forall(hasSnapshotData)).isEqualTo(true)
+      // Read with multiple partition filters
+      checkAnswer(
+        sql(
+          s"SELECT * FROM $DEFAULT_DATABASE.t WHERE dt IN ('2026-01-01', 
'2026-01-02') ORDER BY orderId"),
+        Row(600L, 21L, 601, "addr1", "2026-01-01") ::
+          Row(700L, 220L, 602, "addr2_updated", "2026-01-01") ::
+          Row(800L, 23L, 603, "addr3", "2026-01-02") ::
+          Row(900L, 240L, 604, "addr4_updated", "2026-01-02") ::
+          Nil
+      )
+    }
+  }
+
+  private def genInputPartition(
+      tablePath: TablePath,
+      partitionName: String): Array[FlussUpsertInputPartition] = {
+    val kvSnapshots = if (partitionName == null) {
+      admin.getLatestKvSnapshots(tablePath).get()
+    } else {
+      admin.getLatestKvSnapshots(tablePath, partitionName).get()
+    }
+    val bucketOffsetsRetriever = new BucketOffsetsRetrieverImpl(admin, 
tablePath)
+    val latestOffsetsInitializer = OffsetsInitializer.latest()
+    val tableId = kvSnapshots.getTableId
+    val partitionId = kvSnapshots.getPartitionId
+    val bucketIds = kvSnapshots.getBucketIds
+    val bucketIdToLogOffset =
+      latestOffsetsInitializer.getBucketOffsets(partitionName, bucketIds, 
bucketOffsetsRetriever)
+    bucketIds.asScala.map {
+      bucketId =>
+        val tableBucket = new TableBucket(tableId, partitionId, bucketId)
+        val snapshotId = kvSnapshots.getSnapshotId(bucketId).orElse(-1L)
+        val logStartingOffset = kvSnapshots.getLogOffset(bucketId).orElse(-2L)
+        val logEndingOffset = bucketIdToLogOffset.get(bucketId)
+
+        FlussUpsertInputPartition(tableBucket, snapshotId, logStartingOffset, 
logEndingOffset)
+    }.toArray
+  }
+
+  private def hasLogChanges(inputPartition: FlussUpsertInputPartition): 
Boolean = {
+    inputPartition.logStoppingOffset > 0 && inputPartition.logStartingOffset < 
inputPartition.logStoppingOffset
+  }
+
+  private def hasSnapshotData(inputPartition: FlussUpsertInputPartition): 
Boolean = {
+    inputPartition.snapshotId >= 0
+  }
+}

Reply via email to