luoyuxia commented on code in PR #2170:
URL: https://github.com/apache/fluss/pull/2170#discussion_r2618148866


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/SortMergeReader.java:
##########
@@ -300,12 +307,27 @@ public void close() {
 
         @Override
         public boolean hasNext() {
-            return changeLogRecordIterator != null && 
changeLogRecordIterator.hasNext();

Review Comment:
   the next will be delete row.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java:
##########
@@ -192,23 +192,26 @@ public CloseableIterator<InternalRow> pollBatch(Duration 
timeout) throws IOExcep
             return currentSortMergeReader.readBatch();
         } else {
             if (lakeRecordIterators.isEmpty()) {
+                List<RecordReader> recordReaders = new ArrayList<>();
                 if (lakeSnapshotSplitAndFlussLogSplit.getLakeSplits() == null
                         || 
lakeSnapshotSplitAndFlussLogSplit.getLakeSplits().isEmpty()) {
-                    lakeRecordIterators = Collections.emptyList();
-                    logRows = new LinkedHashMap<>();

Review Comment:
   ProjectedRow doesn't support hash & equal. It'll throw exception if come to 
here.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java:
##########
@@ -55,8 +55,16 @@ public void addSplit(SourceSplitBase split, 
Queue<SourceSplitBase> boundedSplits
         if (split instanceof LakeSnapshotSplit) {
             boundedSplits.add(split);
         } else if (split instanceof LakeSnapshotAndFlussLogSplit) {
-            // lake split not finished, add to it
-            if (!((LakeSnapshotAndFlussLogSplit) split).isLakeSplitFinished()) 
{
+            LakeSnapshotAndFlussLogSplit lakeSnapshotAndFlussLogSplit =
+                    (LakeSnapshotAndFlussLogSplit) split;
+            boolean isStreaming = ((LakeSnapshotAndFlussLogSplit) 
split).isStreaming();
+            // if is streaming and lake split not finished, add to it
+            if (isStreaming) {
+                if (!lakeSnapshotAndFlussLogSplit.isLakeSplitFinished()) {
+                    boundedSplits.add(split);
+                }
+            } else {
+                // otherwise, in batch mode, always add it

Review Comment:
   in batch mode, we need to add it. otherwise, the split won't never be read & 
mark as finished.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java:
##########
@@ -680,6 +688,7 @@ private void handleSplitsAdd(List<SourceSplitBase> splits, 
Throwable t) {
                         t);
             }
         }
+        doHandleSplitsAdd(splits);

Review Comment:
   handle split firstly before mark noMoreNewSplits to true



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to