wuchong commented on code in PR #2523:
URL: https://github.com/apache/fluss/pull/2523#discussion_r2749721385


##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/SortMergeReader.java:
##########
@@ -46,6 +48,22 @@ class SortMergeReader {
     private final ChangeLogIteratorWrapper changeLogIteratorWrapper;
     private @Nullable final ProjectedRow projectedRow;
 
+    public SortMergeReader(
+            @Nullable int[] projectedFields,
+            int[] pkIndexes,
+            @Nullable CloseableIterator<LogRecord> lakeRecordIterator,

Review Comment:
   Could you rename this parameter and the corresponding member variable from `lakeRecordIterator` to `snapshotRecordIterator`? That name better reflects how the Spark reader uses it.



##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala:
##########
@@ -50,39 +90,148 @@ class FlussUpsertPartitionReader(
       return false
     }
 
-    if (snapshotIterator == null || !snapshotIterator.hasNext) {
-      // Try to get next batch from snapshot scanner
-      val batch = snapshotScanner.pollBatch(POLL_TIMEOUT)
-      if (batch == null) {
-        // No more data fetched.
-        false
+    if (mergedIterator.hasNext) {
+      currentRow = convertToSparkRow(mergedIterator.next())
+      true
+    } else {
+      false
+    }
+  }
+
+  private def createSortMergeReader(): SortMergeReader = {
+    // Create key encoder for primary keys
+    val keyEncoder = encode.KeyEncoder.of(rowType, tableInfo.getPhysicalPrimaryKeys, null)

Review Comment:
   We’ve refactored `KeyEncoder` in the latest `main` branch. Could you please rebase onto the latest `main` and use the `KeyEncoder.ofPrimaryKey(...)` method instead? Otherwise, the key encoding won’t match the keys stored in RocksDB, which will lead to incorrect query results.

https://github.com/apache/fluss/commit/3a803dfd812398decbfbced81b655d6a8839351e
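To illustrate why the encodings must match: a byte-keyed store such as RocksDB looks keys up by their raw bytes, so a key encoded differently from how it was written is simply not found. Below is a minimal sketch, assuming two hypothetical int encoders (big-endian vs. little-endian) that stand in for a "storage" format and a mismatched "old" format. They are not Fluss’s actual `KeyEncoder` formats, and the `HashMap<ByteBuffer, String>` only stands in for the RocksDB table.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.HashMap;
import java.util.Map;

public class KeyEncodingMismatch {

    // Hypothetical encoder used when keys are written to the store
    // (4-byte big-endian int). Not Fluss's real key format.
    static byte[] encodeStorageFormat(int pk) {
        return ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN).putInt(pk).array();
    }

    // Hypothetical mismatched encoder (4-byte little-endian int),
    // standing in for an outdated encoding on the read path.
    static byte[] encodeOldFormat(int pk) {
        return ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(pk).array();
    }

    // Byte-keyed store: ByteBuffer equality compares the remaining bytes,
    // so lookups succeed only on an exact byte-for-byte match.
    static Map<ByteBuffer, String> buildStore() {
        Map<ByteBuffer, String> store = new HashMap<>();
        store.put(ByteBuffer.wrap(encodeStorageFormat(42)), "row-42");
        return store;
    }

    static String lookup(Map<ByteBuffer, String> store, byte[] key) {
        return store.get(ByteBuffer.wrap(key));
    }

    public static void main(String[] args) {
        Map<ByteBuffer, String> store = buildStore();
        // Same encoder on write and read: the row is found.
        System.out.println("matching encoder: " + lookup(store, encodeStorageFormat(42)));
        // Different encoder on read: same logical key, different bytes, no match.
        System.out.println("mismatched encoder: " + lookup(store, encodeOldFormat(42)));
    }
}
```

Running `main` prints `matching encoder: row-42` followed by `mismatched encoder: null`: the logical key is identical, but the bytes differ, so the read silently misses the stored row.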



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.