This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new eeb7370c7 [core] Clean StreamTableScan public interface (#1993)
eeb7370c7 is described below
commit eeb7370c7bea1527036702662b52f2d3521743b5
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Sep 13 15:49:23 2023 +0800
[core] Clean StreamTableScan public interface (#1993)
---
.../paimon/table/source/InnerStreamTableScan.java | 12 ++-
.../table/source/InnerStreamTableScanImpl.java | 24 ++++--
.../org/apache/paimon/table/source/RichPlan.java | 47 ----------
.../org/apache/paimon/table/source/ScanMode.java | 9 +-
.../paimon/table/source/SnapshotNotExistPlan.java | 21 +----
.../paimon/table/source/StreamTableScan.java | 11 +--
.../snapshot/IncrementalStartingScanner.java | 6 --
.../table/source/snapshot/SnapshotReader.java | 16 +++-
.../table/source/snapshot/SnapshotReaderImpl.java | 11 ---
.../table/source/snapshot/StartingScanner.java | 7 ++
.../apache/paimon/table/system/AuditLogTable.java | 9 +-
.../flink/source/operator/MonitorFunction.java | 6 +-
.../source/ContinuousFileSplitEnumeratorTest.java | 99 +++++++---------------
13 files changed, 95 insertions(+), 183 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScan.java
index e36a40e49..b395cc9a8 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScan.java
@@ -18,5 +18,15 @@
package org.apache.paimon.table.source;
+import org.apache.paimon.table.source.snapshot.StartingContext;
+
+import javax.annotation.Nullable;
+
/** Streaming {@link InnerTableScan} with {@link StreamTableScan}. */
-public interface InnerStreamTableScan extends InnerTableScan, StreamTableScan
{}
+public interface InnerStreamTableScan extends InnerTableScan, StreamTableScan {
+
+ StartingContext startingContext();
+
+ /** Restore from checkpoint next snapshot id with scan kind. */
+ void restore(@Nullable Long nextSnapshotId, boolean scanAllSnapshot);
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
index 0968ac361..1de22a644 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
@@ -53,12 +53,13 @@ public class InnerStreamTableScanImpl extends
AbstractInnerTableScan
private final boolean supportStreamingReadOverwrite;
private final DefaultValueAssigner defaultValueAssigner;
- private boolean inited = false;
+ private boolean initialized = false;
private StartingScanner startingScanner;
private FollowUpScanner followUpScanner;
private BoundedChecker boundedChecker;
private boolean isFullPhaseEnd = false;
+ @Nullable private Long currentWatermark;
@Nullable private Long nextSnapshotId;
public InnerStreamTableScanImpl(
@@ -82,15 +83,15 @@ public class InnerStreamTableScanImpl extends
AbstractInnerTableScan
@Override
public StartingContext startingContext() {
- if (!inited) {
+ if (!initialized) {
initScanner();
}
return startingScanner.startingContext();
}
@Override
- public RichPlan plan() {
- if (!inited) {
+ public Plan plan() {
+ if (!initialized) {
initScanner();
}
@@ -111,13 +112,14 @@ public class InnerStreamTableScanImpl extends
AbstractInnerTableScan
if (boundedChecker == null) {
boundedChecker = createBoundedChecker();
}
- inited = true;
+ initialized = true;
}
- private RichPlan tryFirstPlan() {
+ private Plan tryFirstPlan() {
StartingScanner.Result result = startingScanner.scan(snapshotReader);
if (result instanceof ScannedResult) {
ScannedResult scannedResult = (ScannedResult) result;
+ currentWatermark = scannedResult.currentWatermark();
long currentSnapshotId = scannedResult.currentSnapshotId();
nextSnapshotId = currentSnapshotId + 1;
isFullPhaseEnd =
@@ -133,7 +135,7 @@ public class InnerStreamTableScanImpl extends
AbstractInnerTableScan
return SnapshotNotExistPlan.INSTANCE;
}
- private RichPlan nextPlan() {
+ private Plan nextPlan() {
while (true) {
if (isFullPhaseEnd) {
throw new EndOfScanException();
@@ -167,11 +169,13 @@ public class InnerStreamTableScanImpl extends
AbstractInnerTableScan
LOG.debug("Find overwrite snapshot id {}.", nextSnapshotId);
SnapshotReader.Plan overwritePlan =
followUpScanner.getOverwriteChangesPlan(nextSnapshotId, snapshotReader);
+ currentWatermark = overwritePlan.watermark();
nextSnapshotId++;
return overwritePlan;
} else if (followUpScanner.shouldScanSnapshot(snapshot)) {
LOG.debug("Find snapshot id {}.", nextSnapshotId);
SnapshotReader.Plan plan =
followUpScanner.scan(nextSnapshotId, snapshotReader);
+ currentWatermark = plan.watermark();
nextSnapshotId++;
return plan;
} else {
@@ -233,6 +237,12 @@ public class InnerStreamTableScanImpl extends
AbstractInnerTableScan
return nextSnapshotId;
}
+ @Nullable
+ @Override
+ public Long watermark() {
+ return currentWatermark;
+ }
+
@Override
public void restore(@Nullable Long nextSnapshotId) {
this.nextSnapshotId = nextSnapshotId;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/RichPlan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/RichPlan.java
deleted file mode 100644
index e592f4648..000000000
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/RichPlan.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.source;
-
-import org.apache.paimon.annotation.Public;
-
-import javax.annotation.Nullable;
-
-/**
- * Rich Plan of scan.
- *
- * @since 0.6.0
- */
-@Public
-public interface RichPlan extends TableScan.Plan {
-
- /** Current watermark for consumed snapshot. */
- @Nullable
- Long watermark();
-
- /**
- * Snapshot id of this plan.
- *
- * @return null if the table is empty.
- */
- @Nullable
- Long snapshotId();
-
- /** Scan which part of the snapshot. */
- ScanMode scanMode();
-}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/ScanMode.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/ScanMode.java
index 10bc47c83..1a9731729 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ScanMode.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ScanMode.java
@@ -18,14 +18,7 @@
package org.apache.paimon.table.source;
-import org.apache.paimon.annotation.Public;
-
-/**
- * Scan which part of the snapshot.
- *
- * @since 0.6.0
- */
-@Public
+/** Scan which part of the snapshot. */
public enum ScanMode {
/** Scan complete data files of a snapshot. */
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/SnapshotNotExistPlan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/SnapshotNotExistPlan.java
index 358a6ce7e..df58c8067 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/SnapshotNotExistPlan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/SnapshotNotExistPlan.java
@@ -18,13 +18,11 @@
package org.apache.paimon.table.source;
-import javax.annotation.Nullable;
-
import java.util.Collections;
import java.util.List;
/** This is used to distinguish the case where the snapshot does not exist and
the plan is empty. */
-public class SnapshotNotExistPlan implements RichPlan {
+public class SnapshotNotExistPlan implements TableScan.Plan {
public static final SnapshotNotExistPlan INSTANCE = new
SnapshotNotExistPlan();
private SnapshotNotExistPlan() {
@@ -35,21 +33,4 @@ public class SnapshotNotExistPlan implements RichPlan {
public List<Split> splits() {
return Collections.emptyList();
}
-
- @Nullable
- @Override
- public Long watermark() {
- return null;
- }
-
- @Nullable
- @Override
- public Long snapshotId() {
- return null;
- }
-
- @Override
- public ScanMode scanMode() {
- throw new UnsupportedOperationException();
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
index 2df743428..c0a900bcf 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
@@ -19,7 +19,6 @@
package org.apache.paimon.table.source;
import org.apache.paimon.annotation.Public;
-import org.apache.paimon.table.source.snapshot.StartingContext;
import org.apache.paimon.utils.Restorable;
import javax.annotation.Nullable;
@@ -34,18 +33,14 @@ import javax.annotation.Nullable;
@Public
public interface StreamTableScan extends TableScan, Restorable<Long> {
- StartingContext startingContext();
-
- @Override
- RichPlan plan();
+ /** Current watermark for consumed snapshot. */
+ @Nullable
+ Long watermark();
/** Restore from checkpoint next snapshot id. */
@Override
void restore(@Nullable Long nextSnapshotId);
- /** Restore from checkpoint next snapshot id with scan kind. */
- void restore(@Nullable Long nextSnapshotId, boolean scanAllSnapshot);
-
/** Checkpoint to return next snapshot id. */
@Nullable
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
index 4e750688b..aabb0dc79 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
@@ -85,12 +85,6 @@ public class IncrementalStartingScanner extends
AbstractStartingScanner {
return endingSnapshotId;
}
- @Override
- public ScanMode scanMode() {
- // TODO introduce a new mode
- throw new UnsupportedOperationException();
- }
-
@Override
public List<Split> splits() {
return (List) result;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index 7cba985f8..142d4b5fa 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -23,13 +23,15 @@ import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.RichPlan;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.SplitGenerator;
+import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.SnapshotManager;
+import javax.annotation.Nullable;
+
import java.util.List;
/** Read splits from specified {@link Snapshot} with given configuration. */
@@ -67,7 +69,17 @@ public interface SnapshotReader {
List<BinaryRow> partitions();
/** Result plan of this scan. */
- interface Plan extends RichPlan {
+ interface Plan extends TableScan.Plan {
+
+ @Nullable
+ Long watermark();
+
+ /**
+ * Snapshot id of this plan, return null if the table is empty or the
manifest list is
+ * specified.
+ */
+ @Nullable
+ Long snapshotId();
/** Result splits. */
List<Split> splits();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index b335524d5..391da875f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -207,11 +207,6 @@ public class SnapshotReaderImpl implements SnapshotReader {
return plan.snapshotId();
}
- @Override
- public ScanMode scanMode() {
- return plan.scanMode();
- }
-
@Override
public List<Split> splits() {
return (List) splits;
@@ -314,12 +309,6 @@ public class SnapshotReaderImpl implements SnapshotReader {
return plan.snapshotId();
}
- @Override
- public ScanMode scanMode() {
- // TODO introduce a new mode
- throw new UnsupportedOperationException();
- }
-
@Override
public List<Split> splits() {
return (List) splits;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
index b09bfe298..98dcaf669 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
@@ -21,6 +21,8 @@ package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.TableScan;
+import javax.annotation.Nullable;
+
import java.util.List;
/** Helper class for the first planning of {@link TableScan}. */
@@ -53,6 +55,11 @@ public interface StartingScanner {
return plan.snapshotId();
}
+ @Nullable
+ public Long currentWatermark() {
+ return plan.watermark();
+ }
+
public List<DataSplit> splits() {
return (List) plan.splits();
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index cd2390e6a..f84ddcb59 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -39,7 +39,6 @@ import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.InnerStreamTableScan;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
-import org.apache.paimon.table.source.RichPlan;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.SplitGenerator;
@@ -314,7 +313,7 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
}
@Override
- public RichPlan plan() {
+ public Plan plan() {
return streamScan.plan();
}
@@ -329,6 +328,12 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
return streamScan.checkpoint();
}
+ @Nullable
+ @Override
+ public Long watermark() {
+ return streamScan.watermark();
+ }
+
@Override
public void restore(@Nullable Long nextSnapshotId) {
streamScan.restore(nextSnapshotId);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
index 2905d8a30..30ac622ce 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
@@ -22,7 +22,6 @@ import org.apache.paimon.flink.utils.JavaTypeInfo;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.ReadBuilder;
-import org.apache.paimon.table.source.RichPlan;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
@@ -181,13 +180,12 @@ public class MonitorFunction extends
RichSourceFunction<Split>
return;
}
try {
- RichPlan plan = scan.plan();
- List<Split> splits = plan.splits();
+ List<Split> splits = scan.plan().splits();
isEmpty = splits.isEmpty();
splits.forEach(ctx::collect);
if (emitSnapshotWatermark) {
- Long watermark = plan.watermark();
+ Long watermark = scan.watermark();
if (watermark != null) {
ctx.emitWatermark(new Watermark(watermark));
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index 0a02ac512..d02cd2b89 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -21,14 +21,12 @@ package org.apache.paimon.flink.source;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.source.DataFilePlan;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
-import org.apache.paimon.table.source.RichPlan;
-import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.SnapshotNotExistPlan;
-import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
-import org.apache.paimon.table.source.snapshot.StartingContext;
+import org.apache.paimon.table.source.TableScan;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
@@ -202,7 +200,7 @@ public class ContinuousFileSplitEnumeratorTest {
context.registerReader(0, "test-host");
context.registerReader(1, "test-host");
- TreeMap<Long, RichPlan> results = new TreeMap<>();
+ TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
MockScan scan = new MockScan(results);
ContinuousFileSplitEnumerator enumerator =
new Builder()
@@ -218,7 +216,7 @@ public class ContinuousFileSplitEnumeratorTest {
for (int i = 0; i < 4; i++) {
splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
}
- results.put(1L, new MockPlan(splits));
+ results.put(1L, new DataFilePlan(splits));
context.triggerAllActions();
// assign to task 0
@@ -271,7 +269,7 @@ public class ContinuousFileSplitEnumeratorTest {
new TestingSplitEnumeratorContext<>(3);
context.registerReader(0, "test-host");
- TreeMap<Long, RichPlan> results = new TreeMap<>();
+ TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
StreamTableScan scan = new MockScan(results);
ContinuousFileSplitEnumerator enumerator =
new Builder()
@@ -286,7 +284,7 @@ public class ContinuousFileSplitEnumeratorTest {
long snapshot = 0;
List<DataSplit> splits = new ArrayList<>();
splits.add(createDataSplit(snapshot, 1, Collections.emptyList()));
- results.put(1L, new MockPlan(splits));
+ results.put(1L, new DataFilePlan(splits));
context.triggerAllActions();
// assign to task 0
@@ -298,7 +296,7 @@ public class ContinuousFileSplitEnumeratorTest {
splits.clear();
splits.add(createDataSplit(snapshot, 2, Collections.emptyList()));
- results.put(2L, new MockPlan(splits));
+ results.put(2L, new DataFilePlan(splits));
context.triggerAllActions();
// assign to task 0
@@ -317,7 +315,7 @@ public class ContinuousFileSplitEnumeratorTest {
context.registerReader(2, "test-host");
context.registerReader(3, "test-host");
- TreeMap<Long, RichPlan> results = new TreeMap<>();
+ TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
StreamTableScan scan = new MockScan(results);
ContinuousFileSplitEnumerator enumerator =
new Builder()
@@ -334,7 +332,7 @@ public class ContinuousFileSplitEnumeratorTest {
for (int i = 0; i < 100; i++) {
splits.add(createDataSplit(snapshot, 0, Collections.emptyList()));
}
- results.put(1L, new MockPlan(splits));
+ results.put(1L, new DataFilePlan(splits));
context.triggerAllActions();
// assign to task 0
@@ -380,7 +378,7 @@ public class ContinuousFileSplitEnumeratorTest {
context.registerReader(2, "test-host");
context.registerReader(3, "test-host");
- TreeMap<Long, RichPlan> results = new TreeMap<>();
+ TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
MockScan scan = new MockScan(results);
ContinuousFileSplitEnumerator enumerator =
new Builder()
@@ -409,7 +407,7 @@ public class ContinuousFileSplitEnumeratorTest {
for (int i = 0; i < 100; i++) {
splits.add(createDataSplit(snapshot, 0, Collections.emptyList()));
}
- results.put(1L, new MockPlan(splits));
+ results.put(1L, new DataFilePlan(splits));
// trigger assign task 0 and task 1 will get their assignment
context.triggerAllActions();
@@ -438,7 +436,7 @@ public class ContinuousFileSplitEnumeratorTest {
context.registerReader(0, "test-host");
context.registerReader(1, "test-host");
- TreeMap<Long, RichPlan> results = new TreeMap<>();
+ TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
StreamTableScan scan = new MockScan(results);
ContinuousFileSplitEnumerator enumerator =
new Builder()
@@ -455,7 +453,7 @@ public class ContinuousFileSplitEnumeratorTest {
for (int i = 0; i < 4; i++) {
splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
}
- results.put(1L, new MockPlan(splits));
+ results.put(1L, new DataFilePlan(splits));
context.triggerAllActions();
// assign to task 0
@@ -479,7 +477,7 @@ public class ContinuousFileSplitEnumeratorTest {
context.registerReader(0, "test-host");
context.registerReader(1, "test-host");
- TreeMap<Long, RichPlan> results = new TreeMap<>();
+ TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
StreamTableScan scan = new MockScan(results);
ContinuousFileSplitEnumerator enumerator =
new Builder()
@@ -497,7 +495,7 @@ public class ContinuousFileSplitEnumeratorTest {
for (int i = 0; i < 4; i++) {
splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
}
- results.put(1L, new MockPlan(splits));
+ results.put(1L, new DataFilePlan(splits));
context.registeredReaders().remove(1);
// assign to task 0
@@ -512,7 +510,7 @@ public class ContinuousFileSplitEnumeratorTest {
context.registerReader(0, "test-host");
context.registerReader(1, "test-host");
- TreeMap<Long, RichPlan> results = new TreeMap<>();
+ TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
MockScan scan = new MockScan(results);
scan.allowEnd(false);
ContinuousFileSplitEnumerator enumerator =
@@ -529,7 +527,7 @@ public class ContinuousFileSplitEnumeratorTest {
for (int i = 0; i < 4; i++) {
splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
}
- results.put(1L, new MockPlan(splits));
+ results.put(1L, new DataFilePlan(splits));
// request directly
enumerator.handleSplitRequest(0, "test-host");
@@ -557,7 +555,7 @@ public class ContinuousFileSplitEnumeratorTest {
context.registerReader(2, "test-host");
context.registerReader(3, "test-host");
- TreeMap<Long, RichPlan> results = new TreeMap<>();
+ TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
MockScan scan = new MockScan(results);
scan.allowEnd(false);
ContinuousFileSplitEnumerator enumerator =
@@ -576,7 +574,7 @@ public class ContinuousFileSplitEnumeratorTest {
for (int i = 0; i < 2; i++) {
splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
}
- results.put(1L, new MockPlan(splits));
+ results.put(1L, new DataFilePlan(splits));
// will not trigger scan here
enumerator.handleSplitRequest(0, "test-host");
@@ -604,7 +602,7 @@ public class ContinuousFileSplitEnumeratorTest {
for (int i = 2; i < 4; i++) {
splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
}
- results.put(2L, new MockPlan(splits));
+ results.put(2L, new DataFilePlan(splits));
// because blockScanByRequest = false, so this request will trigger
scan
enumerator.handleSplitRequest(2, "test-host");
context.getExecutorService().triggerAllNonPeriodicTasks();
@@ -622,7 +620,7 @@ public class ContinuousFileSplitEnumeratorTest {
context.getExecutorService().triggerAllNonPeriodicTasks();
splits.clear();
splits.add(createDataSplit(snapshot, 7, Collections.emptyList()));
- results.put(3L, new MockPlan(splits));
+ results.put(3L, new DataFilePlan(splits));
// this won't trigger scan, cause blockScanByRequest = true
enumerator.handleSplitRequest(3, "test-host");
@@ -649,13 +647,13 @@ public class ContinuousFileSplitEnumeratorTest {
context.registerReader(0, "test-host");
// prepare test data
- TreeMap<Long, RichPlan> results = new TreeMap<>();
+ TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
Map<Long, List<DataSplit>> expectedResults = new HashMap<>(4);
StreamTableScan scan = new MockScan(results);
for (int i = 1; i <= 4; i++) {
List<DataSplit> dataSplits =
Collections.singletonList(createDataSplit(i, 0,
Collections.emptyList()));
- results.put((long) i, new MockPlan(dataSplits));
+ results.put((long) i, new DataFilePlan(dataSplits));
expectedResults.put((long) i, dataSplits);
}
@@ -789,56 +787,20 @@ public class ContinuousFileSplitEnumeratorTest {
}
}
- private static class MockPlan implements RichPlan {
-
- private final List<DataSplit> splits;
-
- public MockPlan(List<DataSplit> splits) {
- this.splits = splits;
- }
-
- @Nullable
- @Override
- public Long watermark() {
- throw new UnsupportedOperationException();
- }
-
- @Nullable
- @Override
- public Long snapshotId() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ScanMode scanMode() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public List<Split> splits() {
- return (List) splits;
- }
- }
-
private static class MockScan implements StreamTableScan {
- private final TreeMap<Long, RichPlan> results;
+ private final TreeMap<Long, Plan> results;
private @Nullable Long nextSnapshotId;
private boolean allowEnd = true;
- public MockScan(TreeMap<Long, RichPlan> results) {
+ public MockScan(TreeMap<Long, Plan> results) {
this.results = results;
this.nextSnapshotId = null;
}
@Override
- public StartingContext startingContext() {
- return null;
- }
-
- @Override
- public RichPlan plan() {
- Map.Entry<Long, RichPlan> planEntry = results.pollFirstEntry();
+ public Plan plan() {
+ Map.Entry<Long, Plan> planEntry = results.pollFirstEntry();
if (planEntry == null) {
if (allowEnd) {
throw new EndOfScanException();
@@ -863,11 +825,14 @@ public class ContinuousFileSplitEnumeratorTest {
@Override
public void notifyCheckpointComplete(@Nullable Long nextSnapshot) {}
+ @Nullable
@Override
- public void restore(Long state) {}
+ public Long watermark() {
+ return null;
+ }
@Override
- public void restore(@Nullable Long nextSnapshotId, boolean
scanAllSnapshot) {}
+ public void restore(Long state) {}
public void allowEnd(boolean allowEnd) {
this.allowEnd = allowEnd;