This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 6d0b9c20 [core] Introduce StreamTableScan to support checkpoint and restore
6d0b9c20 is described below
commit 6d0b9c20669ea953f8a38463c26c5b8905500d52
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Mar 15 09:07:59 2023 +0800
[core] Introduce StreamTableScan to support checkpoint and restore
This closes #602
---
.../apache/flink/table/store/table/InnerTable.java | 3 +-
.../flink/table/store/table/ReadonlyTable.java | 4 +--
.../store/table/source/InnerStreamTableScan.java | 22 ++++++++++++
.../table/store/table/source/ReadBuilder.java | 2 +-
.../table/store/table/source/ReadBuilderImpl.java | 4 +--
.../store/table/source/StreamDataTableScan.java | 8 ++---
.../table/source/StreamDataTableScanImpl.java | 17 ++++++----
.../table/store/table/source/StreamTableScan.java | 39 ++++++++++++++++++++++
.../table/store/table/system/AuditLogTable.java | 14 +++++---
.../table/ChangelogWithKeyFileStoreTableTest.java | 9 ++---
.../connector/source/CompactorSourceBuilder.java | 16 ++++++---
11 files changed, 109 insertions(+), 29 deletions(-)
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/InnerTable.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/InnerTable.java
index 42a0db75..26af910e 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/InnerTable.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/InnerTable.java
@@ -24,6 +24,7 @@ import
org.apache.flink.table.store.table.sink.InnerTableCommit;
import org.apache.flink.table.store.table.sink.InnerTableWrite;
import org.apache.flink.table.store.table.sink.StreamWriteBuilder;
import org.apache.flink.table.store.table.sink.StreamWriteBuilderImpl;
+import org.apache.flink.table.store.table.source.InnerStreamTableScan;
import org.apache.flink.table.store.table.source.InnerTableRead;
import org.apache.flink.table.store.table.source.InnerTableScan;
import org.apache.flink.table.store.table.source.ReadBuilder;
@@ -34,7 +35,7 @@ public interface InnerTable extends Table {
InnerTableScan newScan();
- InnerTableScan newStreamScan();
+ InnerStreamTableScan newStreamScan();
InnerTableRead newRead();
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ReadonlyTable.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ReadonlyTable.java
index e37ddbeb..ac25a2a7 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ReadonlyTable.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ReadonlyTable.java
@@ -22,7 +22,7 @@ import
org.apache.flink.table.store.table.sink.BatchWriteBuilder;
import org.apache.flink.table.store.table.sink.InnerTableCommit;
import org.apache.flink.table.store.table.sink.InnerTableWrite;
import org.apache.flink.table.store.table.sink.StreamWriteBuilder;
-import org.apache.flink.table.store.table.source.InnerTableScan;
+import org.apache.flink.table.store.table.source.InnerStreamTableScan;
/** Readonly table which only provide implementation for scan and read. */
public interface ReadonlyTable extends InnerTable {
@@ -60,7 +60,7 @@ public interface ReadonlyTable extends InnerTable {
}
@Override
- default InnerTableScan newStreamScan() {
+ default InnerStreamTableScan newStreamScan() {
throw new UnsupportedOperationException(
String.format(
"Readonly Table %s does not support newStreamScan.",
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/InnerStreamTableScan.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/InnerStreamTableScan.java
new file mode 100644
index 00000000..8ddeb7ae
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/InnerStreamTableScan.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+/** Streaming {@link InnerTableScan} with {@link StreamTableScan}. */
+public interface InnerStreamTableScan extends InnerTableScan, StreamTableScan
{}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ReadBuilder.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ReadBuilder.java
index a245508b..81f7c31e 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ReadBuilder.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ReadBuilder.java
@@ -119,7 +119,7 @@ public interface ReadBuilder extends Serializable {
TableScan newScan();
/** Create a {@link TableScan} to perform streaming planning. */
- TableScan newStreamScan();
+ StreamTableScan newStreamScan();
/** Create a {@link TableRead} to read {@link Split}s. */
TableRead newRead();
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ReadBuilderImpl.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ReadBuilderImpl.java
index de724113..f62b5d03 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ReadBuilderImpl.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/ReadBuilderImpl.java
@@ -72,8 +72,8 @@ public class ReadBuilderImpl implements ReadBuilder {
}
@Override
- public TableScan newStreamScan() {
- return table.newStreamScan().withFilter(filter);
+ public StreamTableScan newStreamScan() {
+ return (StreamTableScan) table.newStreamScan().withFilter(filter);
}
@Override
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamDataTableScan.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamDataTableScan.java
index 81599adc..b92ab4ec 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamDataTableScan.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamDataTableScan.java
@@ -32,7 +32,7 @@ import java.util.HashMap;
import static
org.apache.flink.table.store.CoreOptions.ChangelogProducer.FULL_COMPACTION;
/** {@link DataTableScan} for streaming planning. */
-public interface StreamDataTableScan extends DataTableScan {
+public interface StreamDataTableScan extends DataTableScan,
InnerStreamTableScan {
boolean supportStreamingReadOverwrite();
@@ -40,8 +40,6 @@ public interface StreamDataTableScan extends DataTableScan {
StreamDataTableScan withFollowUpScanner(FollowUpScanner followUpScanner);
- StreamDataTableScan withNextSnapshotId(@Nullable Long nextSnapshotId);
-
StreamDataTableScan withSnapshotStarting();
static void validate(TableSchema schema) {
@@ -79,7 +77,9 @@ public interface StreamDataTableScan extends DataTableScan {
@Override
public StreamDataTableScan create(DataTable dataTable, @Nullable Long
nextSnapshotId) {
- return
dataTable.newStreamScan().withNextSnapshotId(nextSnapshotId);
+ StreamDataTableScan scan = dataTable.newStreamScan();
+ scan.restore(nextSnapshotId);
+ return scan;
}
}
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamDataTableScanImpl.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamDataTableScanImpl.java
index 747a9b1f..de8d2f0b 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamDataTableScanImpl.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamDataTableScanImpl.java
@@ -75,12 +75,6 @@ public class StreamDataTableScanImpl extends
AbstractDataTableScan implements St
return this;
}
- @Override
- public StreamDataTableScan withNextSnapshotId(@Nullable Long
nextSnapshotId) {
- this.nextSnapshotId = nextSnapshotId;
- return this;
- }
-
@Override
public StreamDataTableScan withSnapshotStarting() {
startingScanner =
@@ -176,4 +170,15 @@ public class StreamDataTableScanImpl extends
AbstractDataTableScan implements St
}
return followUpScanner;
}
+
+ @Nullable
+ @Override
+ public Long checkpoint() {
+ return nextSnapshotId;
+ }
+
+ @Override
+ public void restore(@Nullable Long nextSnapshotId) {
+ this.nextSnapshotId = nextSnapshotId;
+ }
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamTableScan.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamTableScan.java
new file mode 100644
index 00000000..98526a1e
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamTableScan.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.source;
+
+import org.apache.flink.table.store.annotation.Experimental;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link TableScan} for streaming, supports {@link #checkpoint} and {@link #restore}.
+ *
+ * @since 0.4.0
+ */
+@Experimental
+public interface StreamTableScan extends TableScan {
+
+ /** Checkpoint this stream table scan and return the next snapshot id. */
+ @Nullable
+ Long checkpoint();
+
+ /** Restore this stream table scan to read incrementally from {@code nextSnapshotId}. */
+ void restore(@Nullable Long nextSnapshotId);
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/AuditLogTable.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/AuditLogTable.java
index 33b9d9ab..a58b8fc3 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/AuditLogTable.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/system/AuditLogTable.java
@@ -301,13 +301,19 @@ public class AuditLogTable implements DataTable,
ReadonlyTable {
}
@Override
- public StreamDataTableScan withNextSnapshotId(@Nullable Long
nextSnapshotId) {
- return streamScan.withNextSnapshotId(nextSnapshotId);
+ public StreamDataTableScan withSnapshotStarting() {
+ return streamScan.withSnapshotStarting();
}
+ @Nullable
@Override
- public StreamDataTableScan withSnapshotStarting() {
- return streamScan.withSnapshotStarting();
+ public Long checkpoint() {
+ return streamScan.checkpoint();
+ }
+
+ @Override
+ public void restore(@Nullable Long nextSnapshotId) {
+ streamScan.restore(nextSnapshotId);
}
}
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index 4b94835d..be234f38 100644
---
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -421,10 +421,11 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
// partition 2
Collections.singletonList("-D
2|10|301|binary|varbinary")));
- StreamDataTableScan scan = table.newStreamScan();
- scan.withStartingScanner(new FullStartingScanner())
- .withFollowUpScanner(new InputChangelogFollowUpScanner())
- .withNextSnapshotId(1L);
+ StreamDataTableScan scan =
+ table.newStreamScan()
+ .withStartingScanner(new FullStartingScanner())
+ .withFollowUpScanner(new
InputChangelogFollowUpScanner());
+ scan.restore(1L);
Function<Integer, Void> assertNextSnapshot =
i -> {
diff --git
a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/source/CompactorSourceBuilder.java
b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/source/CompactorSourceBuilder.java
index d7fdfab9..f9443aea 100644
---
a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/source/CompactorSourceBuilder.java
+++
b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/source/CompactorSourceBuilder.java
@@ -28,6 +28,7 @@ import
org.apache.flink.table.store.connector.LogicalTypeConversion;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateBuilder;
import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.source.StreamDataTableScan;
import
org.apache.flink.table.store.table.source.snapshot.ContinuousCompactorFollowUpScanner;
import
org.apache.flink.table.store.table.source.snapshot.ContinuousCompactorStartingScanner;
import org.apache.flink.table.store.table.source.snapshot.FullStartingScanner;
@@ -96,11 +97,16 @@ public class CompactorSourceBuilder {
null,
partitionPredicate,
null,
- (table, nextSnapshotId) ->
- table.newStreamScan()
- .withStartingScanner(new
ContinuousCompactorStartingScanner())
- .withFollowUpScanner(new
ContinuousCompactorFollowUpScanner())
- .withNextSnapshotId(nextSnapshotId));
+ (table, nextSnapshotId) -> {
+ StreamDataTableScan scan =
+ table.newStreamScan()
+ .withStartingScanner(
+ new
ContinuousCompactorStartingScanner())
+ .withFollowUpScanner(
+ new
ContinuousCompactorFollowUpScanner());
+ scan.restore(nextSnapshotId);
+ return scan;
+ });
} else {
return new StaticFileStoreSource(
bucketsTable,