This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new eeb7370c7 [core] Clean StreamTableScan public interface (#1993)
eeb7370c7 is described below

commit eeb7370c7bea1527036702662b52f2d3521743b5
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Sep 13 15:49:23 2023 +0800

    [core] Clean StreamTableScan public interface (#1993)
---
 .../paimon/table/source/InnerStreamTableScan.java  | 12 ++-
 .../table/source/InnerStreamTableScanImpl.java     | 24 ++++--
 .../org/apache/paimon/table/source/RichPlan.java   | 47 ----------
 .../org/apache/paimon/table/source/ScanMode.java   |  9 +-
 .../paimon/table/source/SnapshotNotExistPlan.java  | 21 +----
 .../paimon/table/source/StreamTableScan.java       | 11 +--
 .../snapshot/IncrementalStartingScanner.java       |  6 --
 .../table/source/snapshot/SnapshotReader.java      | 16 +++-
 .../table/source/snapshot/SnapshotReaderImpl.java  | 11 ---
 .../table/source/snapshot/StartingScanner.java     |  7 ++
 .../apache/paimon/table/system/AuditLogTable.java  |  9 +-
 .../flink/source/operator/MonitorFunction.java     |  6 +-
 .../source/ContinuousFileSplitEnumeratorTest.java  | 99 +++++++---------------
 13 files changed, 95 insertions(+), 183 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScan.java
index e36a40e49..b395cc9a8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScan.java
@@ -18,5 +18,15 @@
 
 package org.apache.paimon.table.source;
 
+import org.apache.paimon.table.source.snapshot.StartingContext;
+
+import javax.annotation.Nullable;
+
 /** Streaming {@link InnerTableScan} with {@link StreamTableScan}. */
-public interface InnerStreamTableScan extends InnerTableScan, StreamTableScan {}
+public interface InnerStreamTableScan extends InnerTableScan, StreamTableScan {
+
+    StartingContext startingContext();
+
+    /** Restore from checkpoint next snapshot id with scan kind. */
+    void restore(@Nullable Long nextSnapshotId, boolean scanAllSnapshot);
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
index 0968ac361..1de22a644 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
@@ -53,12 +53,13 @@ public class InnerStreamTableScanImpl extends AbstractInnerTableScan
     private final boolean supportStreamingReadOverwrite;
     private final DefaultValueAssigner defaultValueAssigner;
 
-    private boolean inited = false;
+    private boolean initialized = false;
     private StartingScanner startingScanner;
     private FollowUpScanner followUpScanner;
     private BoundedChecker boundedChecker;
 
     private boolean isFullPhaseEnd = false;
+    @Nullable private Long currentWatermark;
     @Nullable private Long nextSnapshotId;
 
     public InnerStreamTableScanImpl(
@@ -82,15 +83,15 @@ public class InnerStreamTableScanImpl extends AbstractInnerTableScan
 
     @Override
     public StartingContext startingContext() {
-        if (!inited) {
+        if (!initialized) {
             initScanner();
         }
         return startingScanner.startingContext();
     }
 
     @Override
-    public RichPlan plan() {
-        if (!inited) {
+    public Plan plan() {
+        if (!initialized) {
             initScanner();
         }
 
@@ -111,13 +112,14 @@ public class InnerStreamTableScanImpl extends AbstractInnerTableScan
         if (boundedChecker == null) {
             boundedChecker = createBoundedChecker();
         }
-        inited = true;
+        initialized = true;
     }
 
-    private RichPlan tryFirstPlan() {
+    private Plan tryFirstPlan() {
         StartingScanner.Result result = startingScanner.scan(snapshotReader);
         if (result instanceof ScannedResult) {
             ScannedResult scannedResult = (ScannedResult) result;
+            currentWatermark = scannedResult.currentWatermark();
             long currentSnapshotId = scannedResult.currentSnapshotId();
             nextSnapshotId = currentSnapshotId + 1;
             isFullPhaseEnd =
@@ -133,7 +135,7 @@ public class InnerStreamTableScanImpl extends AbstractInnerTableScan
         return SnapshotNotExistPlan.INSTANCE;
     }
 
-    private RichPlan nextPlan() {
+    private Plan nextPlan() {
         while (true) {
             if (isFullPhaseEnd) {
                 throw new EndOfScanException();
@@ -167,11 +169,13 @@ public class InnerStreamTableScanImpl extends AbstractInnerTableScan
                 LOG.debug("Find overwrite snapshot id {}.", nextSnapshotId);
                 SnapshotReader.Plan overwritePlan =
                         followUpScanner.getOverwriteChangesPlan(nextSnapshotId, snapshotReader);
+                currentWatermark = overwritePlan.watermark();
                 nextSnapshotId++;
                 return overwritePlan;
             } else if (followUpScanner.shouldScanSnapshot(snapshot)) {
                 LOG.debug("Find snapshot id {}.", nextSnapshotId);
                 SnapshotReader.Plan plan = followUpScanner.scan(nextSnapshotId, snapshotReader);
+                currentWatermark = plan.watermark();
                 nextSnapshotId++;
                 return plan;
             } else {
@@ -233,6 +237,12 @@ public class InnerStreamTableScanImpl extends AbstractInnerTableScan
         return nextSnapshotId;
     }
 
+    @Nullable
+    @Override
+    public Long watermark() {
+        return currentWatermark;
+    }
+
     @Override
     public void restore(@Nullable Long nextSnapshotId) {
         this.nextSnapshotId = nextSnapshotId;
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/RichPlan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/RichPlan.java
deleted file mode 100644
index e592f4648..000000000
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/RichPlan.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.source;
-
-import org.apache.paimon.annotation.Public;
-
-import javax.annotation.Nullable;
-
-/**
- * Rich Plan of scan.
- *
- * @since 0.6.0
- */
-@Public
-public interface RichPlan extends TableScan.Plan {
-
-    /** Current watermark for consumed snapshot. */
-    @Nullable
-    Long watermark();
-
-    /**
-     * Snapshot id of this plan.
-     *
-     * @return null if the table is empty.
-     */
-    @Nullable
-    Long snapshotId();
-
-    /** Scan which part of the snapshot. */
-    ScanMode scanMode();
-}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ScanMode.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ScanMode.java
index 10bc47c83..1a9731729 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ScanMode.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ScanMode.java
@@ -18,14 +18,7 @@
 
 package org.apache.paimon.table.source;
 
-import org.apache.paimon.annotation.Public;
-
-/**
- * Scan which part of the snapshot.
- *
- * @since 0.6.0
- */
-@Public
+/** Scan which part of the snapshot. */
 public enum ScanMode {
 
     /** Scan complete data files of a snapshot. */
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/SnapshotNotExistPlan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/SnapshotNotExistPlan.java
index 358a6ce7e..df58c8067 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/SnapshotNotExistPlan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/SnapshotNotExistPlan.java
@@ -18,13 +18,11 @@
 
 package org.apache.paimon.table.source;
 
-import javax.annotation.Nullable;
-
 import java.util.Collections;
 import java.util.List;
 
 /** This is used to distinguish the case where the snapshot does not exist and the plan is empty. */
-public class SnapshotNotExistPlan implements RichPlan {
+public class SnapshotNotExistPlan implements TableScan.Plan {
     public static final SnapshotNotExistPlan INSTANCE = new SnapshotNotExistPlan();
 
     private SnapshotNotExistPlan() {
@@ -35,21 +33,4 @@ public class SnapshotNotExistPlan implements RichPlan {
     public List<Split> splits() {
         return Collections.emptyList();
     }
-
-    @Nullable
-    @Override
-    public Long watermark() {
-        return null;
-    }
-
-    @Nullable
-    @Override
-    public Long snapshotId() {
-        return null;
-    }
-
-    @Override
-    public ScanMode scanMode() {
-        throw new UnsupportedOperationException();
-    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
index 2df743428..c0a900bcf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/StreamTableScan.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.table.source;
 
 import org.apache.paimon.annotation.Public;
-import org.apache.paimon.table.source.snapshot.StartingContext;
 import org.apache.paimon.utils.Restorable;
 
 import javax.annotation.Nullable;
@@ -34,18 +33,14 @@ import javax.annotation.Nullable;
 @Public
 public interface StreamTableScan extends TableScan, Restorable<Long> {
 
-    StartingContext startingContext();
-
-    @Override
-    RichPlan plan();
+    /** Current watermark for consumed snapshot. */
+    @Nullable
+    Long watermark();
 
     /** Restore from checkpoint next snapshot id. */
     @Override
     void restore(@Nullable Long nextSnapshotId);
 
-    /** Restore from checkpoint next snapshot id with scan kind. */
-    void restore(@Nullable Long nextSnapshotId, boolean scanAllSnapshot);
-
     /** Checkpoint to return next snapshot id. */
     @Nullable
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
index 4e750688b..aabb0dc79 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalStartingScanner.java
@@ -85,12 +85,6 @@ public class IncrementalStartingScanner extends AbstractStartingScanner {
                         return endingSnapshotId;
                     }
 
-                    @Override
-                    public ScanMode scanMode() {
-                        // TODO introduce a new mode
-                        throw new UnsupportedOperationException();
-                    }
-
                     @Override
                     public List<Split> splits() {
                         return (List) result;
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index 7cba985f8..142d4b5fa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -23,13 +23,15 @@ import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.RichPlan;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.SplitGenerator;
+import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.SnapshotManager;
 
+import javax.annotation.Nullable;
+
 import java.util.List;
 
 /** Read splits from specified {@link Snapshot} with given configuration. */
@@ -67,7 +69,17 @@ public interface SnapshotReader {
     List<BinaryRow> partitions();
 
     /** Result plan of this scan. */
-    interface Plan extends RichPlan {
+    interface Plan extends TableScan.Plan {
+
+        @Nullable
+        Long watermark();
+
+        /**
+         * Snapshot id of this plan, return null if the table is empty or the manifest list is
+         * specified.
+         */
+        @Nullable
+        Long snapshotId();
 
         /** Result splits. */
         List<Split> splits();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index b335524d5..391da875f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -207,11 +207,6 @@ public class SnapshotReaderImpl implements SnapshotReader {
                 return plan.snapshotId();
             }
 
-            @Override
-            public ScanMode scanMode() {
-                return plan.scanMode();
-            }
-
             @Override
             public List<Split> splits() {
                 return (List) splits;
@@ -314,12 +309,6 @@ public class SnapshotReaderImpl implements SnapshotReader {
                 return plan.snapshotId();
             }
 
-            @Override
-            public ScanMode scanMode() {
-                // TODO introduce a new mode
-                throw new UnsupportedOperationException();
-            }
-
             @Override
             public List<Split> splits() {
                 return (List) splits;
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
index b09bfe298..98dcaf669 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StartingScanner.java
@@ -21,6 +21,8 @@ package org.apache.paimon.table.source.snapshot;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.TableScan;
 
+import javax.annotation.Nullable;
+
 import java.util.List;
 
 /** Helper class for the first planning of {@link TableScan}. */
@@ -53,6 +55,11 @@ public interface StartingScanner {
             return plan.snapshotId();
         }
 
+        @Nullable
+        public Long currentWatermark() {
+            return plan.watermark();
+        }
+
         public List<DataSplit> splits() {
             return (List) plan.splits();
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index cd2390e6a..f84ddcb59 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -39,7 +39,6 @@ import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.InnerStreamTableScan;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.InnerTableScan;
-import org.apache.paimon.table.source.RichPlan;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.SplitGenerator;
@@ -314,7 +313,7 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
         }
 
         @Override
-        public RichPlan plan() {
+        public Plan plan() {
             return streamScan.plan();
         }
 
@@ -329,6 +328,12 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
             return streamScan.checkpoint();
         }
 
+        @Nullable
+        @Override
+        public Long watermark() {
+            return streamScan.watermark();
+        }
+
         @Override
         public void restore(@Nullable Long nextSnapshotId) {
             streamScan.restore(nextSnapshotId);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
index 2905d8a30..30ac622ce 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
@@ -22,7 +22,6 @@ import org.apache.paimon.flink.utils.JavaTypeInfo;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.EndOfScanException;
 import org.apache.paimon.table.source.ReadBuilder;
-import org.apache.paimon.table.source.RichPlan;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.StreamTableScan;
 
@@ -181,13 +180,12 @@ public class MonitorFunction extends RichSourceFunction<Split>
                     return;
                 }
                 try {
-                    RichPlan plan = scan.plan();
-                    List<Split> splits = plan.splits();
+                    List<Split> splits = scan.plan().splits();
                     isEmpty = splits.isEmpty();
                     splits.forEach(ctx::collect);
 
                     if (emitSnapshotWatermark) {
-                        Long watermark = plan.watermark();
+                        Long watermark = scan.watermark();
                         if (watermark != null) {
                             ctx.emitWatermark(new Watermark(watermark));
                         }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index 0a02ac512..d02cd2b89 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -21,14 +21,12 @@ package org.apache.paimon.flink.source;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.source.DataFilePlan;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.EndOfScanException;
-import org.apache.paimon.table.source.RichPlan;
-import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.SnapshotNotExistPlan;
-import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.StreamTableScan;
-import org.apache.paimon.table.source.snapshot.StartingContext;
+import org.apache.paimon.table.source.TableScan;
 
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
@@ -202,7 +200,7 @@ public class ContinuousFileSplitEnumeratorTest {
         context.registerReader(0, "test-host");
         context.registerReader(1, "test-host");
 
-        TreeMap<Long, RichPlan> results = new TreeMap<>();
+        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
         MockScan scan = new MockScan(results);
         ContinuousFileSplitEnumerator enumerator =
                 new Builder()
@@ -218,7 +216,7 @@ public class ContinuousFileSplitEnumeratorTest {
         for (int i = 0; i < 4; i++) {
             splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
         }
-        results.put(1L, new MockPlan(splits));
+        results.put(1L, new DataFilePlan(splits));
         context.triggerAllActions();
 
         // assign to task 0
@@ -271,7 +269,7 @@ public class ContinuousFileSplitEnumeratorTest {
                 new TestingSplitEnumeratorContext<>(3);
         context.registerReader(0, "test-host");
 
-        TreeMap<Long, RichPlan> results = new TreeMap<>();
+        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
         StreamTableScan scan = new MockScan(results);
         ContinuousFileSplitEnumerator enumerator =
                 new Builder()
@@ -286,7 +284,7 @@ public class ContinuousFileSplitEnumeratorTest {
         long snapshot = 0;
         List<DataSplit> splits = new ArrayList<>();
         splits.add(createDataSplit(snapshot, 1, Collections.emptyList()));
-        results.put(1L, new MockPlan(splits));
+        results.put(1L, new DataFilePlan(splits));
         context.triggerAllActions();
 
         // assign to task 0
@@ -298,7 +296,7 @@ public class ContinuousFileSplitEnumeratorTest {
 
         splits.clear();
         splits.add(createDataSplit(snapshot, 2, Collections.emptyList()));
-        results.put(2L, new MockPlan(splits));
+        results.put(2L, new DataFilePlan(splits));
         context.triggerAllActions();
 
         // assign to task 0
@@ -317,7 +315,7 @@ public class ContinuousFileSplitEnumeratorTest {
         context.registerReader(2, "test-host");
         context.registerReader(3, "test-host");
 
-        TreeMap<Long, RichPlan> results = new TreeMap<>();
+        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
         StreamTableScan scan = new MockScan(results);
         ContinuousFileSplitEnumerator enumerator =
                 new Builder()
@@ -334,7 +332,7 @@ public class ContinuousFileSplitEnumeratorTest {
         for (int i = 0; i < 100; i++) {
             splits.add(createDataSplit(snapshot, 0, Collections.emptyList()));
         }
-        results.put(1L, new MockPlan(splits));
+        results.put(1L, new DataFilePlan(splits));
         context.triggerAllActions();
 
         // assign to task 0
@@ -380,7 +378,7 @@ public class ContinuousFileSplitEnumeratorTest {
         context.registerReader(2, "test-host");
         context.registerReader(3, "test-host");
 
-        TreeMap<Long, RichPlan> results = new TreeMap<>();
+        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
         MockScan scan = new MockScan(results);
         ContinuousFileSplitEnumerator enumerator =
                 new Builder()
@@ -409,7 +407,7 @@ public class ContinuousFileSplitEnumeratorTest {
         for (int i = 0; i < 100; i++) {
             splits.add(createDataSplit(snapshot, 0, Collections.emptyList()));
         }
-        results.put(1L, new MockPlan(splits));
+        results.put(1L, new DataFilePlan(splits));
         // trigger assign task 0 and task 1 will get their assignment
         context.triggerAllActions();
 
@@ -438,7 +436,7 @@ public class ContinuousFileSplitEnumeratorTest {
         context.registerReader(0, "test-host");
         context.registerReader(1, "test-host");
 
-        TreeMap<Long, RichPlan> results = new TreeMap<>();
+        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
         StreamTableScan scan = new MockScan(results);
         ContinuousFileSplitEnumerator enumerator =
                 new Builder()
@@ -455,7 +453,7 @@ public class ContinuousFileSplitEnumeratorTest {
         for (int i = 0; i < 4; i++) {
             splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
         }
-        results.put(1L, new MockPlan(splits));
+        results.put(1L, new DataFilePlan(splits));
         context.triggerAllActions();
 
         // assign to task 0
@@ -479,7 +477,7 @@ public class ContinuousFileSplitEnumeratorTest {
         context.registerReader(0, "test-host");
         context.registerReader(1, "test-host");
 
-        TreeMap<Long, RichPlan> results = new TreeMap<>();
+        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
         StreamTableScan scan = new MockScan(results);
         ContinuousFileSplitEnumerator enumerator =
                 new Builder()
@@ -497,7 +495,7 @@ public class ContinuousFileSplitEnumeratorTest {
         for (int i = 0; i < 4; i++) {
             splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
         }
-        results.put(1L, new MockPlan(splits));
+        results.put(1L, new DataFilePlan(splits));
 
         context.registeredReaders().remove(1);
         // assign to task 0
@@ -512,7 +510,7 @@ public class ContinuousFileSplitEnumeratorTest {
         context.registerReader(0, "test-host");
         context.registerReader(1, "test-host");
 
-        TreeMap<Long, RichPlan> results = new TreeMap<>();
+        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
         MockScan scan = new MockScan(results);
         scan.allowEnd(false);
         ContinuousFileSplitEnumerator enumerator =
@@ -529,7 +527,7 @@ public class ContinuousFileSplitEnumeratorTest {
         for (int i = 0; i < 4; i++) {
             splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
         }
-        results.put(1L, new MockPlan(splits));
+        results.put(1L, new DataFilePlan(splits));
 
         // request directly
         enumerator.handleSplitRequest(0, "test-host");
@@ -557,7 +555,7 @@ public class ContinuousFileSplitEnumeratorTest {
         context.registerReader(2, "test-host");
         context.registerReader(3, "test-host");
 
-        TreeMap<Long, RichPlan> results = new TreeMap<>();
+        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
         MockScan scan = new MockScan(results);
         scan.allowEnd(false);
         ContinuousFileSplitEnumerator enumerator =
@@ -576,7 +574,7 @@ public class ContinuousFileSplitEnumeratorTest {
         for (int i = 0; i < 2; i++) {
             splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
         }
-        results.put(1L, new MockPlan(splits));
+        results.put(1L, new DataFilePlan(splits));
 
         // will not trigger scan here
         enumerator.handleSplitRequest(0, "test-host");
@@ -604,7 +602,7 @@ public class ContinuousFileSplitEnumeratorTest {
         for (int i = 2; i < 4; i++) {
             splits.add(createDataSplit(snapshot, i, Collections.emptyList()));
         }
-        results.put(2L, new MockPlan(splits));
+        results.put(2L, new DataFilePlan(splits));
         // because blockScanByRequest = false, so this request will trigger scan
         enumerator.handleSplitRequest(2, "test-host");
         context.getExecutorService().triggerAllNonPeriodicTasks();
@@ -622,7 +620,7 @@ public class ContinuousFileSplitEnumeratorTest {
         context.getExecutorService().triggerAllNonPeriodicTasks();
         splits.clear();
         splits.add(createDataSplit(snapshot, 7, Collections.emptyList()));
-        results.put(3L, new MockPlan(splits));
+        results.put(3L, new DataFilePlan(splits));
 
         // this won't trigger scan, cause blockScanByRequest = true
         enumerator.handleSplitRequest(3, "test-host");
@@ -649,13 +647,13 @@ public class ContinuousFileSplitEnumeratorTest {
         context.registerReader(0, "test-host");
 
         // prepare test data
-        TreeMap<Long, RichPlan> results = new TreeMap<>();
+        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
         Map<Long, List<DataSplit>> expectedResults = new HashMap<>(4);
         StreamTableScan scan = new MockScan(results);
         for (int i = 1; i <= 4; i++) {
             List<DataSplit> dataSplits =
                     Collections.singletonList(createDataSplit(i, 0, Collections.emptyList()));
-            results.put((long) i, new MockPlan(dataSplits));
+            results.put((long) i, new DataFilePlan(dataSplits));
             expectedResults.put((long) i, dataSplits);
         }
 
@@ -789,56 +787,20 @@ public class ContinuousFileSplitEnumeratorTest {
         }
     }
 
-    private static class MockPlan implements RichPlan {
-
-        private final List<DataSplit> splits;
-
-        public MockPlan(List<DataSplit> splits) {
-            this.splits = splits;
-        }
-
-        @Nullable
-        @Override
-        public Long watermark() {
-            throw new UnsupportedOperationException();
-        }
-
-        @Nullable
-        @Override
-        public Long snapshotId() {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public ScanMode scanMode() {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public List<Split> splits() {
-            return (List) splits;
-        }
-    }
-
     private static class MockScan implements StreamTableScan {
 
-        private final TreeMap<Long, RichPlan> results;
+        private final TreeMap<Long, Plan> results;
         private @Nullable Long nextSnapshotId;
         private boolean allowEnd = true;
 
-        public MockScan(TreeMap<Long, RichPlan> results) {
+        public MockScan(TreeMap<Long, Plan> results) {
             this.results = results;
             this.nextSnapshotId = null;
         }
 
         @Override
-        public StartingContext startingContext() {
-            return null;
-        }
-
-        @Override
-        public RichPlan plan() {
-            Map.Entry<Long, RichPlan> planEntry = results.pollFirstEntry();
+        public Plan plan() {
+            Map.Entry<Long, Plan> planEntry = results.pollFirstEntry();
             if (planEntry == null) {
                 if (allowEnd) {
                     throw new EndOfScanException();
@@ -863,11 +825,14 @@ public class ContinuousFileSplitEnumeratorTest {
         @Override
         public void notifyCheckpointComplete(@Nullable Long nextSnapshot) {}
 
+        @Nullable
         @Override
-        public void restore(Long state) {}
+        public Long watermark() {
+            return null;
+        }
 
         @Override
-        public void restore(@Nullable Long nextSnapshotId, boolean scanAllSnapshot) {}
+        public void restore(Long state) {}
 
         public void allowEnd(boolean allowEnd) {
             this.allowEnd = allowEnd;

Reply via email to