This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3eb1df03b [bug] Bounded Stream should end directly if the full scan
snapshot already meets the end condition (#677)
3eb1df03b is described below
commit 3eb1df03bc37a12de6a904f7db7c835e092badcb
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Mar 22 11:38:15 2023 +0800
[bug] Bounded Stream should end directly if the full scan snapshot already
meets the end condition (#677)
---
.../paimon/table/source/StreamDataTableScan.java | 3 +
.../table/source/StreamDataTableScanImpl.java | 38 ++++++++---
.../{FollowUpScanner.java => BoundedChecker.java} | 27 +++-----
.../snapshot/BoundedWatermarkFollowUpScanner.java | 58 -----------------
.../table/source/snapshot/FollowUpScanner.java | 8 ---
.../apache/paimon/table/system/AuditLogTable.java | 6 ++
.../table/source/StreamDataTableScanTest.java | 76 ++++++++++++++++++++++
...rTest.java => BoundedWatermarkCheckerTest.java} | 15 ++---
.../flink/source/CompactorSourceBuilder.java | 4 +-
9 files changed, 134 insertions(+), 101 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScan.java
index bdfdb7c91..d125aec90 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScan.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.source;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.DataTable;
+import org.apache.paimon.table.source.snapshot.BoundedChecker;
import org.apache.paimon.table.source.snapshot.FollowUpScanner;
import org.apache.paimon.table.source.snapshot.StartingScanner;
@@ -38,6 +39,8 @@ public interface StreamDataTableScan extends DataTableScan,
InnerStreamTableScan
StreamDataTableScan withFollowUpScanner(FollowUpScanner followUpScanner);
+ StreamDataTableScan withBoundedChecker(BoundedChecker boundedChecker);
+
StreamDataTableScan withSnapshotStarting();
static void validate(TableSchema schema) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScanImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScanImpl.java
index 9d1135ea7..bf5336c08 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScanImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamDataTableScanImpl.java
@@ -20,7 +20,7 @@ package org.apache.paimon.table.source;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
-import org.apache.paimon.table.source.snapshot.BoundedWatermarkFollowUpScanner;
+import org.apache.paimon.table.source.snapshot.BoundedChecker;
import org.apache.paimon.table.source.snapshot.CompactedStartingScanner;
import
org.apache.paimon.table.source.snapshot.CompactionChangelogFollowUpScanner;
import org.apache.paimon.table.source.snapshot.DeltaFollowUpScanner;
@@ -47,6 +47,8 @@ public class StreamDataTableScanImpl extends
AbstractDataTableScan implements St
private StartingScanner startingScanner;
private FollowUpScanner followUpScanner;
+ private BoundedChecker boundedChecker;
+ private boolean isEnd = false;
@Nullable private Long nextSnapshotId;
public StreamDataTableScanImpl(
@@ -65,16 +67,24 @@ public class StreamDataTableScanImpl extends
AbstractDataTableScan implements St
return supportStreamingReadOverwrite;
}
+ @Override
public StreamDataTableScan withStartingScanner(StartingScanner
startingScanner) {
this.startingScanner = startingScanner;
return this;
}
+ @Override
public StreamDataTableScan withFollowUpScanner(FollowUpScanner
followUpScanner) {
this.followUpScanner = followUpScanner;
return this;
}
+ @Override
+ public StreamDataTableScan withBoundedChecker(BoundedChecker
boundedChecker) {
+ this.boundedChecker = boundedChecker;
+ return this;
+ }
+
@Override
public StreamDataTableScan withSnapshotStarting() {
startingScanner =
@@ -93,6 +103,9 @@ public class StreamDataTableScanImpl extends
AbstractDataTableScan implements St
if (followUpScanner == null) {
followUpScanner = createFollowUpScanner();
}
+ if (boundedChecker == null) {
+ boundedChecker = createBoundedChecker();
+ }
if (nextSnapshotId == null) {
return tryFirstPlan();
@@ -105,13 +118,21 @@ public class StreamDataTableScanImpl extends
AbstractDataTableScan implements St
DataTableScan.DataFilePlan plan =
startingScanner.getPlan(snapshotManager, snapshotSplitReader);
if (plan != null) {
- nextSnapshotId = plan.snapshotId + 1;
+ long snapshot = plan.snapshotId;
+ nextSnapshotId = snapshot + 1;
+ if
(boundedChecker.shouldEndInput(snapshotManager.snapshot(snapshot))) {
+ isEnd = true;
+ }
}
return plan;
}
private DataFilePlan nextPlan() {
while (true) {
+ if (isEnd) {
+ throw new EndOfScanException();
+ }
+
if (!snapshotManager.snapshotExists(nextSnapshotId)) {
LOG.debug(
"Next snapshot id {} does not exist, wait for the
snapshot generation.",
@@ -121,7 +142,7 @@ public class StreamDataTableScanImpl extends
AbstractDataTableScan implements St
Snapshot snapshot = snapshotManager.snapshot(nextSnapshotId);
- if (followUpScanner.shouldEndInput(snapshot)) {
+ if (boundedChecker.shouldEndInput(snapshot)) {
throw new EndOfScanException();
}
@@ -170,13 +191,14 @@ public class StreamDataTableScanImpl extends
AbstractDataTableScan implements St
throw new UnsupportedOperationException(
"Unknown changelog producer " +
changelogProducer.name());
}
+ return followUpScanner;
+ }
+ private BoundedChecker createBoundedChecker() {
Long boundedWatermark = options.scanBoundedWatermark();
- if (boundedWatermark != null) {
- followUpScanner =
- new BoundedWatermarkFollowUpScanner(followUpScanner,
boundedWatermark);
- }
- return followUpScanner;
+ return boundedWatermark != null
+ ? BoundedChecker.watermark(boundedWatermark)
+ : BoundedChecker.neverEnd();
}
@Nullable
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FollowUpScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/BoundedChecker.java
similarity index 53%
copy from
paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FollowUpScanner.java
copy to
paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/BoundedChecker.java
index 8e7e99dda..254d0cc85 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FollowUpScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/BoundedChecker.java
@@ -19,27 +19,20 @@
package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.Snapshot;
-import org.apache.paimon.table.source.DataTableScan;
-import org.apache.paimon.table.source.StreamDataTableScan;
-/** Helper class for the follow-up planning of {@link StreamDataTableScan}. */
-public interface FollowUpScanner {
+/** Checker to check whether the bounded stream has reached its end. */
+public interface BoundedChecker {
- boolean shouldScanSnapshot(Snapshot snapshot);
+ boolean shouldEndInput(Snapshot snapshot);
- default boolean shouldEndInput(Snapshot snapshot) {
- return false;
+ static BoundedChecker neverEnd() {
+ return snapshot -> false;
}
- default boolean isBounded() {
- return false;
- }
-
- DataTableScan.DataFilePlan getPlan(long snapshotId, SnapshotSplitReader
snapshotSplitReader);
-
- default DataTableScan.DataFilePlan getOverwriteChangesPlan(
- long snapshotId, SnapshotSplitReader snapshotSplitReader) {
- return new DataTableScan.DataFilePlan(
- snapshotId,
snapshotSplitReader.withSnapshot(snapshotId).overwriteSplits());
+ static BoundedChecker watermark(long boundedWatermark) {
+ return snapshot -> {
+ Long watermark = snapshot.watermark();
+ return watermark != null && watermark > boundedWatermark;
+ };
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/BoundedWatermarkFollowUpScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/BoundedWatermarkFollowUpScanner.java
deleted file mode 100644
index 3bee98fec..000000000
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/BoundedWatermarkFollowUpScanner.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.source.snapshot;
-
-import org.apache.paimon.Snapshot;
-import org.apache.paimon.table.source.DataTableScan.DataFilePlan;
-
-/**
- * {@link FollowUpScanner} for bounded watermark, end scanning when a snapshot
larger than bounded
- * watermark appears.
- */
-public class BoundedWatermarkFollowUpScanner implements FollowUpScanner {
-
- private final FollowUpScanner innerScanner;
- private final long boundedWatermark;
-
- public BoundedWatermarkFollowUpScanner(FollowUpScanner innerScanner, long
boundedWatermark) {
- this.innerScanner = innerScanner;
- this.boundedWatermark = boundedWatermark;
- }
-
- @Override
- public boolean shouldScanSnapshot(Snapshot snapshot) {
- return innerScanner.shouldScanSnapshot(snapshot);
- }
-
- @Override
- public boolean shouldEndInput(Snapshot snapshot) {
- Long watermark = snapshot.watermark();
- return watermark != null && watermark > boundedWatermark;
- }
-
- @Override
- public boolean isBounded() {
- return true;
- }
-
- @Override
- public DataFilePlan getPlan(long snapshotId, SnapshotSplitReader
snapshotSplitReader) {
- return innerScanner.getPlan(snapshotId, snapshotSplitReader);
- }
-}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FollowUpScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FollowUpScanner.java
index 8e7e99dda..b34871598 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FollowUpScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/FollowUpScanner.java
@@ -27,14 +27,6 @@ public interface FollowUpScanner {
boolean shouldScanSnapshot(Snapshot snapshot);
- default boolean shouldEndInput(Snapshot snapshot) {
- return false;
- }
-
- default boolean isBounded() {
- return false;
- }
-
DataTableScan.DataFilePlan getPlan(long snapshotId, SnapshotSplitReader
snapshotSplitReader);
default DataTableScan.DataFilePlan getOverwriteChangesPlan(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index b50fc17d1..2d03a1885 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -39,6 +39,7 @@ import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.source.snapshot.BoundedChecker;
import org.apache.paimon.table.source.snapshot.FollowUpScanner;
import org.apache.paimon.table.source.snapshot.SnapshotSplitReader;
import org.apache.paimon.table.source.snapshot.StartingScanner;
@@ -300,6 +301,11 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
return streamScan.withFollowUpScanner(followUpScanner);
}
+ @Override
+ public StreamDataTableScan withBoundedChecker(BoundedChecker
boundedChecker) {
+ return streamScan.withBoundedChecker(boundedChecker);
+ }
+
@Override
public StreamDataTableScan withSnapshotStarting() {
return streamScan.withSnapshotStarting();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/StreamDataTableScanTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/StreamDataTableScanTest.java
index b7cebe3b5..95adb855d 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/StreamDataTableScanTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/StreamDataTableScanTest.java
@@ -19,17 +19,24 @@
package org.apache.paimon.table.source;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.source.snapshot.ScannerTestBase;
import org.apache.paimon.types.RowKind;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for {@link StreamDataTableScan}. */
public class StreamDataTableScanTest extends ScannerTestBase {
@@ -167,4 +174,73 @@ public class StreamDataTableScanTest extends
ScannerTestBase {
write.close();
commit.close();
}
+
+ @Test
+ public void testBoundedInFull() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(CoreOptions.SCAN_BOUNDED_WATERMARK.key(), "4");
+ FileStoreTable table = this.table.copy(options);
+ TableRead read = table.newRead();
+ StreamTableWrite write = table.newWrite(commitUser);
+ TableCommitImpl commit = table.newCommit(commitUser);
+ StreamDataTableScan scan = table.newStreamScan();
+
+ write.write(rowData(1, 10, 100L));
+ ManifestCommittable committable = new ManifestCommittable(0, 5L);
+ write.prepareCommit(true, 0).forEach(committable::addFileCommittable);
+ commit.commit(committable);
+
+ DataTableScan.DataFilePlan plan = scan.plan();
+ assertThat(plan.snapshotId).isEqualTo(1);
+ assertThat(getResult(read, plan.splits()))
+ .hasSameElementsAs(Collections.singletonList("+I 1|10|100"));
+
+ assertThatThrownBy(scan::plan).isInstanceOf(EndOfScanException.class);
+
+ write.close();
+ commit.close();
+ }
+
+ @Test
+ public void testBounded() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(CoreOptions.SCAN_BOUNDED_WATERMARK.key(), "8");
+ FileStoreTable table = this.table.copy(options);
+ TableRead read = table.newRead();
+ StreamTableWrite write = table.newWrite(commitUser);
+ TableCommitImpl commit = table.newCommit(commitUser);
+ StreamDataTableScan scan = table.newStreamScan();
+
+ write.write(rowData(1, 10, 100L));
+ ManifestCommittable committable = new ManifestCommittable(0, 5L);
+ write.prepareCommit(true, 0).forEach(committable::addFileCommittable);
+ commit.commit(committable);
+
+ DataTableScan.DataFilePlan plan = scan.plan();
+ assertThat(plan.snapshotId).isEqualTo(1);
+ assertThat(getResult(read, plan.splits()))
+ .hasSameElementsAs(Collections.singletonList("+I 1|10|100"));
+ assertThat(scan.plan()).isNull();
+
+ write.write(rowData(2, 20, 200L));
+ committable = new ManifestCommittable(0, 7L);
+ write.prepareCommit(true, 0).forEach(committable::addFileCommittable);
+ commit.commit(committable);
+
+ plan = scan.plan();
+ assertThat(plan.snapshotId).isEqualTo(2);
+ assertThat(getResult(read, plan.splits()))
+ .hasSameElementsAs(Collections.singletonList("+I 2|20|200"));
+ assertThat(scan.plan()).isNull();
+
+ write.write(rowData(3, 30, 300L));
+ committable = new ManifestCommittable(0, 9L);
+ write.prepareCommit(true, 0).forEach(committable::addFileCommittable);
+ commit.commit(committable);
+
+ assertThatThrownBy(scan::plan).isInstanceOf(EndOfScanException.class);
+
+ write.close();
+ commit.close();
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/BoundedWatermarkFollowUpScannerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/BoundedWatermarkCheckerTest.java
similarity index 72%
rename from
paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/BoundedWatermarkFollowUpScannerTest.java
rename to
paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/BoundedWatermarkCheckerTest.java
index c7da8f4fe..3fc7ea971 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/BoundedWatermarkFollowUpScannerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/BoundedWatermarkCheckerTest.java
@@ -27,28 +27,25 @@ import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
-/** Test for {@link BoundedWatermarkFollowUpScanner}. */
-public class BoundedWatermarkFollowUpScannerTest extends ScannerTestBase {
+/** Test for {@link BoundedChecker}. */
+public class BoundedWatermarkCheckerTest extends ScannerTestBase {
@Test
public void testBounded() {
SnapshotManager snapshotManager = table.snapshotManager();
TableCommitImpl commit =
table.newCommit(commitUser).ignoreEmptyCommit(false);
- FollowUpScanner scanner =
- new BoundedWatermarkFollowUpScanner(new
DeltaFollowUpScanner(), 2000L);
+ BoundedChecker checker = BoundedChecker.watermark(2000L);
commit.commit(new ManifestCommittable(0, 1024L));
Snapshot snapshot = snapshotManager.latestSnapshot();
- assertThat(scanner.shouldEndInput(snapshot)).isFalse();
- assertThat(scanner.getPlan(snapshot.id(),
snapshotSplitReader).splits()).isEmpty();
+ assertThat(checker.shouldEndInput(snapshot)).isFalse();
commit.commit(new ManifestCommittable(0, 2000L));
snapshot = snapshotManager.latestSnapshot();
- assertThat(scanner.shouldEndInput(snapshot)).isFalse();
- assertThat(scanner.getPlan(snapshot.id(),
snapshotSplitReader).splits()).isEmpty();
+ assertThat(checker.shouldEndInput(snapshot)).isFalse();
commit.commit(new ManifestCommittable(0, 2001L));
snapshot = snapshotManager.latestSnapshot();
- assertThat(scanner.shouldEndInput(snapshot)).isTrue();
+ assertThat(checker.shouldEndInput(snapshot)).isTrue();
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
index 9afccd594..da68d09b9 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
@@ -23,6 +23,7 @@ import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.StreamDataTableScan;
+import org.apache.paimon.table.source.snapshot.BoundedChecker;
import
org.apache.paimon.table.source.snapshot.ContinuousCompactorFollowUpScanner;
import
org.apache.paimon.table.source.snapshot.ContinuousCompactorStartingScanner;
import org.apache.paimon.table.source.snapshot.FullStartingScanner;
@@ -104,7 +105,8 @@ public class CompactorSourceBuilder {
.withStartingScanner(
new
ContinuousCompactorStartingScanner())
.withFollowUpScanner(
- new
ContinuousCompactorFollowUpScanner());
+ new
ContinuousCompactorFollowUpScanner())
+
.withBoundedChecker(BoundedChecker.neverEnd());
scan.restore(nextSnapshotId);
return scan;
});