This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d0b9c20 [core] Introduce StreamTableScan to support checkpoint and restore
6d0b9c20 is described below

commit 6d0b9c20669ea953f8a38463c26c5b8905500d52
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Mar 15 09:07:59 2023 +0800

    [core] Introduce StreamTableScan to support checkpoint and restore
    
    This closes #602
---
 .../apache/flink/table/store/table/InnerTable.java |  3 +-
 .../flink/table/store/table/ReadonlyTable.java     |  4 +--
 .../store/table/source/InnerStreamTableScan.java   | 22 ++++++++++++
 .../table/store/table/source/ReadBuilder.java      |  2 +-
 .../table/store/table/source/ReadBuilderImpl.java  |  4 +--
 .../store/table/source/StreamDataTableScan.java    |  8 ++---
 .../table/source/StreamDataTableScanImpl.java      | 17 ++++++----
 .../table/store/table/source/StreamTableScan.java  | 39 ++++++++++++++++++++++
 .../table/store/table/system/AuditLogTable.java    | 14 +++++---
 .../table/ChangelogWithKeyFileStoreTableTest.java  |  9 ++---
 .../connector/source/CompactorSourceBuilder.java   | 16 ++++++---
 11 files changed, 109 insertions(+), 29 deletions(-)
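
For illustration, a minimal sketch of how a streaming source might wire the new
checkpoint()/restore() pair into checkpointed state. Assumptions: the ReadBuilder
is obtained from the table (e.g. via Table#newReadBuilder()), and the actual
split planning/reading loop is elided; this is only an illustrative sketch, not
code from this commit.

    import javax.annotation.Nullable;

    import org.apache.flink.table.store.table.source.ReadBuilder;
    import org.apache.flink.table.store.table.source.StreamTableScan;

    /** Sketch only: persisting and restoring the stream scan position. */
    class StreamScanCheckpointSketch {

        /** On (re)start: build the scan and resume from the last stored position, if any. */
        static StreamTableScan createScan(
                ReadBuilder readBuilder, @Nullable Long restoredNextSnapshotId) {
            StreamTableScan scan = readBuilder.newStreamScan();
            // null means "no previous checkpoint": the scan starts from its configured starting point.
            scan.restore(restoredNextSnapshotId);
            return scan;
        }

        /** On each checkpoint: read the next snapshot id and persist it in operator state. */
        @Nullable
        static Long snapshotState(StreamTableScan scan) {
            return scan.checkpoint();
        }
    }

Both ends are nullable, matching StreamDataTableScanImpl below, where checkpoint()
returns the current nextSnapshotId and restore() simply sets it back.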

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/InnerTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/InnerTable.java
index 42a0db75..26af910e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/InnerTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/InnerTable.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.store.table.sink.InnerTableCommit;
 import org.apache.flink.table.store.table.sink.InnerTableWrite;
 import org.apache.flink.table.store.table.sink.StreamWriteBuilder;
 import org.apache.flink.table.store.table.sink.StreamWriteBuilderImpl;
+import org.apache.flink.table.store.table.source.InnerStreamTableScan;
 import org.apache.flink.table.store.table.source.InnerTableRead;
 import org.apache.flink.table.store.table.source.InnerTableScan;
 import org.apache.flink.table.store.table.source.ReadBuilder;
@@ -34,7 +35,7 @@ public interface InnerTable extends Table {
 
     InnerTableScan newScan();
 
-    InnerTableScan newStreamScan();
+    InnerStreamTableScan newStreamScan();
 
     InnerTableRead newRead();
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ReadonlyTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ReadonlyTable.java
index e37ddbeb..ac25a2a7 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ReadonlyTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ReadonlyTable.java
@@ -22,7 +22,7 @@ import org.apache.flink.table.store.table.sink.BatchWriteBuilder;
 import org.apache.flink.table.store.table.sink.InnerTableCommit;
 import org.apache.flink.table.store.table.sink.InnerTableWrite;
 import org.apache.flink.table.store.table.sink.StreamWriteBuilder;
-import org.apache.flink.table.store.table.source.InnerTableScan;
+import org.apache.flink.table.store.table.source.InnerStreamTableScan;
 
 /** Readonly table which only provide implementation for scan and read. */
 public interface ReadonlyTable extends InnerTable {
@@ -60,7 +60,7 @@ public interface ReadonlyTable extends InnerTable {
     }
 
     @Override
-    default InnerTableScan newStreamScan() {
+    default InnerStreamTableScan newStreamScan() {
         throw new UnsupportedOperationException(
                 String.format(
                         "Readonly Table %s does not support newStreamScan.",
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/InnerStreamTableScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/InnerStreamTableScan.java
new file mode 100644
index 00000000..8ddeb7ae
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/InnerStreamTableScan.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+/** Streaming {@link InnerTableScan} with {@link StreamTableScan}. */
+public interface InnerStreamTableScan extends InnerTableScan, StreamTableScan {}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ReadBuilder.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ReadBuilder.java
index a245508b..81f7c31e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ReadBuilder.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ReadBuilder.java
@@ -119,7 +119,7 @@ public interface ReadBuilder extends Serializable {
     TableScan newScan();
 
     /** Create a {@link TableScan} to perform streaming planning. */
-    TableScan newStreamScan();
+    StreamTableScan newStreamScan();
 
     /** Create a {@link TableRead} to read {@link Split}s. */
     TableRead newRead();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ReadBuilderImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ReadBuilderImpl.java
index de724113..f62b5d03 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ReadBuilderImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ReadBuilderImpl.java
@@ -72,8 +72,8 @@ public class ReadBuilderImpl implements ReadBuilder {
     }
 
     @Override
-    public TableScan newStreamScan() {
-        return table.newStreamScan().withFilter(filter);
+    public StreamTableScan newStreamScan() {
+        return (StreamTableScan) table.newStreamScan().withFilter(filter);
     }
 
     @Override
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamDataTableScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamDataTableScan.java
index 81599adc..b92ab4ec 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamDataTableScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamDataTableScan.java
@@ -32,7 +32,7 @@ import java.util.HashMap;
 import static org.apache.flink.table.store.CoreOptions.ChangelogProducer.FULL_COMPACTION;
 
 /** {@link DataTableScan} for streaming planning. */
-public interface StreamDataTableScan extends DataTableScan {
+public interface StreamDataTableScan extends DataTableScan, InnerStreamTableScan {
 
     boolean supportStreamingReadOverwrite();
 
@@ -40,8 +40,6 @@ public interface StreamDataTableScan extends DataTableScan {
 
     StreamDataTableScan withFollowUpScanner(FollowUpScanner followUpScanner);
 
-    StreamDataTableScan withNextSnapshotId(@Nullable Long nextSnapshotId);
-
     StreamDataTableScan withSnapshotStarting();
 
     static void validate(TableSchema schema) {
@@ -79,7 +77,9 @@ public interface StreamDataTableScan extends DataTableScan {
 
         @Override
         public StreamDataTableScan create(DataTable dataTable, @Nullable Long nextSnapshotId) {
-            return dataTable.newStreamScan().withNextSnapshotId(nextSnapshotId);
+            StreamDataTableScan scan = dataTable.newStreamScan();
+            scan.restore(nextSnapshotId);
+            return scan;
         }
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamDataTableScanImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamDataTableScanImpl.java
index 747a9b1f..de8d2f0b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamDataTableScanImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamDataTableScanImpl.java
@@ -75,12 +75,6 @@ public class StreamDataTableScanImpl extends AbstractDataTableScan implements St
         return this;
     }
 
-    @Override
-    public StreamDataTableScan withNextSnapshotId(@Nullable Long nextSnapshotId) {
-        this.nextSnapshotId = nextSnapshotId;
-        return this;
-    }
-
     @Override
     public StreamDataTableScan withSnapshotStarting() {
         startingScanner =
@@ -176,4 +170,15 @@ public class StreamDataTableScanImpl extends AbstractDataTableScan implements St
         }
         return followUpScanner;
     }
+
+    @Nullable
+    @Override
+    public Long checkpoint() {
+        return nextSnapshotId;
+    }
+
+    @Override
+    public void restore(@Nullable Long nextSnapshotId) {
+        this.nextSnapshotId = nextSnapshotId;
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamTableScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamTableScan.java
new file mode 100644
index 00000000..98526a1e
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamTableScan.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+import org.apache.flink.table.store.annotation.Experimental;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link TableScan} for streaming, supports {@link #checkpoint} and {@link #restore}.
+ *
+ * @since 0.4.0
+ */
+@Experimental
+public interface StreamTableScan extends TableScan {
+
+    /** Checkpoint this stream table scan, return next snapshot id. */
+    @Nullable
+    Long checkpoint();
+
+    /** Restore this stream table scan, read incremental from {@code nextSnapshotId}. */
+    void restore(@Nullable Long nextSnapshotId);
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/AuditLogTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/AuditLogTable.java
index 33b9d9ab..a58b8fc3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/AuditLogTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/AuditLogTable.java
@@ -301,13 +301,19 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
         }
 
         @Override
-        public StreamDataTableScan withNextSnapshotId(@Nullable Long nextSnapshotId) {
-            return streamScan.withNextSnapshotId(nextSnapshotId);
+        public StreamDataTableScan withSnapshotStarting() {
+            return streamScan.withSnapshotStarting();
         }
 
+        @Nullable
         @Override
-        public StreamDataTableScan withSnapshotStarting() {
-            return streamScan.withSnapshotStarting();
+        public Long checkpoint() {
+            return streamScan.checkpoint();
+        }
+
+        @Override
+        public void restore(@Nullable Long nextSnapshotId) {
+            streamScan.restore(nextSnapshotId);
         }
     }
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index 4b94835d..be234f38 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -421,10 +421,11 @@ public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase {
                                 // partition 2
                                 Collections.singletonList("-D 2|10|301|binary|varbinary")));
 
-        StreamDataTableScan scan = table.newStreamScan();
-        scan.withStartingScanner(new FullStartingScanner())
-                .withFollowUpScanner(new InputChangelogFollowUpScanner())
-                .withNextSnapshotId(1L);
+        StreamDataTableScan scan =
+                table.newStreamScan()
+                        .withStartingScanner(new FullStartingScanner())
+                        .withFollowUpScanner(new InputChangelogFollowUpScanner());
+        scan.restore(1L);
 
         Function<Integer, Void> assertNextSnapshot =
                 i -> {
diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/source/CompactorSourceBuilder.java b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/source/CompactorSourceBuilder.java
index d7fdfab9..f9443aea 100644
--- a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/source/CompactorSourceBuilder.java
+++ b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/source/CompactorSourceBuilder.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.store.connector.LogicalTypeConversion;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.source.StreamDataTableScan;
 import org.apache.flink.table.store.table.source.snapshot.ContinuousCompactorFollowUpScanner;
 import org.apache.flink.table.store.table.source.snapshot.ContinuousCompactorStartingScanner;
 import org.apache.flink.table.store.table.source.snapshot.FullStartingScanner;
@@ -96,11 +97,16 @@ public class CompactorSourceBuilder {
                     null,
                     partitionPredicate,
                     null,
-                    (table, nextSnapshotId) ->
-                            table.newStreamScan()
-                                    .withStartingScanner(new ContinuousCompactorStartingScanner())
-                                    .withFollowUpScanner(new ContinuousCompactorFollowUpScanner())
-                                    .withNextSnapshotId(nextSnapshotId));
+                    (table, nextSnapshotId) -> {
+                        StreamDataTableScan scan =
+                                table.newStreamScan()
+                                        .withStartingScanner(
+                                                new ContinuousCompactorStartingScanner())
+                                        .withFollowUpScanner(
+                                                new ContinuousCompactorFollowUpScanner());
+                        scan.restore(nextSnapshotId);
+                        return scan;
+                    });
         } else {
             return new StaticFileStoreSource(
                     bucketsTable,
