This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3eb1df03b [bug] Bounded Stream should end directly if the full scan snapshot should be end (#677)
3eb1df03b is described below

commit 3eb1df03bc37a12de6a904f7db7c835e092badcb
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Mar 22 11:38:15 2023 +0800

    [bug] Bounded Stream should end directly if the full scan snapshot should be end (#677)
---
 .../paimon/table/source/StreamDataTableScan.java   |  3 +
 .../table/source/StreamDataTableScanImpl.java      | 38 ++++++++---
 .../{FollowUpScanner.java => BoundedChecker.java}  | 27 +++-----
 .../snapshot/BoundedWatermarkFollowUpScanner.java  | 58 -----------------
 .../table/source/snapshot/FollowUpScanner.java     |  8 ---
 .../apache/paimon/table/system/AuditLogTable.java  |  6 ++
 .../table/source/StreamDataTableScanTest.java      | 76 ++++++++++++++++++++++
 ...rTest.java => BoundedWatermarkCheckerTest.java} | 15 ++---
 .../flink/source/CompactorSourceBuilder.java       |  4 +-
 9 files changed, 134 insertions(+), 101 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScan.java
index bdfdb7c91..d125aec90 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScan.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.source;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.DataTable;
+import org.apache.paimon.table.source.snapshot.BoundedChecker;
 import org.apache.paimon.table.source.snapshot.FollowUpScanner;
 import org.apache.paimon.table.source.snapshot.StartingScanner;
 
@@ -38,6 +39,8 @@ public interface StreamDataTableScan extends DataTableScan, 
InnerStreamTableScan
 
     StreamDataTableScan withFollowUpScanner(FollowUpScanner followUpScanner);
 
+    StreamDataTableScan withBoundedChecker(BoundedChecker boundedChecker);
+
     StreamDataTableScan withSnapshotStarting();
 
     static void validate(TableSchema schema) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScanImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScanImpl.java
index 9d1135ea7..bf5336c08 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScanImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScanImpl.java
@@ -20,7 +20,7 @@ package org.apache.paimon.table.source;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
-import org.apache.paimon.table.source.snapshot.BoundedWatermarkFollowUpScanner;
+import org.apache.paimon.table.source.snapshot.BoundedChecker;
 import org.apache.paimon.table.source.snapshot.CompactedStartingScanner;
 import 
org.apache.paimon.table.source.snapshot.CompactionChangelogFollowUpScanner;
 import org.apache.paimon.table.source.snapshot.DeltaFollowUpScanner;
@@ -47,6 +47,8 @@ public class StreamDataTableScanImpl extends 
AbstractDataTableScan implements St
 
     private StartingScanner startingScanner;
     private FollowUpScanner followUpScanner;
+    private BoundedChecker boundedChecker;
+    private boolean isEnd = false;
     @Nullable private Long nextSnapshotId;
 
     public StreamDataTableScanImpl(
@@ -65,16 +67,24 @@ public class StreamDataTableScanImpl extends 
AbstractDataTableScan implements St
         return supportStreamingReadOverwrite;
     }
 
+    @Override
     public StreamDataTableScan withStartingScanner(StartingScanner 
startingScanner) {
         this.startingScanner = startingScanner;
         return this;
     }
 
+    @Override
     public StreamDataTableScan withFollowUpScanner(FollowUpScanner 
followUpScanner) {
         this.followUpScanner = followUpScanner;
         return this;
     }
 
+    @Override
+    public StreamDataTableScan withBoundedChecker(BoundedChecker 
boundedChecker) {
+        this.boundedChecker = boundedChecker;
+        return this;
+    }
+
     @Override
     public StreamDataTableScan withSnapshotStarting() {
         startingScanner =
@@ -93,6 +103,9 @@ public class StreamDataTableScanImpl extends 
AbstractDataTableScan implements St
         if (followUpScanner == null) {
             followUpScanner = createFollowUpScanner();
         }
+        if (boundedChecker == null) {
+            boundedChecker = createBoundedChecker();
+        }
 
         if (nextSnapshotId == null) {
             return tryFirstPlan();
@@ -105,13 +118,21 @@ public class StreamDataTableScanImpl extends 
AbstractDataTableScan implements St
         DataTableScan.DataFilePlan plan =
                 startingScanner.getPlan(snapshotManager, snapshotSplitReader);
         if (plan != null) {
-            nextSnapshotId = plan.snapshotId + 1;
+            long snapshot = plan.snapshotId;
+            nextSnapshotId = snapshot + 1;
+            if 
(boundedChecker.shouldEndInput(snapshotManager.snapshot(snapshot))) {
+                isEnd = true;
+            }
         }
         return plan;
     }
 
     private DataFilePlan nextPlan() {
         while (true) {
+            if (isEnd) {
+                throw new EndOfScanException();
+            }
+
             if (!snapshotManager.snapshotExists(nextSnapshotId)) {
                 LOG.debug(
                         "Next snapshot id {} does not exist, wait for the 
snapshot generation.",
@@ -121,7 +142,7 @@ public class StreamDataTableScanImpl extends 
AbstractDataTableScan implements St
 
             Snapshot snapshot = snapshotManager.snapshot(nextSnapshotId);
 
-            if (followUpScanner.shouldEndInput(snapshot)) {
+            if (boundedChecker.shouldEndInput(snapshot)) {
                 throw new EndOfScanException();
             }
 
@@ -170,13 +191,14 @@ public class StreamDataTableScanImpl extends 
AbstractDataTableScan implements St
                 throw new UnsupportedOperationException(
                         "Unknown changelog producer " + 
changelogProducer.name());
         }
+        return followUpScanner;
+    }
 
+    private BoundedChecker createBoundedChecker() {
         Long boundedWatermark = options.scanBoundedWatermark();
-        if (boundedWatermark != null) {
-            followUpScanner =
-                    new BoundedWatermarkFollowUpScanner(followUpScanner, 
boundedWatermark);
-        }
-        return followUpScanner;
+        return boundedWatermark != null
+                ? BoundedChecker.watermark(boundedWatermark)
+                : BoundedChecker.neverEnd();
     }
 
     @Nullable
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FollowUpScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/BoundedChecker.java
similarity index 53%
copy from 
paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FollowUpScanner.java
copy to 
paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/BoundedChecker.java
index 8e7e99dda..254d0cc85 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FollowUpScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/BoundedChecker.java
@@ -19,27 +19,20 @@
 package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.Snapshot;
-import org.apache.paimon.table.source.DataTableScan;
-import org.apache.paimon.table.source.StreamDataTableScan;
 
-/** Helper class for the follow-up planning of {@link StreamDataTableScan}. */
-public interface FollowUpScanner {
+/** Checker to check whether the bounded stream is end. */
+public interface BoundedChecker {
 
-    boolean shouldScanSnapshot(Snapshot snapshot);
+    boolean shouldEndInput(Snapshot snapshot);
 
-    default boolean shouldEndInput(Snapshot snapshot) {
-        return false;
+    static BoundedChecker neverEnd() {
+        return snapshot -> false;
     }
 
-    default boolean isBounded() {
-        return false;
-    }
-
-    DataTableScan.DataFilePlan getPlan(long snapshotId, SnapshotSplitReader 
snapshotSplitReader);
-
-    default DataTableScan.DataFilePlan getOverwriteChangesPlan(
-            long snapshotId, SnapshotSplitReader snapshotSplitReader) {
-        return new DataTableScan.DataFilePlan(
-                snapshotId, 
snapshotSplitReader.withSnapshot(snapshotId).overwriteSplits());
+    static BoundedChecker watermark(long boundedWatermark) {
+        return snapshot -> {
+            Long watermark = snapshot.watermark();
+            return watermark != null && watermark > boundedWatermark;
+        };
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/BoundedWatermarkFollowUpScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/BoundedWatermarkFollowUpScanner.java
deleted file mode 100644
index 3bee98fec..000000000
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/BoundedWatermarkFollowUpScanner.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.source.snapshot;
-
-import org.apache.paimon.Snapshot;
-import org.apache.paimon.table.source.DataTableScan.DataFilePlan;
-
-/**
- * {@link FollowUpScanner} for bounded watermark, end scanning when a snapshot 
larger than bounded
- * watermark appears.
- */
-public class BoundedWatermarkFollowUpScanner implements FollowUpScanner {
-
-    private final FollowUpScanner innerScanner;
-    private final long boundedWatermark;
-
-    public BoundedWatermarkFollowUpScanner(FollowUpScanner innerScanner, long 
boundedWatermark) {
-        this.innerScanner = innerScanner;
-        this.boundedWatermark = boundedWatermark;
-    }
-
-    @Override
-    public boolean shouldScanSnapshot(Snapshot snapshot) {
-        return innerScanner.shouldScanSnapshot(snapshot);
-    }
-
-    @Override
-    public boolean shouldEndInput(Snapshot snapshot) {
-        Long watermark = snapshot.watermark();
-        return watermark != null && watermark > boundedWatermark;
-    }
-
-    @Override
-    public boolean isBounded() {
-        return true;
-    }
-
-    @Override
-    public DataFilePlan getPlan(long snapshotId, SnapshotSplitReader 
snapshotSplitReader) {
-        return innerScanner.getPlan(snapshotId, snapshotSplitReader);
-    }
-}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FollowUpScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FollowUpScanner.java
index 8e7e99dda..b34871598 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FollowUpScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FollowUpScanner.java
@@ -27,14 +27,6 @@ public interface FollowUpScanner {
 
     boolean shouldScanSnapshot(Snapshot snapshot);
 
-    default boolean shouldEndInput(Snapshot snapshot) {
-        return false;
-    }
-
-    default boolean isBounded() {
-        return false;
-    }
-
     DataTableScan.DataFilePlan getPlan(long snapshotId, SnapshotSplitReader 
snapshotSplitReader);
 
     default DataTableScan.DataFilePlan getOverwriteChangesPlan(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index b50fc17d1..2d03a1885 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -39,6 +39,7 @@ import org.apache.paimon.table.source.DataTableScan;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.source.snapshot.BoundedChecker;
 import org.apache.paimon.table.source.snapshot.FollowUpScanner;
 import org.apache.paimon.table.source.snapshot.SnapshotSplitReader;
 import org.apache.paimon.table.source.snapshot.StartingScanner;
@@ -300,6 +301,11 @@ public class AuditLogTable implements DataTable, 
ReadonlyTable {
             return streamScan.withFollowUpScanner(followUpScanner);
         }
 
+        @Override
+        public StreamDataTableScan withBoundedChecker(BoundedChecker 
boundedChecker) {
+            return streamScan.withBoundedChecker(boundedChecker);
+        }
+
         @Override
         public StreamDataTableScan withSnapshotStarting() {
             return streamScan.withSnapshotStarting();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/StreamDataTableScanTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/StreamDataTableScanTest.java
index b7cebe3b5..95adb855d 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/StreamDataTableScanTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/StreamDataTableScanTest.java
@@ -19,17 +19,24 @@
 package org.apache.paimon.table.source;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.table.source.snapshot.ScannerTestBase;
 import org.apache.paimon.types.RowKind;
 
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link StreamDataTableScan}. */
 public class StreamDataTableScanTest extends ScannerTestBase {
@@ -167,4 +174,73 @@ public class StreamDataTableScanTest extends 
ScannerTestBase {
         write.close();
         commit.close();
     }
+
+    @Test
+    public void testBoundedInFull() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.SCAN_BOUNDED_WATERMARK.key(), "4");
+        FileStoreTable table = this.table.copy(options);
+        TableRead read = table.newRead();
+        StreamTableWrite write = table.newWrite(commitUser);
+        TableCommitImpl commit = table.newCommit(commitUser);
+        StreamDataTableScan scan = table.newStreamScan();
+
+        write.write(rowData(1, 10, 100L));
+        ManifestCommittable committable = new ManifestCommittable(0, 5L);
+        write.prepareCommit(true, 0).forEach(committable::addFileCommittable);
+        commit.commit(committable);
+
+        DataTableScan.DataFilePlan plan = scan.plan();
+        assertThat(plan.snapshotId).isEqualTo(1);
+        assertThat(getResult(read, plan.splits()))
+                .hasSameElementsAs(Collections.singletonList("+I 1|10|100"));
+
+        assertThatThrownBy(scan::plan).isInstanceOf(EndOfScanException.class);
+
+        write.close();
+        commit.close();
+    }
+
+    @Test
+    public void testBounded() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.SCAN_BOUNDED_WATERMARK.key(), "8");
+        FileStoreTable table = this.table.copy(options);
+        TableRead read = table.newRead();
+        StreamTableWrite write = table.newWrite(commitUser);
+        TableCommitImpl commit = table.newCommit(commitUser);
+        StreamDataTableScan scan = table.newStreamScan();
+
+        write.write(rowData(1, 10, 100L));
+        ManifestCommittable committable = new ManifestCommittable(0, 5L);
+        write.prepareCommit(true, 0).forEach(committable::addFileCommittable);
+        commit.commit(committable);
+
+        DataTableScan.DataFilePlan plan = scan.plan();
+        assertThat(plan.snapshotId).isEqualTo(1);
+        assertThat(getResult(read, plan.splits()))
+                .hasSameElementsAs(Collections.singletonList("+I 1|10|100"));
+        assertThat(scan.plan()).isNull();
+
+        write.write(rowData(2, 20, 200L));
+        committable = new ManifestCommittable(0, 7L);
+        write.prepareCommit(true, 0).forEach(committable::addFileCommittable);
+        commit.commit(committable);
+
+        plan = scan.plan();
+        assertThat(plan.snapshotId).isEqualTo(2);
+        assertThat(getResult(read, plan.splits()))
+                .hasSameElementsAs(Collections.singletonList("+I 2|20|200"));
+        assertThat(scan.plan()).isNull();
+
+        write.write(rowData(3, 30, 300L));
+        committable = new ManifestCommittable(0, 9L);
+        write.prepareCommit(true, 0).forEach(committable::addFileCommittable);
+        commit.commit(committable);
+
+        assertThatThrownBy(scan::plan).isInstanceOf(EndOfScanException.class);
+
+        write.close();
+        commit.close();
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/BoundedWatermarkFollowUpScannerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/BoundedWatermarkCheckerTest.java
similarity index 72%
rename from 
paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/BoundedWatermarkFollowUpScannerTest.java
rename to 
paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/BoundedWatermarkCheckerTest.java
index c7da8f4fe..3fc7ea971 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/BoundedWatermarkFollowUpScannerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/BoundedWatermarkCheckerTest.java
@@ -27,28 +27,25 @@ import org.junit.jupiter.api.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Test for {@link BoundedWatermarkFollowUpScanner}. */
-public class BoundedWatermarkFollowUpScannerTest extends ScannerTestBase {
+/** Test for {@link BoundedChecker}. */
+public class BoundedWatermarkCheckerTest extends ScannerTestBase {
 
     @Test
     public void testBounded() {
         SnapshotManager snapshotManager = table.snapshotManager();
         TableCommitImpl commit = 
table.newCommit(commitUser).ignoreEmptyCommit(false);
-        FollowUpScanner scanner =
-                new BoundedWatermarkFollowUpScanner(new 
DeltaFollowUpScanner(), 2000L);
+        BoundedChecker checker = BoundedChecker.watermark(2000L);
 
         commit.commit(new ManifestCommittable(0, 1024L));
         Snapshot snapshot = snapshotManager.latestSnapshot();
-        assertThat(scanner.shouldEndInput(snapshot)).isFalse();
-        assertThat(scanner.getPlan(snapshot.id(), 
snapshotSplitReader).splits()).isEmpty();
+        assertThat(checker.shouldEndInput(snapshot)).isFalse();
 
         commit.commit(new ManifestCommittable(0, 2000L));
         snapshot = snapshotManager.latestSnapshot();
-        assertThat(scanner.shouldEndInput(snapshot)).isFalse();
-        assertThat(scanner.getPlan(snapshot.id(), 
snapshotSplitReader).splits()).isEmpty();
+        assertThat(checker.shouldEndInput(snapshot)).isFalse();
 
         commit.commit(new ManifestCommittable(0, 2001L));
         snapshot = snapshotManager.latestSnapshot();
-        assertThat(scanner.shouldEndInput(snapshot)).isTrue();
+        assertThat(checker.shouldEndInput(snapshot)).isTrue();
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
index 9afccd594..da68d09b9 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
@@ -23,6 +23,7 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.source.snapshot.BoundedChecker;
 import 
org.apache.paimon.table.source.snapshot.ContinuousCompactorFollowUpScanner;
 import 
org.apache.paimon.table.source.snapshot.ContinuousCompactorStartingScanner;
 import org.apache.paimon.table.source.snapshot.FullStartingScanner;
@@ -104,7 +105,8 @@ public class CompactorSourceBuilder {
                                         .withStartingScanner(
                                                 new 
ContinuousCompactorStartingScanner())
                                         .withFollowUpScanner(
-                                                new 
ContinuousCompactorFollowUpScanner());
+                                                new 
ContinuousCompactorFollowUpScanner())
+                                        
.withBoundedChecker(BoundedChecker.neverEnd());
                         scan.restore(nextSnapshotId);
                         return scan;
                     });

Reply via email to