Copilot commented on code in PR #2170:
URL: https://github.com/apache/fluss/pull/2170#discussion_r2617861546
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java:
##########
@@ -192,23 +192,26 @@ public CloseableIterator<InternalRow> pollBatch(Duration timeout) throws IOExcep
             return currentSortMergeReader.readBatch();
         } else {
             if (lakeRecordIterators.isEmpty()) {
+                List<RecordReader> recordReaders = new ArrayList<>();
                 if (lakeSnapshotSplitAndFlussLogSplit.getLakeSplits() == null
                         || lakeSnapshotSplitAndFlussLogSplit.getLakeSplits().isEmpty()) {
-                    lakeRecordIterators = Collections.emptyList();
-                    logRows = new LinkedHashMap<>();
+                    // pass null split to get rowComparator
+                    recordReaders.add(lakeSource.createRecordReader(() -> null));
                 } else {
                     for (LakeSplit lakeSplit :
                             lakeSnapshotSplitAndFlussLogSplit.getLakeSplits()) {
-                        RecordReader reader = lakeSource.createRecordReader(() -> lakeSplit);
-                        if (reader instanceof SortedRecordReader) {
-                            rowComparator = ((SortedRecordReader) reader).order();
-                        } else {
-                            throw new UnsupportedOperationException(
-                                    "lake records must instance of sorted view.");
-                        }
-                        lakeRecordIterators.add(reader.read());
+                        recordReaders.add(lakeSource.createRecordReader(() -> lakeSplit));
+                    }
+                }
+                for (RecordReader reader : recordReaders) {
+                    if (reader instanceof SortedRecordReader) {
+                        rowComparator = ((SortedRecordReader) reader).order();
+                    } else {
+                        throw new UnsupportedOperationException(
+                                "lake records must instance of sorted view.");
                     }
-                    logRows = new TreeMap<>(rowComparator);
+                    lakeRecordIterators.add(reader.read());
Review Comment:
When there are no lake splits, a RecordReader is created with a null split solely to obtain the rowComparator. However, that reader is then also added to the lakeRecordIterators list, so its iterator (which contains no data) is fed into the sort-merge process. This could be inefficient. Consider extracting the rowComparator without adding the empty iterator to lakeRecordIterators, or ensure the empty iterator is handled correctly during merging.
```suggestion
                    // Only add iterator if the reader is not for a null split
                    // (i.e., only if there are actual splits)
                    if (!(lakeSnapshotSplitAndFlussLogSplit.getLakeSplits() == null
                            || lakeSnapshotSplitAndFlussLogSplit.getLakeSplits().isEmpty())) {
                        lakeRecordIterators.add(reader.read());
                    }
```
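
As a standalone illustration of the suggested separation (simplified stand-ins, not the actual Fluss `RecordReader` API): take the merge comparator from every reader, but only enqueue an iterator when real lake splits exist, so the placeholder "null split" reader never contributes an empty input to the sort merge.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Sketch of the pattern the comment suggests. Reader, collect, and the
// Integer rows are hypothetical stand-ins for the Fluss interfaces.
public class ComparatorOnlyReader {

    interface Reader {
        Comparator<Integer> order(); // stand-in for SortedRecordReader.order()
        List<Integer> read();        // stand-in for RecordReader.read()
    }

    static Comparator<Integer> rowComparator;
    static final List<List<Integer>> lakeRecordIterators = new ArrayList<>();

    static void collect(List<Reader> readers, boolean hasRealSplits) {
        for (Reader reader : readers) {
            rowComparator = reader.order(); // comparator is always needed for the merge
            if (hasRealSplits) {            // skip the data-less "null split" reader
                lakeRecordIterators.add(reader.read());
            }
        }
    }

    public static void main(String[] args) {
        Reader nullSplitReader = new Reader() {
            public Comparator<Integer> order() { return Comparator.naturalOrder(); }
            public List<Integer> read() { return List.of(); }
        };
        collect(List.of(nullSplitReader), false);
        System.out.println(rowComparator != null);         // true: comparator extracted
        System.out.println(lakeRecordIterators.isEmpty()); // true: no empty iterator enqueued
    }
}
```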
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java:
##########
@@ -109,7 +107,9 @@ public LakeSnapshotAndLogSplitScanner(
                                                 "StoppingOffset is null for split: "
                                                         + lakeSnapshotAndFlussLogSplit));
-            this.logScanFinished = lakeSnapshotAndFlussLogSplit.getStartingOffset() >= stoppingOffset;
+            this.logScanFinished =
+                    lakeSnapshotAndFlussLogSplit.getStartingOffset() >= stoppingOffset
+                            || stoppingOffset <= 0;
Review Comment:
The condition `stoppingOffset <= 0` may not be the correct check. If
stoppingOffset is exactly 0, this means we want to stop at offset 0, not that
the scan should be finished immediately. Consider whether the intended logic is
`stoppingOffset < 0` to indicate an invalid or uninitialized offset, or if this
check should be removed entirely.
```suggestion
                            || stoppingOffset < 0;
```
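
The boundary difference can be checked in isolation (plain `long`s as hypothetical stand-ins for the split's offsets): the two conditions only diverge when the starting offset is a negative sentinel and the stopping offset is exactly 0, where `<= 0` declares the scan finished but `< 0` does not.

```java
// Compares the PR's condition against the suggested one. finishedLeq and
// finishedLt are illustrative helpers, not methods from the Fluss codebase.
public class StoppingOffsetCheck {
    static boolean finishedLeq(long start, long stop) {
        return start >= stop || stop <= 0; // condition as written in the PR
    }

    static boolean finishedLt(long start, long stop) {
        return start >= stop || stop < 0;  // reviewer's suggested condition
    }

    public static void main(String[] args) {
        // Negative-sentinel start, stoppingOffset exactly 0: the variants differ.
        System.out.println(finishedLeq(-1, 0)); // true  (scan skipped entirely)
        System.out.println(finishedLt(-1, 0));  // false (scan still runs to offset 0)
        // Ordinary case, start already past stop: both agree.
        System.out.println(finishedLeq(5, 3));  // true
        System.out.println(finishedLt(5, 3));   // true
    }
}
```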
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]