Copilot commented on code in PR #2170:
URL: https://github.com/apache/fluss/pull/2170#discussion_r2617861546


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java:
##########
@@ -192,23 +192,26 @@ public CloseableIterator<InternalRow> pollBatch(Duration timeout) throws IOExcep
             return currentSortMergeReader.readBatch();
         } else {
             if (lakeRecordIterators.isEmpty()) {
+                List<RecordReader> recordReaders = new ArrayList<>();
                 if (lakeSnapshotSplitAndFlussLogSplit.getLakeSplits() == null
                         || lakeSnapshotSplitAndFlussLogSplit.getLakeSplits().isEmpty()) {
-                    lakeRecordIterators = Collections.emptyList();
-                    logRows = new LinkedHashMap<>();
+                    // pass null split to get rowComparator
+                    recordReaders.add(lakeSource.createRecordReader(() -> null));
                 } else {
                     for (LakeSplit lakeSplit : lakeSnapshotSplitAndFlussLogSplit.getLakeSplits()) {
-                        RecordReader reader = lakeSource.createRecordReader(() -> lakeSplit);
-                        if (reader instanceof SortedRecordReader) {
-                            rowComparator = ((SortedRecordReader) reader).order();
-                        } else {
-                            throw new UnsupportedOperationException(
-                                    "lake records must instance of sorted view.");
-                        }
-                        lakeRecordIterators.add(reader.read());
+                        recordReaders.add(lakeSource.createRecordReader(() -> lakeSplit));
+                    }
+                }
+                for (RecordReader reader : recordReaders) {
+                    if (reader instanceof SortedRecordReader) {
+                        rowComparator = ((SortedRecordReader) reader).order();
+                    } else {
+                        throw new UnsupportedOperationException(
+                                "lake records must instance of sorted view.");
                     }
-                    logRows = new TreeMap<>(rowComparator);
+                    lakeRecordIterators.add(reader.read());

Review Comment:
   When there are no lake splits, a RecordReader is created with a null split
just to obtain the rowComparator. However, this reader's iterator is then also
added to the lakeRecordIterators list, so an iterator that contains no data
takes part in the sort-merge process for no benefit. Consider extracting the
rowComparator without adding the empty iterator to lakeRecordIterators, or
ensure the empty iterator is properly handled during merging.
   ```suggestion
                       // Only add iterator if the reader is not for a null split
                       // (i.e., only if there are actual splits)
                       if (!(lakeSnapshotSplitAndFlussLogSplit.getLakeSplits() == null
                             || lakeSnapshotSplitAndFlussLogSplit.getLakeSplits().isEmpty())) {
                           lakeRecordIterators.add(reader.read());
                       }
   ```
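
   For illustration, a minimal self-contained sketch of the first option (take
the comparator from the null-split reader, but never register its iterator).
`SortedReader`, `Prepared`, `readerOf`, and `prepare` are hypothetical stand-ins
and not Fluss APIs; the sketch also assumes that a reader created for a null
split can report its order without being read.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Comparator;
   import java.util.Iterator;
   import java.util.List;

   public class LakeIteratorSketch {

       /** Hypothetical stand-in for SortedRecordReader: an ordering plus an iterator. */
       public interface SortedReader<T> {
           Comparator<T> order();

           Iterator<T> read();
       }

       /** The comparator plus only those iterators that belong to real splits. */
       public static final class Prepared<T> {
           public final Comparator<T> comparator;
           public final List<Iterator<T>> iterators;

           Prepared(Comparator<T> comparator, List<Iterator<T>> iterators) {
               this.comparator = comparator;
               this.iterators = iterators;
           }
       }

       /** Builds a reader over an already sorted list of integers (illustration only). */
       public static SortedReader<Integer> readerOf(List<Integer> sortedRows) {
           return new SortedReader<Integer>() {
               @Override
               public Comparator<Integer> order() {
                   return Comparator.naturalOrder();
               }

               @Override
               public Iterator<Integer> read() {
                   return sortedRows.iterator();
               }
           };
       }

       public static <T> Prepared<T> prepare(
               List<SortedReader<T>> splitReaders, SortedReader<T> comparatorOnlyReader) {
           List<Iterator<T>> iterators = new ArrayList<>();
           if (splitReaders.isEmpty()) {
               // No lake splits: use the ordering only and never call read(),
               // so no empty iterator reaches the merge step.
               return new Prepared<>(comparatorOnlyReader.order(), iterators);
           }
           Comparator<T> comparator = null;
           for (SortedReader<T> reader : splitReaders) {
               comparator = reader.order();
               iterators.add(reader.read());
           }
           return new Prepared<>(comparator, iterators);
       }

       public static void main(String[] args) {
           List<SortedReader<Integer>> none = new ArrayList<>();
           Prepared<Integer> empty = prepare(none, readerOf(new ArrayList<>()));
           System.out.println("iterators without splits: " + empty.iterators.size());

           List<SortedReader<Integer>> one = new ArrayList<>();
           one.add(readerOf(Arrays.asList(1, 2, 3)));
           Prepared<Integer> single = prepare(one, readerOf(new ArrayList<>()));
           System.out.println("iterators with one split: " + single.iterators.size());
       }
   }
   ```

   Keeping the comparator lookup separate from iterator registration avoids
re-checking `getLakeSplits()` inside the loop, at the cost of one extra branch.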



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java:
##########
@@ -109,7 +107,9 @@ public LakeSnapshotAndLogSplitScanner(
                                                 "StoppingOffset is null for split: "
                                                         + lakeSnapshotAndFlussLogSplit));
 
-        this.logScanFinished = lakeSnapshotAndFlussLogSplit.getStartingOffset() >= stoppingOffset;
+        this.logScanFinished =
+                lakeSnapshotAndFlussLogSplit.getStartingOffset() >= stoppingOffset
+                        || stoppingOffset <= 0;

Review Comment:
   The condition `stoppingOffset <= 0` may not be the correct check. If 
stoppingOffset is exactly 0, this means we want to stop at offset 0, not that 
the scan should be finished immediately. Consider whether the intended logic is 
`stoppingOffset < 0` to indicate an invalid or uninitialized offset, or if this 
check should be removed entirely.
   ```suggestion
                           || stoppingOffset < 0;
   ```
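
   To make the difference concrete, here is a small sketch that evaluates both
variants of the condition on plain `long` values. The class and method names
are hypothetical, and the negative starting offset is only an assumed sentinel
for illustration. Note that for any non-negative starting offset, a stopping
offset of 0 already satisfies `startingOffset >= stoppingOffset`, so the two
variants can only disagree when the starting offset is negative.

   ```java
   public class LogScanFinishedSketch {

       /** Variant from the diff: treats stoppingOffset == 0 as finished. */
       public static boolean finishedLessOrEqual(long startingOffset, long stoppingOffset) {
           return startingOffset >= stoppingOffset || stoppingOffset <= 0;
       }

       /** Variant from the suggestion: only a negative stoppingOffset short-circuits. */
       public static boolean finishedStrictLess(long startingOffset, long stoppingOffset) {
           return startingOffset >= stoppingOffset || stoppingOffset < 0;
       }

       public static void main(String[] args) {
           // Hypothetical inputs: {startingOffset, stoppingOffset}.
           long[][] cases = {{0L, 0L}, {5L, 0L}, {-2L, 0L}, {0L, 10L}, {0L, -1L}};
           for (long[] c : cases) {
               System.out.println(
                       "start=" + c[0]
                               + " stop=" + c[1]
                               + " <=0 variant: " + finishedLessOrEqual(c[0], c[1])
                               + ", <0 variant: " + finishedStrictLess(c[0], c[1]));
           }
       }
   }
   ```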


