This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new dfc0d558 [FLINK-31331] Flink 1.16 should implement new LookupFunction
dfc0d558 is described below
commit dfc0d558d2c0d5d299e2da2ffa0819d0c4720919
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Mar 6 17:44:51 2023 +0800
[FLINK-31331] Flink 1.16 should implement new LookupFunction
This closes #583
---
.../flink-table-store-flink-1.14/pom.xml | 66 ++++++
.../lookup/LookupRuntimeProviderFactory.java | 30 +++
.../store/connector/lookup/OldLookupFunction.java | 52 +++++
.../table/store/connector/CatalogITCaseBase.java | 150 +++++++++++++
.../table/store/connector/LookupJoinITCase.java | 76 +++++++
.../src/test/resources/log4j2-test.properties | 28 +++
.../flink-table-store-flink-1.15/pom.xml | 87 ++++++++
.../lookup/LookupRuntimeProviderFactory.java | 30 +++
.../store/connector/lookup/OldLookupFunction.java | 52 +++++
.../table/store/connector/CatalogITCaseBase.java | 150 +++++++++++++
.../table/store/connector/LookupJoinITCase.java | 76 +++++++
.../src/test/resources/log4j2-test.properties | 28 +++
.../connector/lookup/FileStoreLookupFunction.java | 34 +--
.../lookup/LookupRuntimeProviderFactory.java | 30 +++
.../store/connector/lookup/NewLookupFunction.java | 53 +++++
.../store/connector/source/TableStoreSource.java | 4 +-
.../table/store/connector/LookupJoinITCase.java | 233 ++++++++++-----------
17 files changed, 1046 insertions(+), 133 deletions(-)
diff --git a/flink-table-store-flink/flink-table-store-flink-1.14/pom.xml
b/flink-table-store-flink/flink-table-store-flink-1.14/pom.xml
index 838feaa5..8579b836 100644
--- a/flink-table-store-flink/flink-table-store-flink-1.14/pom.xml
+++ b/flink-table-store-flink/flink-table-store-flink-1.14/pom.xml
@@ -33,6 +33,7 @@ under the License.
<properties>
<flink.version>1.14.6</flink.version>
+ <frocksdbjni.version>6.20.3-ververica-2.0</frocksdbjni.version>
</properties>
<dependencies>
@@ -40,6 +41,12 @@ under the License.
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-store-flink-common</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
@@ -55,6 +62,65 @@ under the License.
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
+
+ <!-- test dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs-client</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-core-asl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>com.ververica</groupId>
+ <artifactId>frocksdbjni</artifactId>
+ <version>${frocksdbjni.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/flink-table-store-flink/flink-table-store-flink-1.14/src/main/java/org/apache/flink/table/store/connector/lookup/LookupRuntimeProviderFactory.java
b/flink-table-store-flink/flink-table-store-flink-1.14/src/main/java/org/apache/flink/table/store/connector/lookup/LookupRuntimeProviderFactory.java
new file mode 100644
index 00000000..09862e58
--- /dev/null
+++
b/flink-table-store-flink/flink-table-store-flink-1.14/src/main/java/org/apache/flink/table/store/connector/lookup/LookupRuntimeProviderFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.lookup;
+
+import
org.apache.flink.table.connector.source.LookupTableSource.LookupRuntimeProvider;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
+
+/** Factory to create {@link LookupRuntimeProvider}. */
+public class LookupRuntimeProviderFactory {
+
+ public static LookupRuntimeProvider create(FileStoreLookupFunction
function) {
+ return TableFunctionProvider.of(new OldLookupFunction(function));
+ }
+}
diff --git
a/flink-table-store-flink/flink-table-store-flink-1.14/src/main/java/org/apache/flink/table/store/connector/lookup/OldLookupFunction.java
b/flink-table-store-flink/flink-table-store-flink-1.14/src/main/java/org/apache/flink/table/store/connector/lookup/OldLookupFunction.java
new file mode 100644
index 00000000..b778ba4f
--- /dev/null
+++
b/flink-table-store-flink/flink-table-store-flink-1.14/src/main/java/org/apache/flink/table/store/connector/lookup/OldLookupFunction.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.lookup;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+
+/** Old lookup {@link TableFunction} for 1.15-. */
+public class OldLookupFunction extends TableFunction<RowData> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final FileStoreLookupFunction function;
+
+ public OldLookupFunction(FileStoreLookupFunction function) {
+ this.function = function;
+ }
+
+ @Override
+ public void open(FunctionContext context) throws Exception {
+ function.open(context);
+ }
+
+ /** Used by code generation. */
+ @SuppressWarnings("unused")
+ public void eval(Object... values) {
+ function.lookup(GenericRowData.of(values)).forEach(this::collect);
+ }
+
+ @Override
+ public void close() throws Exception {
+ function.close();
+ }
+}
diff --git
a/flink-table-store-flink/flink-table-store-flink-1.14/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
b/flink-table-store-flink/flink-table-store-flink-1.14/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
new file mode 100644
index 00000000..6991bee8
--- /dev/null
+++
b/flink-table-store-flink/flink-table-store-flink-1.14/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.fs.Path;
+import org.apache.flink.table.store.fs.local.LocalFileIO;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+
+import org.junit.Before;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+
+import static
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
+
+/** ITCase for catalog. */
+public abstract class CatalogITCaseBase extends AbstractTestBase {
+
+ protected TableEnvironment tEnv;
+ protected TableEnvironment sEnv;
+ protected String path;
+
+ @Before
+ public void before() throws IOException {
+ tEnv =
TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+ String catalog = "TABLE_STORE";
+ path = getTempDirPath("table_store");
+ tEnv.executeSql(
+ String.format(
+ "CREATE CATALOG %s WITH (" + "'type'='table-store',
'warehouse'='%s')",
+ catalog, path));
+ tEnv.useCatalog(catalog);
+
+ sEnv =
TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
+ sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL,
Duration.ofMillis(100));
+ sEnv.registerCatalog(catalog, tEnv.getCatalog(catalog).get());
+ sEnv.useCatalog(catalog);
+
+ setParallelism(defaultParallelism());
+ prepareEnv();
+ }
+
+ private void prepareEnv() {
+ Parser parser = ((TableEnvironmentImpl) tEnv).getParser();
+ for (String ddl : ddl()) {
+ tEnv.executeSql(ddl);
+ List<Operation> operations = parser.parse(ddl);
+ if (operations.size() == 1) {
+ Operation operation = operations.get(0);
+ if (operation instanceof CreateCatalogOperation) {
+ String name = ((CreateCatalogOperation)
operation).getCatalogName();
+ sEnv.registerCatalog(name,
tEnv.getCatalog(name).orElse(null));
+ }
+ }
+ }
+ }
+
+ protected void setParallelism(int parallelism) {
+ tEnv.getConfig()
+ .getConfiguration()
+
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
parallelism);
+ sEnv.getConfig()
+ .getConfiguration()
+
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
parallelism);
+ }
+
+ protected int defaultParallelism() {
+ return 2;
+ }
+
+ protected List<String> ddl() {
+ return Collections.emptyList();
+ }
+
+ protected List<Row> batchSql(String query, Object... args) {
+ return sql(query, args);
+ }
+
+ protected List<Row> sql(String query, Object... args) {
+ try (CloseableIterator<Row> iter =
tEnv.executeSql(String.format(query, args)).collect()) {
+ return ImmutableList.copyOf(iter);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected CloseableIterator<Row> streamSqlIter(String query, Object...
args) {
+ return sEnv.executeSql(String.format(query, args)).collect();
+ }
+
+ protected CatalogTable table(String tableName) throws
TableNotExistException {
+ Catalog catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).get();
+ CatalogBaseTable table =
+ catalog.getTable(new ObjectPath(catalog.getDefaultDatabase(),
tableName));
+ return (CatalogTable) table;
+ }
+
+ protected Path getTableDirectory(String tableName) {
+ return new Path(
+ new File(path, String.format("%s.db/%s",
tEnv.getCurrentDatabase(), tableName))
+ .toString());
+ }
+
+ @Nullable
+ protected Snapshot findLatestSnapshot(String tableName) {
+ SnapshotManager snapshotManager =
+ new SnapshotManager(LocalFileIO.create(),
getTableDirectory(tableName));
+ Long id = snapshotManager.latestSnapshotId();
+ return id == null ? null : snapshotManager.snapshot(id);
+ }
+}
diff --git
a/flink-table-store-flink/flink-table-store-flink-1.14/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
b/flink-table-store-flink/flink-table-store-flink-1.14/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
new file mode 100644
index 00000000..57ad15bb
--- /dev/null
+++
b/flink-table-store-flink/flink-table-store-flink-1.14/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.table.store.file.utils.BlockingIterator;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for lookup join. */
+public class LookupJoinITCase extends CatalogITCaseBase {
+
+ @Override
+ public List<String> ddl() {
+ return Arrays.asList(
+ "CREATE TABLE T (i INT, `proctime` AS PROCTIME())",
+ "CREATE TABLE DIM (i INT PRIMARY KEY NOT ENFORCED, j INT, k1
INT, k2 INT) WITH"
+ + " ('continuous.discovery-interval'='1 ms')");
+ }
+
+ @Override
+ protected int defaultParallelism() {
+ return 1;
+ }
+
+ @Test
+ public void testLookup() throws Exception {
+ batchSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222,
2222)");
+
+ String query =
+ "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM for
system_time as of T.proctime AS D ON T.i = D.i";
+ BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql(query).collect());
+
+ batchSql("INSERT INTO T VALUES (1), (2), (3)");
+ List<Row> result = iterator.collect(3);
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of(1, 11, 111, 1111),
+ Row.of(2, 22, 222, 2222),
+ Row.of(3, null, null, null));
+
+ batchSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333,
3333)");
+ Thread.sleep(2000); // wait refresh
+ batchSql("INSERT INTO T VALUES (1), (2), (3), (4)");
+ result = iterator.collect(4);
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of(1, 11, 111, 1111),
+ Row.of(2, 44, 444, 4444),
+ Row.of(3, 33, 333, 3333),
+ Row.of(4, null, null, null));
+
+ iterator.close();
+ }
+}
diff --git
a/flink-table-store-flink/flink-table-store-flink-1.14/src/test/resources/log4j2-test.properties
b/flink-table-store-flink/flink-table-store-flink-1.14/src/test/resources/log4j2-test.properties
new file mode 100644
index 00000000..1b3980d1
--- /dev/null
+++
b/flink-table-store-flink/flink-table-store-flink-1.14/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
diff --git a/flink-table-store-flink/flink-table-store-flink-1.15/pom.xml
b/flink-table-store-flink/flink-table-store-flink-1.15/pom.xml
index 6a5bd6f5..6369c835 100644
--- a/flink-table-store-flink/flink-table-store-flink-1.15/pom.xml
+++ b/flink-table-store-flink/flink-table-store-flink-1.15/pom.xml
@@ -33,6 +33,7 @@ under the License.
<properties>
<flink.version>1.15.3</flink.version>
+ <frocksdbjni.version>6.20.3-ververica-2.0</frocksdbjni.version>
</properties>
<dependencies>
@@ -40,6 +41,92 @@ under the License.
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-store-flink-common</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- test dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-base</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-files</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs-client</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-core-asl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>com.ververica</groupId>
+ <artifactId>frocksdbjni</artifactId>
+ <version>${frocksdbjni.version}</version>
+ <scope>test</scope>
</dependency>
</dependencies>
diff --git
a/flink-table-store-flink/flink-table-store-flink-1.15/src/main/java/org/apache/flink/table/store/connector/lookup/LookupRuntimeProviderFactory.java
b/flink-table-store-flink/flink-table-store-flink-1.15/src/main/java/org/apache/flink/table/store/connector/lookup/LookupRuntimeProviderFactory.java
new file mode 100644
index 00000000..09862e58
--- /dev/null
+++
b/flink-table-store-flink/flink-table-store-flink-1.15/src/main/java/org/apache/flink/table/store/connector/lookup/LookupRuntimeProviderFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.lookup;
+
+import
org.apache.flink.table.connector.source.LookupTableSource.LookupRuntimeProvider;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
+
+/** Factory to create {@link LookupRuntimeProvider}. */
+public class LookupRuntimeProviderFactory {
+
+ public static LookupRuntimeProvider create(FileStoreLookupFunction
function) {
+ return TableFunctionProvider.of(new OldLookupFunction(function));
+ }
+}
diff --git
a/flink-table-store-flink/flink-table-store-flink-1.15/src/main/java/org/apache/flink/table/store/connector/lookup/OldLookupFunction.java
b/flink-table-store-flink/flink-table-store-flink-1.15/src/main/java/org/apache/flink/table/store/connector/lookup/OldLookupFunction.java
new file mode 100644
index 00000000..b778ba4f
--- /dev/null
+++
b/flink-table-store-flink/flink-table-store-flink-1.15/src/main/java/org/apache/flink/table/store/connector/lookup/OldLookupFunction.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.lookup;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+
+/** Old lookup {@link TableFunction} for 1.15-. */
+public class OldLookupFunction extends TableFunction<RowData> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final FileStoreLookupFunction function;
+
+ public OldLookupFunction(FileStoreLookupFunction function) {
+ this.function = function;
+ }
+
+ @Override
+ public void open(FunctionContext context) throws Exception {
+ function.open(context);
+ }
+
+ /** Used by code generation. */
+ @SuppressWarnings("unused")
+ public void eval(Object... values) {
+ function.lookup(GenericRowData.of(values)).forEach(this::collect);
+ }
+
+ @Override
+ public void close() throws Exception {
+ function.close();
+ }
+}
diff --git
a/flink-table-store-flink/flink-table-store-flink-1.15/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
b/flink-table-store-flink/flink-table-store-flink-1.15/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
new file mode 100644
index 00000000..6991bee8
--- /dev/null
+++
b/flink-table-store-flink/flink-table-store-flink-1.15/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.fs.Path;
+import org.apache.flink.table.store.fs.local.LocalFileIO;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+
+import org.junit.Before;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+
+import static
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
+
+/** ITCase for catalog. */
+public abstract class CatalogITCaseBase extends AbstractTestBase {
+
+ protected TableEnvironment tEnv;
+ protected TableEnvironment sEnv;
+ protected String path;
+
+ @Before
+ public void before() throws IOException {
+ tEnv =
TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+ String catalog = "TABLE_STORE";
+ path = getTempDirPath("table_store");
+ tEnv.executeSql(
+ String.format(
+ "CREATE CATALOG %s WITH (" + "'type'='table-store',
'warehouse'='%s')",
+ catalog, path));
+ tEnv.useCatalog(catalog);
+
+ sEnv =
TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
+ sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL,
Duration.ofMillis(100));
+ sEnv.registerCatalog(catalog, tEnv.getCatalog(catalog).get());
+ sEnv.useCatalog(catalog);
+
+ setParallelism(defaultParallelism());
+ prepareEnv();
+ }
+
+ private void prepareEnv() {
+ Parser parser = ((TableEnvironmentImpl) tEnv).getParser();
+ for (String ddl : ddl()) {
+ tEnv.executeSql(ddl);
+ List<Operation> operations = parser.parse(ddl);
+ if (operations.size() == 1) {
+ Operation operation = operations.get(0);
+ if (operation instanceof CreateCatalogOperation) {
+ String name = ((CreateCatalogOperation)
operation).getCatalogName();
+ sEnv.registerCatalog(name,
tEnv.getCatalog(name).orElse(null));
+ }
+ }
+ }
+ }
+
+ protected void setParallelism(int parallelism) {
+ tEnv.getConfig()
+ .getConfiguration()
+
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
parallelism);
+ sEnv.getConfig()
+ .getConfiguration()
+
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
parallelism);
+ }
+
+ protected int defaultParallelism() {
+ return 2;
+ }
+
+ protected List<String> ddl() {
+ return Collections.emptyList();
+ }
+
+ protected List<Row> batchSql(String query, Object... args) {
+ return sql(query, args);
+ }
+
+ protected List<Row> sql(String query, Object... args) {
+ try (CloseableIterator<Row> iter =
tEnv.executeSql(String.format(query, args)).collect()) {
+ return ImmutableList.copyOf(iter);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected CloseableIterator<Row> streamSqlIter(String query, Object...
args) {
+ return sEnv.executeSql(String.format(query, args)).collect();
+ }
+
+ protected CatalogTable table(String tableName) throws
TableNotExistException {
+ Catalog catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).get();
+ CatalogBaseTable table =
+ catalog.getTable(new ObjectPath(catalog.getDefaultDatabase(),
tableName));
+ return (CatalogTable) table;
+ }
+
+ protected Path getTableDirectory(String tableName) {
+ return new Path(
+ new File(path, String.format("%s.db/%s",
tEnv.getCurrentDatabase(), tableName))
+ .toString());
+ }
+
+ @Nullable
+ protected Snapshot findLatestSnapshot(String tableName) {
+ SnapshotManager snapshotManager =
+ new SnapshotManager(LocalFileIO.create(),
getTableDirectory(tableName));
+ Long id = snapshotManager.latestSnapshotId();
+ return id == null ? null : snapshotManager.snapshot(id);
+ }
+}
diff --git
a/flink-table-store-flink/flink-table-store-flink-1.15/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
b/flink-table-store-flink/flink-table-store-flink-1.15/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
new file mode 100644
index 00000000..57ad15bb
--- /dev/null
+++
b/flink-table-store-flink/flink-table-store-flink-1.15/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.table.store.file.utils.BlockingIterator;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for lookup join. */
+public class LookupJoinITCase extends CatalogITCaseBase {
+
+ @Override
+ public List<String> ddl() {
+ return Arrays.asList(
+ "CREATE TABLE T (i INT, `proctime` AS PROCTIME())",
+ "CREATE TABLE DIM (i INT PRIMARY KEY NOT ENFORCED, j INT, k1
INT, k2 INT) WITH"
+ + " ('continuous.discovery-interval'='1 ms')");
+ }
+
+ @Override
+ protected int defaultParallelism() {
+ return 1;
+ }
+
+ @Test
+ public void testLookup() throws Exception {
+ batchSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222,
2222)");
+
+ String query =
+ "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM for
system_time as of T.proctime AS D ON T.i = D.i";
+ BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql(query).collect());
+
+ batchSql("INSERT INTO T VALUES (1), (2), (3)");
+ List<Row> result = iterator.collect(3);
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of(1, 11, 111, 1111),
+ Row.of(2, 22, 222, 2222),
+ Row.of(3, null, null, null));
+
+ batchSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333,
3333)");
+ Thread.sleep(2000); // wait refresh
+ batchSql("INSERT INTO T VALUES (1), (2), (3), (4)");
+ result = iterator.collect(4);
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.of(1, 11, 111, 1111),
+ Row.of(2, 44, 444, 4444),
+ Row.of(3, 33, 333, 3333),
+ Row.of(4, null, null, null));
+
+ iterator.close();
+ }
+}
diff --git
a/flink-table-store-flink/flink-table-store-flink-1.15/src/test/resources/log4j2-test.properties
b/flink-table-store-flink/flink-table-store-flink-1.15/src/test/resources/log4j2-test.properties
new file mode 100644
index 00000000..1b3980d1
--- /dev/null
+++
b/flink-table-store-flink/flink-table-store-flink-1.15/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
diff --git
a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/lookup/FileStoreLookupFunction.java
b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/lookup/FileStoreLookupFunction.java
index ac77d254..9aafc3a0 100644
---
a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/lookup/FileStoreLookupFunction.java
+++
b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/lookup/FileStoreLookupFunction.java
@@ -19,7 +19,7 @@
package org.apache.flink.table.store.connector.lookup;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.store.CoreOptions;
@@ -44,11 +44,15 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.io.Closeable;
import java.io.File;
import java.io.IOException;
+import java.io.Serializable;
import java.lang.reflect.Field;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
@@ -61,7 +65,9 @@ import static
org.apache.flink.table.store.file.predicate.PredicateBuilder.trans
import static org.apache.flink.table.store.utils.Preconditions.checkArgument;
/** A lookup {@link TableFunction} for file store. */
-public class FileStoreLookupFunction extends
TableFunction<org.apache.flink.table.data.RowData> {
+public class FileStoreLookupFunction implements Serializable, Closeable {
+
+ private static final long serialVersionUID = 1L;
private static final Logger LOG =
LoggerFactory.getLogger(FileStoreLookupFunction.class);
@@ -113,9 +119,7 @@ public class FileStoreLookupFunction extends
TableFunction<org.apache.flink.tabl
this.predicate = predicate;
}
- @Override
public void open(FunctionContext context) throws Exception {
- super.open(context);
String tmpDirectory = getTmpDirectory(context);
this.path = new File(tmpDirectory, "lookup-" + UUID.randomUUID());
@@ -159,13 +163,17 @@ public class FileStoreLookupFunction extends
TableFunction<org.apache.flink.tabl
TypeUtils.project(table.schema().logicalRowType(),
projection), adjustedPredicate);
}
- /** Used by code generation. */
- @SuppressWarnings("unused")
- public void eval(Object... values) throws Exception {
- checkRefresh();
- List<InternalRow> results = lookupTable.get(new
FlinkRowWrapper(GenericRowData.of(values)));
- for (InternalRow matchedRow : results) {
- collect(new FlinkRowData(matchedRow));
+ public Collection<RowData> lookup(RowData keyRow) {
+ try {
+ checkRefresh();
+ List<InternalRow> results = lookupTable.get(new
FlinkRowWrapper(keyRow));
+ List<RowData> rows = new ArrayList<>(results.size());
+ for (InternalRow matchedRow : results) {
+ rows.add(new FlinkRowData(matchedRow));
+ }
+ return rows;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
}
@@ -175,8 +183,8 @@ public class FileStoreLookupFunction extends
TableFunction<org.apache.flink.tabl
}
if (nextLoadTime > 0) {
LOG.info(
- "Lookup table has refreshed after {} minute(s),
refreshing",
- refreshInterval.toMinutes());
+ "Lookup table has refreshed after {} second(s),
refreshing",
+ refreshInterval.toMillis() / 1000);
}
refresh();
diff --git
a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/lookup/LookupRuntimeProviderFactory.java
b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/lookup/LookupRuntimeProviderFactory.java
new file mode 100644
index 00000000..ffc9a256
--- /dev/null
+++
b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/lookup/LookupRuntimeProviderFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.lookup;
+
+import
org.apache.flink.table.connector.source.LookupTableSource.LookupRuntimeProvider;
+import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
+
+/** Factory to create {@link LookupRuntimeProvider}. */
+public class LookupRuntimeProviderFactory {
+
+ public static LookupRuntimeProvider create(FileStoreLookupFunction
function) {
+ return LookupFunctionProvider.of(new NewLookupFunction(function));
+ }
+}
diff --git
a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/lookup/NewLookupFunction.java
b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/lookup/NewLookupFunction.java
new file mode 100644
index 00000000..b5e17a29
--- /dev/null
+++
b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/lookup/NewLookupFunction.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.lookup;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.LookupFunction;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/** New {@link LookupFunction} for 1.16+, it supports Flink retry join. */
+public class NewLookupFunction extends LookupFunction {
+
+ private static final long serialVersionUID = 1L;
+
+ private final FileStoreLookupFunction function;
+
+ public NewLookupFunction(FileStoreLookupFunction function) {
+ this.function = function;
+ }
+
+ @Override
+ public void open(FunctionContext context) throws Exception {
+ function.open(context);
+ }
+
+ @Override
+ public Collection<RowData> lookup(RowData keyRow) throws IOException {
+ return function.lookup(keyRow);
+ }
+
+ @Override
+ public void close() throws Exception {
+ function.close();
+ }
+}
diff --git
a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
index b120cec7..b2b0ae09 100644
---
a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
+++
b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
@@ -23,7 +23,6 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
-import org.apache.flink.table.connector.source.TableFunctionProvider;
import
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
@@ -33,6 +32,7 @@ import
org.apache.flink.table.store.CoreOptions.LogConsistency;
import org.apache.flink.table.store.connector.FlinkConnectorOptions;
import org.apache.flink.table.store.connector.TableStoreDataStreamScanProvider;
import org.apache.flink.table.store.connector.lookup.FileStoreLookupFunction;
+import
org.apache.flink.table.store.connector.lookup.LookupRuntimeProviderFactory;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.log.LogSourceProvider;
import org.apache.flink.table.store.log.LogStoreTableFactory;
@@ -206,7 +206,7 @@ public class TableStoreSource extends FlinkTableSource
? IntStream.range(0,
table.schema().fields().size()).toArray()
: Projection.of(projectFields).toTopLevelIndexes();
int[] joinKey = Projection.of(context.getKeys()).toTopLevelIndexes();
- return TableFunctionProvider.of(
+ return LookupRuntimeProviderFactory.create(
new FileStoreLookupFunction(table, projection, joinKey,
predicate));
}
}
diff --git
a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
index 44f5e1ee..76023c8f 100644
---
a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
+++
b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
@@ -18,55 +18,40 @@
package org.apache.flink.table.store.connector;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.flink.table.store.connector.util.AbstractTestBase;
import org.apache.flink.table.store.file.utils.BlockingIterator;
import org.apache.flink.types.Row;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.time.Duration;
+import java.util.Arrays;
import java.util.List;
-import java.util.concurrent.ExecutionException;
-import static
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** ITCase for lookup join. */
-public class LookupJoinITCase extends AbstractTestBase {
-
- private TableEnvironment env;
-
- @BeforeEach
- public void before() throws Exception {
- env =
TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
- env.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL,
Duration.ofMillis(100));
- env.getConfig()
- .getConfiguration()
-
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
- String path = getTempDirPath();
- env.executeSql(
- String.format(
- "CREATE CATALOG my_catalog WITH ('type'='table-store',
'warehouse'='%s')",
- path));
- executeSql("USE CATALOG my_catalog");
- executeSql("CREATE TABLE T (i INT, `proctime` AS PROCTIME())");
- executeSql(
+public class LookupJoinITCase extends CatalogITCaseBase {
+
+ @Override
+ public List<String> ddl() {
+ return Arrays.asList(
+ "CREATE TABLE T (i INT, `proctime` AS PROCTIME())",
"CREATE TABLE DIM (i INT PRIMARY KEY NOT ENFORCED, j INT, k1
INT, k2 INT) WITH"
+ " ('continuous.discovery-interval'='1 ms')");
}
+ @Override
+ protected int defaultParallelism() {
+ return 1;
+ }
+
@Test
public void testLookupEmptyTable() throws Exception {
String query =
"SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM for
system_time as of T.proctime AS D ON T.i = D.i";
- BlockingIterator<Row, Row> iterator =
BlockingIterator.of(env.executeSql(query).collect());
+ BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql(query).collect());
- executeSql("INSERT INTO T VALUES (1), (2), (3)");
+ sql("INSERT INTO T VALUES (1), (2), (3)");
List<Row> result = iterator.collect(3);
assertThat(result)
@@ -75,9 +60,9 @@ public class LookupJoinITCase extends AbstractTestBase {
Row.of(2, null, null, null),
Row.of(3, null, null, null));
- executeSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222,
2222)");
+ sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
Thread.sleep(2000); // wait refresh
- executeSql("INSERT INTO T VALUES (1), (2), (4)");
+ sql("INSERT INTO T VALUES (1), (2), (4)");
result = iterator.collect(3);
assertThat(result)
.containsExactlyInAnyOrder(
@@ -89,13 +74,13 @@ public class LookupJoinITCase extends AbstractTestBase {
@Test
public void testLookup() throws Exception {
- executeSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222,
2222)");
+ sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
String query =
"SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM for
system_time as of T.proctime AS D ON T.i = D.i";
- BlockingIterator<Row, Row> iterator =
BlockingIterator.of(env.executeSql(query).collect());
+ BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql(query).collect());
- executeSql("INSERT INTO T VALUES (1), (2), (3)");
+ sql("INSERT INTO T VALUES (1), (2), (3)");
List<Row> result = iterator.collect(3);
assertThat(result)
.containsExactlyInAnyOrder(
@@ -103,9 +88,9 @@ public class LookupJoinITCase extends AbstractTestBase {
Row.of(2, 22, 222, 2222),
Row.of(3, null, null, null));
- executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333,
3333)");
+ sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
Thread.sleep(2000); // wait refresh
- executeSql("INSERT INTO T VALUES (1), (2), (3), (4)");
+ sql("INSERT INTO T VALUES (1), (2), (3), (4)");
result = iterator.collect(4);
assertThat(result)
.containsExactlyInAnyOrder(
@@ -119,13 +104,13 @@ public class LookupJoinITCase extends AbstractTestBase {
@Test
public void testLookupWithLatest() throws Exception {
- executeSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222,
2222)");
+ sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
String query =
"SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM /*+
OPTIONS('scan.mode'='latest') */"
+ " for system_time as of T.proctime AS D ON T.i =
D.i";
- BlockingIterator<Row, Row> iterator =
BlockingIterator.of(env.executeSql(query).collect());
+ BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql(query).collect());
- executeSql("INSERT INTO T VALUES (1), (2), (3)");
+ sql("INSERT INTO T VALUES (1), (2), (3)");
List<Row> result = iterator.collect(3);
assertThat(result)
.containsExactlyInAnyOrder(
@@ -133,9 +118,9 @@ public class LookupJoinITCase extends AbstractTestBase {
Row.of(2, 22, 222, 2222),
Row.of(3, null, null, null));
- executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333,
3333)");
+ sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
Thread.sleep(2000); // wait refresh
- executeSql("INSERT INTO T VALUES (1), (2), (3), (4)");
+ sql("INSERT INTO T VALUES (1), (2), (3), (4)");
result = iterator.collect(4);
assertThat(result)
.containsExactlyInAnyOrder(
@@ -149,21 +134,21 @@ public class LookupJoinITCase extends AbstractTestBase {
@Test
public void testLookupProjection() throws Exception {
- executeSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222,
2222)");
+ sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
String query =
"SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as
of T.proctime AS D ON T.i = D.i";
- BlockingIterator<Row, Row> iterator =
BlockingIterator.of(env.executeSql(query).collect());
+ BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql(query).collect());
- executeSql("INSERT INTO T VALUES (1), (2), (3)");
+ sql("INSERT INTO T VALUES (1), (2), (3)");
List<Row> result = iterator.collect(3);
assertThat(result)
.containsExactlyInAnyOrder(
Row.of(1, 11, 111), Row.of(2, 22, 222), Row.of(3,
null, null));
- executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333,
3333)");
+ sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
Thread.sleep(2000); // wait refresh
- executeSql("INSERT INTO T VALUES (1), (2), (3), (4)");
+ sql("INSERT INTO T VALUES (1), (2), (3), (4)");
result = iterator.collect(4);
assertThat(result)
.containsExactlyInAnyOrder(
@@ -177,21 +162,21 @@ public class LookupJoinITCase extends AbstractTestBase {
@Test
public void testLookupFilterPk() throws Exception {
- executeSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222,
2222)");
+ sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
String query =
"SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as
of T.proctime AS D ON T.i = D.i AND D.i > 2";
- BlockingIterator<Row, Row> iterator =
BlockingIterator.of(env.executeSql(query).collect());
+ BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql(query).collect());
- executeSql("INSERT INTO T VALUES (1), (2), (3)");
+ sql("INSERT INTO T VALUES (1), (2), (3)");
List<Row> result = iterator.collect(3);
assertThat(result)
.containsExactlyInAnyOrder(
Row.of(1, null, null), Row.of(2, null, null),
Row.of(3, null, null));
- executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333,
3333)");
+ sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
Thread.sleep(2000); // wait refresh
- executeSql("INSERT INTO T VALUES (1), (2), (3), (4)");
+ sql("INSERT INTO T VALUES (1), (2), (3), (4)");
result = iterator.collect(4);
assertThat(result)
.containsExactlyInAnyOrder(
@@ -205,21 +190,21 @@ public class LookupJoinITCase extends AbstractTestBase {
@Test
public void testLookupFilterSelect() throws Exception {
- executeSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222,
2222)");
+ sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
String query =
"SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as
of T.proctime AS D ON T.i = D.i AND D.k1 > 111";
- BlockingIterator<Row, Row> iterator =
BlockingIterator.of(env.executeSql(query).collect());
+ BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql(query).collect());
- executeSql("INSERT INTO T VALUES (1), (2), (3)");
+ sql("INSERT INTO T VALUES (1), (2), (3)");
List<Row> result = iterator.collect(3);
assertThat(result)
.containsExactlyInAnyOrder(
Row.of(1, null, null), Row.of(2, 22, 222), Row.of(3,
null, null));
- executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333,
3333)");
+ sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
Thread.sleep(2000); // wait refresh
- executeSql("INSERT INTO T VALUES (1), (2), (3), (4)");
+ sql("INSERT INTO T VALUES (1), (2), (3), (4)");
result = iterator.collect(4);
assertThat(result)
.containsExactlyInAnyOrder(
@@ -233,21 +218,21 @@ public class LookupJoinITCase extends AbstractTestBase {
@Test
public void testLookupFilterUnSelect() throws Exception {
- executeSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222,
2222)");
+ sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
String query =
"SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as
of T.proctime AS D ON T.i = D.i AND D.k2 > 1111";
- BlockingIterator<Row, Row> iterator =
BlockingIterator.of(env.executeSql(query).collect());
+ BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql(query).collect());
- executeSql("INSERT INTO T VALUES (1), (2), (3)");
+ sql("INSERT INTO T VALUES (1), (2), (3)");
List<Row> result = iterator.collect(3);
assertThat(result)
.containsExactlyInAnyOrder(
Row.of(1, null, null), Row.of(2, 22, 222), Row.of(3,
null, null));
- executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333,
3333)");
+ sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
Thread.sleep(2000); // wait refresh
- executeSql("INSERT INTO T VALUES (1), (2), (3), (4)");
+ sql("INSERT INTO T VALUES (1), (2), (3), (4)");
result = iterator.collect(4);
assertThat(result)
.containsExactlyInAnyOrder(
@@ -261,21 +246,21 @@ public class LookupJoinITCase extends AbstractTestBase {
@Test
public void testLookupFilterUnSelectAndUpdate() throws Exception {
- executeSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222,
2222)");
+ sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
String query =
"SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as
of T.proctime AS D ON T.i = D.i AND D.k2 < 4444";
- BlockingIterator<Row, Row> iterator =
BlockingIterator.of(env.executeSql(query).collect());
+ BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql(query).collect());
- executeSql("INSERT INTO T VALUES (1), (2), (3)");
+ sql("INSERT INTO T VALUES (1), (2), (3)");
List<Row> result = iterator.collect(3);
assertThat(result)
.containsExactlyInAnyOrder(
Row.of(1, 11, 111), Row.of(2, 22, 222), Row.of(3,
null, null));
- executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333,
3333)");
+ sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
Thread.sleep(2000); // wait refresh
- executeSql("INSERT INTO T VALUES (1), (2), (3), (4)");
+ sql("INSERT INTO T VALUES (1), (2), (3), (4)");
result = iterator.collect(4);
assertThat(result)
.containsExactlyInAnyOrder(
@@ -289,14 +274,13 @@ public class LookupJoinITCase extends AbstractTestBase {
@Test
public void testNonPkLookup() throws Exception {
- executeSql(
- "INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222,
2222), (3, 22, 333, 3333)");
+ sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222),
(3, 22, 333, 3333)");
String query =
"SELECT D.i, T.i, D.k1, D.k2 FROM T LEFT JOIN DIM for
system_time as of T.proctime AS D ON T.i = D.j";
- BlockingIterator<Row, Row> iterator =
BlockingIterator.of(env.executeSql(query).collect());
+ BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql(query).collect());
- executeSql("INSERT INTO T VALUES (11), (22), (33)");
+ sql("INSERT INTO T VALUES (11), (22), (33)");
List<Row> result = iterator.collect(4);
assertThat(result)
.containsExactlyInAnyOrder(
@@ -305,9 +289,9 @@ public class LookupJoinITCase extends AbstractTestBase {
Row.of(3, 22, 333, 3333),
Row.of(null, 33, null, null));
- executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333,
3333)");
+ sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
Thread.sleep(2000); // wait refresh
- executeSql("INSERT INTO T VALUES (11), (22), (33), (44)");
+ sql("INSERT INTO T VALUES (11), (22), (33), (44)");
result = iterator.collect(4);
assertThat(result)
.containsExactlyInAnyOrder(
@@ -321,22 +305,21 @@ public class LookupJoinITCase extends AbstractTestBase {
@Test
public void testNonPkLookupProjection() throws Exception {
- executeSql(
- "INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222,
2222), (3, 22, 333, 3333)");
+ sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222),
(3, 22, 333, 3333)");
String query =
"SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of
T.proctime AS D ON T.i = D.j";
- BlockingIterator<Row, Row> iterator =
BlockingIterator.of(env.executeSql(query).collect());
+ BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql(query).collect());
- executeSql("INSERT INTO T VALUES (11), (22), (33)");
+ sql("INSERT INTO T VALUES (11), (22), (33)");
List<Row> result = iterator.collect(4);
assertThat(result)
.containsExactlyInAnyOrder(
Row.of(11, 111), Row.of(22, 222), Row.of(22, 333),
Row.of(33, null));
- executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333,
3333)");
+ sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
Thread.sleep(2000); // wait refresh
- executeSql("INSERT INTO T VALUES (11), (22), (33), (44)");
+ sql("INSERT INTO T VALUES (11), (22), (33), (44)");
result = iterator.collect(4);
assertThat(result)
.containsExactlyInAnyOrder(
@@ -347,21 +330,20 @@ public class LookupJoinITCase extends AbstractTestBase {
@Test
public void testNonPkLookupFilterPk() throws Exception {
- executeSql(
- "INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222,
2222), (3, 22, 333, 3333)");
+ sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222),
(3, 22, 333, 3333)");
String query =
"SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of
T.proctime AS D ON T.i = D.j AND D.i > 2";
- BlockingIterator<Row, Row> iterator =
BlockingIterator.of(env.executeSql(query).collect());
+ BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql(query).collect());
- executeSql("INSERT INTO T VALUES (11), (22), (33)");
+ sql("INSERT INTO T VALUES (11), (22), (33)");
List<Row> result = iterator.collect(3);
assertThat(result)
.containsExactlyInAnyOrder(Row.of(11, null), Row.of(22, 333),
Row.of(33, null));
- executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333,
3333)");
+ sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
Thread.sleep(2000); // wait refresh
- executeSql("INSERT INTO T VALUES (11), (22), (33), (44)");
+ sql("INSERT INTO T VALUES (11), (22), (33), (44)");
result = iterator.collect(4);
assertThat(result)
.containsExactlyInAnyOrder(
@@ -372,22 +354,21 @@ public class LookupJoinITCase extends AbstractTestBase {
@Test
public void testNonPkLookupFilterSelect() throws Exception {
- executeSql(
- "INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222,
2222), (3, 22, 333, 3333)");
+ sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222),
(3, 22, 333, 3333)");
String query =
"SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of
T.proctime AS D ON T.i = D.j AND D.k1 > 111";
- BlockingIterator<Row, Row> iterator =
BlockingIterator.of(env.executeSql(query).collect());
+ BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql(query).collect());
- executeSql("INSERT INTO T VALUES (11), (22), (33)");
+ sql("INSERT INTO T VALUES (11), (22), (33)");
List<Row> result = iterator.collect(4);
assertThat(result)
.containsExactlyInAnyOrder(
Row.of(11, null), Row.of(22, 222), Row.of(22, 333),
Row.of(33, null));
- executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333,
3333)");
+ sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
Thread.sleep(2000); // wait refresh
- executeSql("INSERT INTO T VALUES (11), (22), (33), (44)");
+ sql("INSERT INTO T VALUES (11), (22), (33), (44)");
result = iterator.collect(4);
assertThat(result)
.containsExactlyInAnyOrder(
@@ -398,22 +379,21 @@ public class LookupJoinITCase extends AbstractTestBase {
@Test
public void testNonPkLookupFilterUnSelect() throws Exception {
- executeSql(
- "INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222,
2222), (3, 22, 333, 3333)");
+ sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222),
(3, 22, 333, 3333)");
String query =
"SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of
T.proctime AS D ON T.i = D.j AND D.k2 > 1111";
- BlockingIterator<Row, Row> iterator =
BlockingIterator.of(env.executeSql(query).collect());
+ BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql(query).collect());
- executeSql("INSERT INTO T VALUES (11), (22), (33)");
+ sql("INSERT INTO T VALUES (11), (22), (33)");
List<Row> result = iterator.collect(4);
assertThat(result)
.containsExactlyInAnyOrder(
Row.of(11, null), Row.of(22, 222), Row.of(22, 333),
Row.of(33, null));
- executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333,
3333)");
+ sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
Thread.sleep(2000); // wait refresh
- executeSql("INSERT INTO T VALUES (11), (22), (33), (44)");
+ sql("INSERT INTO T VALUES (11), (22), (33), (44)");
result = iterator.collect(4);
assertThat(result)
.containsExactlyInAnyOrder(
@@ -424,22 +404,21 @@ public class LookupJoinITCase extends AbstractTestBase {
@Test
public void testNonPkLookupFilterUnSelectAndUpdate() throws Exception {
- executeSql(
- "INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222,
2222), (3, 22, 333, 3333)");
+ sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222),
(3, 22, 333, 3333)");
String query =
"SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of
T.proctime AS D ON T.i = D.j AND D.k2 < 4444";
- BlockingIterator<Row, Row> iterator =
BlockingIterator.of(env.executeSql(query).collect());
+ BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql(query).collect());
- executeSql("INSERT INTO T VALUES (11), (22), (33)");
+ sql("INSERT INTO T VALUES (11), (22), (33)");
List<Row> result = iterator.collect(4);
assertThat(result)
.containsExactlyInAnyOrder(
Row.of(11, 111), Row.of(22, 222), Row.of(22, 333),
Row.of(33, null));
- executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333,
3333)");
+ sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
Thread.sleep(2000); // wait refresh
- executeSql("INSERT INTO T VALUES (11), (22), (33), (44)");
+ sql("INSERT INTO T VALUES (11), (22), (33), (44)");
result = iterator.collect(4);
assertThat(result)
.containsExactlyInAnyOrder(
@@ -450,22 +429,22 @@ public class LookupJoinITCase extends AbstractTestBase {
@Test
public void testRepeatRefresh() throws Exception {
- executeSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222,
2222)");
+ sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
String query =
"SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as
of T.proctime AS D ON T.i = D.i";
- BlockingIterator<Row, Row> iterator =
BlockingIterator.of(env.executeSql(query).collect());
+ BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql(query).collect());
- executeSql("INSERT INTO T VALUES (1), (2), (3)");
+ sql("INSERT INTO T VALUES (1), (2), (3)");
List<Row> result = iterator.collect(3);
assertThat(result)
.containsExactlyInAnyOrder(
Row.of(1, 11, 111), Row.of(2, 22, 222), Row.of(3,
null, null));
- executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444)");
- executeSql("INSERT INTO DIM VALUES (3, 33, 333, 3333)");
+ sql("INSERT INTO DIM VALUES (2, 44, 444, 4444)");
+ sql("INSERT INTO DIM VALUES (3, 33, 333, 3333)");
Thread.sleep(2000); // wait refresh
- executeSql("INSERT INTO T VALUES (1), (2), (3), (4)");
+ sql("INSERT INTO T VALUES (1), (2), (3), (4)");
result = iterator.collect(4);
assertThat(result)
.containsExactlyInAnyOrder(
@@ -479,12 +458,12 @@ public class LookupJoinITCase extends AbstractTestBase {
@Test
public void testLookupPartialUpdateIllegal() throws Exception {
- executeSql(
+ sql(
"CREATE TABLE DIM2 (i INT PRIMARY KEY NOT ENFORCED, j INT, k1
INT, k2 INT) WITH"
+ "
('merge-engine'='partial-update','continuous.discovery-interval'='1 ms')");
String query =
"SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM2 for
system_time as of T.proctime AS D ON T.i = D.i";
- assertThatThrownBy(() -> env.executeSql(query))
+ assertThatThrownBy(() -> sEnv.executeSql(query))
.hasRootCauseMessage(
"Partial update continuous reading is not supported. "
+ "You can use full compaction changelog
producer to support streaming reading.");
@@ -492,28 +471,46 @@ public class LookupJoinITCase extends AbstractTestBase {
@Test
public void testLookupPartialUpdate() throws Exception {
- executeSql(
+ sql(
"CREATE TABLE DIM2 (i INT PRIMARY KEY NOT ENFORCED, j INT, k1
INT, k2 INT) WITH"
+ " ('merge-engine'='partial-update',"
+ " 'changelog-producer'='full-compaction',"
+ " 'changelog-producer.compaction-interval'='1 s',"
+ " 'continuous.discovery-interval'='10 ms')");
- executeSql("INSERT INTO DIM2 VALUES (1, CAST(NULL AS INT), 111,
CAST(NULL AS INT))");
+ sql("INSERT INTO DIM2 VALUES (1, CAST(NULL AS INT), 111, CAST(NULL AS
INT))");
String query =
"SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM2 for
system_time as of T.proctime AS D ON T.i = D.i";
- BlockingIterator<Row, Row> iterator =
BlockingIterator.of(env.executeSql(query).collect());
- executeSql("INSERT INTO T VALUES (1)");
+ BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql(query).collect());
+ sql("INSERT INTO T VALUES (1)");
assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of(1,
null, 111, null));
- executeSql("INSERT INTO DIM2 VALUES (1, 11, CAST(NULL AS INT), 1111)");
+ sql("INSERT INTO DIM2 VALUES (1, 11, CAST(NULL AS INT), 1111)");
Thread.sleep(2000); // wait refresh
- executeSql("INSERT INTO T VALUES (1)");
+ sql("INSERT INTO T VALUES (1)");
assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of(1,
11, 111, 1111));
iterator.close();
}
- private void executeSql(String sql) throws ExecutionException,
InterruptedException {
- env.executeSql(sql).await();
+ @Test
+ public void testRetryLookup() throws Exception {
+ sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
+
+ String query =
+ "SELECT /*+ LOOKUP('table'='DIM',
'retry-predicate'='lookup_miss',"
+ + " 'retry-strategy'='fixed_delay',
'fixed-delay'='1s','max-attempts'='60') */"
+ + " T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM for
system_time as of T.proctime AS D ON T.i = D.i";
+ BlockingIterator<Row, Row> iterator =
BlockingIterator.of(sEnv.executeSql(query).collect());
+
+ sql("INSERT INTO T VALUES (1), (2), (3)");
+ Thread.sleep(2000); // wait
+ sql("INSERT INTO DIM VALUES (3, 33, 333, 3333)");
+ assertThat(iterator.collect(3))
+ .containsExactlyInAnyOrder(
+ Row.of(1, 11, 111, 1111),
+ Row.of(2, 22, 222, 2222),
+ Row.of(3, 33, 333, 3333));
+
+ iterator.close();
}
}