luoyuxia commented on code in PR #2170:
URL: https://github.com/apache/fluss/pull/2170#discussion_r2618148866
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/SortMergeReader.java:
##########
@@ -300,12 +307,27 @@ public void close() {
@Override
public boolean hasNext() {
- return changeLogRecordIterator != null &&
changeLogRecordIterator.hasNext();
Review Comment:
the next will be delete row.
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java:
##########
@@ -192,23 +192,26 @@ public CloseableIterator<InternalRow> pollBatch(Duration
timeout) throws IOExcep
return currentSortMergeReader.readBatch();
} else {
if (lakeRecordIterators.isEmpty()) {
+ List<RecordReader> recordReaders = new ArrayList<>();
if (lakeSnapshotSplitAndFlussLogSplit.getLakeSplits() == null
||
lakeSnapshotSplitAndFlussLogSplit.getLakeSplits().isEmpty()) {
- lakeRecordIterators = Collections.emptyList();
- logRows = new LinkedHashMap<>();
Review Comment:
ProjectedRow doesn't support hash & equal. It'll throw exception if come to
here.
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java:
##########
@@ -55,8 +55,16 @@ public void addSplit(SourceSplitBase split,
Queue<SourceSplitBase> boundedSplits
if (split instanceof LakeSnapshotSplit) {
boundedSplits.add(split);
} else if (split instanceof LakeSnapshotAndFlussLogSplit) {
- // lake split not finished, add to it
- if (!((LakeSnapshotAndFlussLogSplit) split).isLakeSplitFinished())
{
+ LakeSnapshotAndFlussLogSplit lakeSnapshotAndFlussLogSplit =
+ (LakeSnapshotAndFlussLogSplit) split;
+ boolean isStreaming = ((LakeSnapshotAndFlussLogSplit)
split).isStreaming();
+ // if is streaming and lake split not finished, add to it
+ if (isStreaming) {
+ if (!lakeSnapshotAndFlussLogSplit.isLakeSplitFinished()) {
+ boundedSplits.add(split);
+ }
+ } else {
+ // otherwise, in batch mode, always add it
Review Comment:
in batch mode, we need to add it. otherwise, the split won't never be read &
mark as finished.
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java:
##########
@@ -680,6 +688,7 @@ private void handleSplitsAdd(List<SourceSplitBase> splits,
Throwable t) {
t);
}
}
+ doHandleSplitsAdd(splits);
Review Comment:
handle split firstly before mark noMoreNewSplits to true
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]