This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new dfc0d558 [FLINK-31331] Flink 1.16 should implement new LookupFunction
dfc0d558 is described below

commit dfc0d558d2c0d5d299e2da2ffa0819d0c4720919
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Mar 6 17:44:51 2023 +0800

    [FLINK-31331] Flink 1.16 should implement new LookupFunction
    
    This closes #583
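    
    Background: Flink 1.16 added the typed LookupFunction /
    LookupFunctionProvider API (which, among other things, enables
    Flink's retryable lookup join), superseding lookup joins built on a
    raw TableFunction whose code-generated entry point is eval(Object...).
    With this change FileStoreLookupFunction no longer extends
    TableFunction: it becomes a plain Serializable, Closeable helper that
    exposes lookup(RowData) returning Collection<RowData>, and each Flink
    version module selects the matching planner-facing wrapper through its
    own LookupRuntimeProviderFactory.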
---
 .../flink-table-store-flink-1.14/pom.xml           |  66 ++++++
 .../lookup/LookupRuntimeProviderFactory.java       |  30 +++
 .../store/connector/lookup/OldLookupFunction.java  |  52 +++++
 .../table/store/connector/CatalogITCaseBase.java   | 150 +++++++++++++
 .../table/store/connector/LookupJoinITCase.java    |  76 +++++++
 .../src/test/resources/log4j2-test.properties      |  28 +++
 .../flink-table-store-flink-1.15/pom.xml           |  87 ++++++++
 .../lookup/LookupRuntimeProviderFactory.java       |  30 +++
 .../store/connector/lookup/OldLookupFunction.java  |  52 +++++
 .../table/store/connector/CatalogITCaseBase.java   | 150 +++++++++++++
 .../table/store/connector/LookupJoinITCase.java    |  76 +++++++
 .../src/test/resources/log4j2-test.properties      |  28 +++
 .../connector/lookup/FileStoreLookupFunction.java  |  34 +--
 .../lookup/LookupRuntimeProviderFactory.java       |  30 +++
 .../store/connector/lookup/NewLookupFunction.java  |  53 +++++
 .../store/connector/source/TableStoreSource.java   |   4 +-
 .../table/store/connector/LookupJoinITCase.java    | 233 ++++++++++-----------
 17 files changed, 1046 insertions(+), 133 deletions(-)
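A rough sketch of the per-version dispatch (mirroring the
LookupRuntimeProviderFactory variants in the diff below; not additional
committed code):

    // 1.14/1.15 modules keep the legacy TableFunction-based provider:
    public static LookupRuntimeProvider create(FileStoreLookupFunction function) {
        return TableFunctionProvider.of(new OldLookupFunction(function));
    }

    // The flink-common module (Flink 1.16) switches to the new typed API:
    public static LookupRuntimeProvider create(FileStoreLookupFunction function) {
        return LookupFunctionProvider.of(new NewLookupFunction(function));
    }

Both wrappers delegate to the same shared helper: OldLookupFunction bridges
eval(Object...) to function.lookup(GenericRowData.of(values)) and collects
each matched row, while NewLookupFunction overrides lookup(RowData) and
forwards the key row directly.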

diff --git a/flink-table-store-flink/flink-table-store-flink-1.14/pom.xml b/flink-table-store-flink/flink-table-store-flink-1.14/pom.xml
index 838feaa5..8579b836 100644
--- a/flink-table-store-flink/flink-table-store-flink-1.14/pom.xml
+++ b/flink-table-store-flink/flink-table-store-flink-1.14/pom.xml
@@ -33,6 +33,7 @@ under the License.
 
     <properties>
         <flink.version>1.14.6</flink.version>
+        <frocksdbjni.version>6.20.3-ververica-2.0</frocksdbjni.version>
     </properties>
 
     <dependencies>
@@ -40,6 +41,12 @@ under the License.
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-store-flink-common</artifactId>
             <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <dependency>
@@ -55,6 +62,65 @@ under the License.
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
+
+        <!-- test dependencies -->
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs-client</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.curator</groupId>
+                    <artifactId>curator-test</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-yarn-common</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.codehaus.jackson</groupId>
+                    <artifactId>jackson-core-asl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>frocksdbjni</artifactId>
+            <version>${frocksdbjni.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/flink-table-store-flink/flink-table-store-flink-1.14/src/main/java/org/apache/flink/table/store/connector/lookup/LookupRuntimeProviderFactory.java
 
b/flink-table-store-flink/flink-table-store-flink-1.14/src/main/java/org/apache/flink/table/store/connector/lookup/LookupRuntimeProviderFactory.java
new file mode 100644
index 00000000..09862e58
--- /dev/null
+++ b/flink-table-store-flink/flink-table-store-flink-1.14/src/main/java/org/apache/flink/table/store/connector/lookup/LookupRuntimeProviderFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.lookup;
+
+import org.apache.flink.table.connector.source.LookupTableSource.LookupRuntimeProvider;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
+
+/** Factory to create {@link LookupRuntimeProvider}. */
+public class LookupRuntimeProviderFactory {
+
+    public static LookupRuntimeProvider create(FileStoreLookupFunction function) {
+        return TableFunctionProvider.of(new OldLookupFunction(function));
+    }
+}
diff --git 
a/flink-table-store-flink/flink-table-store-flink-1.14/src/main/java/org/apache/flink/table/store/connector/lookup/OldLookupFunction.java
 
b/flink-table-store-flink/flink-table-store-flink-1.14/src/main/java/org/apache/flink/table/store/connector/lookup/OldLookupFunction.java
new file mode 100644
index 00000000..b778ba4f
--- /dev/null
+++ b/flink-table-store-flink/flink-table-store-flink-1.14/src/main/java/org/apache/flink/table/store/connector/lookup/OldLookupFunction.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.lookup;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+
+/** Old lookup {@link TableFunction} for 1.15-. */
+public class OldLookupFunction extends TableFunction<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FileStoreLookupFunction function;
+
+    public OldLookupFunction(FileStoreLookupFunction function) {
+        this.function = function;
+    }
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        function.open(context);
+    }
+
+    /** Used by code generation. */
+    @SuppressWarnings("unused")
+    public void eval(Object... values) {
+        function.lookup(GenericRowData.of(values)).forEach(this::collect);
+    }
+
+    @Override
+    public void close() throws Exception {
+        function.close();
+    }
+}
diff --git 
a/flink-table-store-flink/flink-table-store-flink-1.14/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
 
b/flink-table-store-flink/flink-table-store-flink-1.14/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
new file mode 100644
index 00000000..6991bee8
--- /dev/null
+++ b/flink-table-store-flink/flink-table-store-flink-1.14/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.fs.Path;
+import org.apache.flink.table.store.fs.local.LocalFileIO;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+
+import org.junit.Before;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
+
+/** ITCase for catalog. */
+public abstract class CatalogITCaseBase extends AbstractTestBase {
+
+    protected TableEnvironment tEnv;
+    protected TableEnvironment sEnv;
+    protected String path;
+
+    @Before
+    public void before() throws IOException {
+        tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+        String catalog = "TABLE_STORE";
+        path = getTempDirPath("table_store");
+        tEnv.executeSql(
+                String.format(
+                        "CREATE CATALOG %s WITH (" + "'type'='table-store', 
'warehouse'='%s')",
+                        catalog, path));
+        tEnv.useCatalog(catalog);
+
+        sEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
+        sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(100));
+        sEnv.registerCatalog(catalog, tEnv.getCatalog(catalog).get());
+        sEnv.useCatalog(catalog);
+
+        setParallelism(defaultParallelism());
+        prepareEnv();
+    }
+
+    private void prepareEnv() {
+        Parser parser = ((TableEnvironmentImpl) tEnv).getParser();
+        for (String ddl : ddl()) {
+            tEnv.executeSql(ddl);
+            List<Operation> operations = parser.parse(ddl);
+            if (operations.size() == 1) {
+                Operation operation = operations.get(0);
+                if (operation instanceof CreateCatalogOperation) {
+                    String name = ((CreateCatalogOperation) operation).getCatalogName();
+                    sEnv.registerCatalog(name, tEnv.getCatalog(name).orElse(null));
+                }
+            }
+        }
+    }
+
+    protected void setParallelism(int parallelism) {
+        tEnv.getConfig()
+                .getConfiguration()
+                .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, parallelism);
+        sEnv.getConfig()
+                .getConfiguration()
+                .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, parallelism);
+    }
+
+    protected int defaultParallelism() {
+        return 2;
+    }
+
+    protected List<String> ddl() {
+        return Collections.emptyList();
+    }
+
+    protected List<Row> batchSql(String query, Object... args) {
+        return sql(query, args);
+    }
+
+    protected List<Row> sql(String query, Object... args) {
+        try (CloseableIterator<Row> iter = tEnv.executeSql(String.format(query, args)).collect()) {
+            return ImmutableList.copyOf(iter);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    protected CloseableIterator<Row> streamSqlIter(String query, Object... args) {
+        return sEnv.executeSql(String.format(query, args)).collect();
+    }
+
+    protected CatalogTable table(String tableName) throws TableNotExistException {
+        Catalog catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).get();
+        CatalogBaseTable table =
+                catalog.getTable(new ObjectPath(catalog.getDefaultDatabase(), tableName));
+        return (CatalogTable) table;
+    }
+
+    protected Path getTableDirectory(String tableName) {
+        return new Path(
+                new File(path, String.format("%s.db/%s", tEnv.getCurrentDatabase(), tableName))
+                        .toString());
+    }
+
+    @Nullable
+    protected Snapshot findLatestSnapshot(String tableName) {
+        SnapshotManager snapshotManager =
+                new SnapshotManager(LocalFileIO.create(), getTableDirectory(tableName));
+        Long id = snapshotManager.latestSnapshotId();
+        return id == null ? null : snapshotManager.snapshot(id);
+    }
+}
diff --git 
a/flink-table-store-flink/flink-table-store-flink-1.14/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
 
b/flink-table-store-flink/flink-table-store-flink-1.14/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
new file mode 100644
index 00000000..57ad15bb
--- /dev/null
+++ b/flink-table-store-flink/flink-table-store-flink-1.14/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.table.store.file.utils.BlockingIterator;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for lookup join. */
+public class LookupJoinITCase extends CatalogITCaseBase {
+
+    @Override
+    public List<String> ddl() {
+        return Arrays.asList(
+                "CREATE TABLE T (i INT, `proctime` AS PROCTIME())",
+                "CREATE TABLE DIM (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 
INT, k2 INT) WITH"
+                        + " ('continuous.discovery-interval'='1 ms')");
+    }
+
+    @Override
+    protected int defaultParallelism() {
+        return 1;
+    }
+
+    @Test
+    public void testLookup() throws Exception {
+        batchSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 
2222)");
+
+        String query =
+                "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM for 
system_time as of T.proctime AS D ON T.i = D.i";
+        BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());
+
+        batchSql("INSERT INTO T VALUES (1), (2), (3)");
+        List<Row> result = iterator.collect(3);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111, 1111),
+                        Row.of(2, 22, 222, 2222),
+                        Row.of(3, null, null, null));
+
+        batchSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 
3333)");
+        Thread.sleep(2000); // wait refresh
+        batchSql("INSERT INTO T VALUES (1), (2), (3), (4)");
+        result = iterator.collect(4);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111, 1111),
+                        Row.of(2, 44, 444, 4444),
+                        Row.of(3, 33, 333, 3333),
+                        Row.of(4, null, null, null));
+
+        iterator.close();
+    }
+}
diff --git 
a/flink-table-store-flink/flink-table-store-flink-1.14/src/test/resources/log4j2-test.properties
 
b/flink-table-store-flink/flink-table-store-flink-1.14/src/test/resources/log4j2-test.properties
new file mode 100644
index 00000000..1b3980d1
--- /dev/null
+++ b/flink-table-store-flink/flink-table-store-flink-1.14/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
diff --git a/flink-table-store-flink/flink-table-store-flink-1.15/pom.xml b/flink-table-store-flink/flink-table-store-flink-1.15/pom.xml
index 6a5bd6f5..6369c835 100644
--- a/flink-table-store-flink/flink-table-store-flink-1.15/pom.xml
+++ b/flink-table-store-flink/flink-table-store-flink-1.15/pom.xml
@@ -33,6 +33,7 @@ under the License.
 
     <properties>
         <flink.version>1.15.3</flink.version>
+        <frocksdbjni.version>6.20.3-ververica-2.0</frocksdbjni.version>
     </properties>
 
     <dependencies>
@@ -40,6 +41,92 @@ under the License.
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-store-flink-common</artifactId>
             <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- test dependencies -->
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-files</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs-client</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.curator</groupId>
+                    <artifactId>curator-test</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-yarn-common</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.codehaus.jackson</groupId>
+                    <artifactId>jackson-core-asl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>com.ververica</groupId>
+            <artifactId>frocksdbjni</artifactId>
+            <version>${frocksdbjni.version}</version>
+            <scope>test</scope>
         </dependency>
     </dependencies>
 
diff --git 
a/flink-table-store-flink/flink-table-store-flink-1.15/src/main/java/org/apache/flink/table/store/connector/lookup/LookupRuntimeProviderFactory.java
 
b/flink-table-store-flink/flink-table-store-flink-1.15/src/main/java/org/apache/flink/table/store/connector/lookup/LookupRuntimeProviderFactory.java
new file mode 100644
index 00000000..09862e58
--- /dev/null
+++ b/flink-table-store-flink/flink-table-store-flink-1.15/src/main/java/org/apache/flink/table/store/connector/lookup/LookupRuntimeProviderFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.lookup;
+
+import org.apache.flink.table.connector.source.LookupTableSource.LookupRuntimeProvider;
+import org.apache.flink.table.connector.source.TableFunctionProvider;
+
+/** Factory to create {@link LookupRuntimeProvider}. */
+public class LookupRuntimeProviderFactory {
+
+    public static LookupRuntimeProvider create(FileStoreLookupFunction function) {
+        return TableFunctionProvider.of(new OldLookupFunction(function));
+    }
+}
diff --git 
a/flink-table-store-flink/flink-table-store-flink-1.15/src/main/java/org/apache/flink/table/store/connector/lookup/OldLookupFunction.java
 
b/flink-table-store-flink/flink-table-store-flink-1.15/src/main/java/org/apache/flink/table/store/connector/lookup/OldLookupFunction.java
new file mode 100644
index 00000000..b778ba4f
--- /dev/null
+++ b/flink-table-store-flink/flink-table-store-flink-1.15/src/main/java/org/apache/flink/table/store/connector/lookup/OldLookupFunction.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.lookup;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.TableFunction;
+
+/** Old lookup {@link TableFunction} for 1.15-. */
+public class OldLookupFunction extends TableFunction<RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FileStoreLookupFunction function;
+
+    public OldLookupFunction(FileStoreLookupFunction function) {
+        this.function = function;
+    }
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        function.open(context);
+    }
+
+    /** Used by code generation. */
+    @SuppressWarnings("unused")
+    public void eval(Object... values) {
+        function.lookup(GenericRowData.of(values)).forEach(this::collect);
+    }
+
+    @Override
+    public void close() throws Exception {
+        function.close();
+    }
+}
diff --git 
a/flink-table-store-flink/flink-table-store-flink-1.15/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
 
b/flink-table-store-flink/flink-table-store-flink-1.15/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
new file mode 100644
index 00000000..6991bee8
--- /dev/null
+++ b/flink-table-store-flink/flink-table-store-flink-1.15/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
+import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.fs.Path;
+import org.apache.flink.table.store.fs.local.LocalFileIO;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+
+import org.junit.Before;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
+
+/** ITCase for catalog. */
+public abstract class CatalogITCaseBase extends AbstractTestBase {
+
+    protected TableEnvironment tEnv;
+    protected TableEnvironment sEnv;
+    protected String path;
+
+    @Before
+    public void before() throws IOException {
+        tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+        String catalog = "TABLE_STORE";
+        path = getTempDirPath("table_store");
+        tEnv.executeSql(
+                String.format(
+                        "CREATE CATALOG %s WITH (" + "'type'='table-store', 
'warehouse'='%s')",
+                        catalog, path));
+        tEnv.useCatalog(catalog);
+
+        sEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
+        sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(100));
+        sEnv.registerCatalog(catalog, tEnv.getCatalog(catalog).get());
+        sEnv.useCatalog(catalog);
+
+        setParallelism(defaultParallelism());
+        prepareEnv();
+    }
+
+    private void prepareEnv() {
+        Parser parser = ((TableEnvironmentImpl) tEnv).getParser();
+        for (String ddl : ddl()) {
+            tEnv.executeSql(ddl);
+            List<Operation> operations = parser.parse(ddl);
+            if (operations.size() == 1) {
+                Operation operation = operations.get(0);
+                if (operation instanceof CreateCatalogOperation) {
+                    String name = ((CreateCatalogOperation) operation).getCatalogName();
+                    sEnv.registerCatalog(name, tEnv.getCatalog(name).orElse(null));
+                }
+            }
+        }
+    }
+
+    protected void setParallelism(int parallelism) {
+        tEnv.getConfig()
+                .getConfiguration()
+                .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, parallelism);
+        sEnv.getConfig()
+                .getConfiguration()
+                .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, parallelism);
+    }
+
+    protected int defaultParallelism() {
+        return 2;
+    }
+
+    protected List<String> ddl() {
+        return Collections.emptyList();
+    }
+
+    protected List<Row> batchSql(String query, Object... args) {
+        return sql(query, args);
+    }
+
+    protected List<Row> sql(String query, Object... args) {
+        try (CloseableIterator<Row> iter = tEnv.executeSql(String.format(query, args)).collect()) {
+            return ImmutableList.copyOf(iter);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    protected CloseableIterator<Row> streamSqlIter(String query, Object... args) {
+        return sEnv.executeSql(String.format(query, args)).collect();
+    }
+
+    protected CatalogTable table(String tableName) throws TableNotExistException {
+        Catalog catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).get();
+        CatalogBaseTable table =
+                catalog.getTable(new ObjectPath(catalog.getDefaultDatabase(), tableName));
+        return (CatalogTable) table;
+    }
+
+    protected Path getTableDirectory(String tableName) {
+        return new Path(
+                new File(path, String.format("%s.db/%s", tEnv.getCurrentDatabase(), tableName))
+                        .toString());
+    }
+
+    @Nullable
+    protected Snapshot findLatestSnapshot(String tableName) {
+        SnapshotManager snapshotManager =
+                new SnapshotManager(LocalFileIO.create(), getTableDirectory(tableName));
+        Long id = snapshotManager.latestSnapshotId();
+        return id == null ? null : snapshotManager.snapshot(id);
+    }
+}
diff --git 
a/flink-table-store-flink/flink-table-store-flink-1.15/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
 
b/flink-table-store-flink/flink-table-store-flink-1.15/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
new file mode 100644
index 00000000..57ad15bb
--- /dev/null
+++ b/flink-table-store-flink/flink-table-store-flink-1.15/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.table.store.file.utils.BlockingIterator;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for lookup join. */
+public class LookupJoinITCase extends CatalogITCaseBase {
+
+    @Override
+    public List<String> ddl() {
+        return Arrays.asList(
+                "CREATE TABLE T (i INT, `proctime` AS PROCTIME())",
+                "CREATE TABLE DIM (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 
INT, k2 INT) WITH"
+                        + " ('continuous.discovery-interval'='1 ms')");
+    }
+
+    @Override
+    protected int defaultParallelism() {
+        return 1;
+    }
+
+    @Test
+    public void testLookup() throws Exception {
+        batchSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 
2222)");
+
+        String query =
+                "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM for 
system_time as of T.proctime AS D ON T.i = D.i";
+        BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());
+
+        batchSql("INSERT INTO T VALUES (1), (2), (3)");
+        List<Row> result = iterator.collect(3);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111, 1111),
+                        Row.of(2, 22, 222, 2222),
+                        Row.of(3, null, null, null));
+
+        batchSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 
3333)");
+        Thread.sleep(2000); // wait refresh
+        batchSql("INSERT INTO T VALUES (1), (2), (3), (4)");
+        result = iterator.collect(4);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111, 1111),
+                        Row.of(2, 44, 444, 4444),
+                        Row.of(3, 33, 333, 3333),
+                        Row.of(4, null, null, null));
+
+        iterator.close();
+    }
+}
diff --git 
a/flink-table-store-flink/flink-table-store-flink-1.15/src/test/resources/log4j2-test.properties
 
b/flink-table-store-flink/flink-table-store-flink-1.15/src/test/resources/log4j2-test.properties
new file mode 100644
index 00000000..1b3980d1
--- /dev/null
+++ b/flink-table-store-flink/flink-table-store-flink-1.15/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/lookup/FileStoreLookupFunction.java b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/lookup/FileStoreLookupFunction.java
index ac77d254..9aafc3a0 100644
--- a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/lookup/FileStoreLookupFunction.java
+++ b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/lookup/FileStoreLookupFunction.java
@@ -19,7 +19,7 @@
 package org.apache.flink.table.store.connector.lookup;
 
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.functions.FunctionContext;
 import org.apache.flink.table.functions.TableFunction;
 import org.apache.flink.table.store.CoreOptions;
@@ -44,11 +44,15 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.io.Serializable;
 import java.lang.reflect.Field;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
@@ -61,7 +65,9 @@ import static org.apache.flink.table.store.file.predicate.PredicateBuilder.trans
 import static org.apache.flink.table.store.utils.Preconditions.checkArgument;
 
 /** A lookup {@link TableFunction} for file store. */
-public class FileStoreLookupFunction extends TableFunction<org.apache.flink.table.data.RowData> {
+public class FileStoreLookupFunction implements Serializable, Closeable {
+
+    private static final long serialVersionUID = 1L;
 
     private static final Logger LOG = LoggerFactory.getLogger(FileStoreLookupFunction.class);
 
@@ -113,9 +119,7 @@ public class FileStoreLookupFunction extends TableFunction<org.apache.flink.tabl
         this.predicate = predicate;
     }
 
-    @Override
     public void open(FunctionContext context) throws Exception {
-        super.open(context);
         String tmpDirectory = getTmpDirectory(context);
         this.path = new File(tmpDirectory, "lookup-" + UUID.randomUUID());
 
@@ -159,13 +163,17 @@ public class FileStoreLookupFunction extends TableFunction<org.apache.flink.tabl
                 TypeUtils.project(table.schema().logicalRowType(), projection), adjustedPredicate);
     }
 
-    /** Used by code generation. */
-    @SuppressWarnings("unused")
-    public void eval(Object... values) throws Exception {
-        checkRefresh();
-        List<InternalRow> results = lookupTable.get(new FlinkRowWrapper(GenericRowData.of(values)));
-        for (InternalRow matchedRow : results) {
-            collect(new FlinkRowData(matchedRow));
+    public Collection<RowData> lookup(RowData keyRow) {
+        try {
+            checkRefresh();
+            List<InternalRow> results = lookupTable.get(new FlinkRowWrapper(keyRow));
+            List<RowData> rows = new ArrayList<>(results.size());
+            for (InternalRow matchedRow : results) {
+                rows.add(new FlinkRowData(matchedRow));
+            }
+            return rows;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
         }
     }
 
@@ -175,8 +183,8 @@ public class FileStoreLookupFunction extends TableFunction<org.apache.flink.tabl
         }
         if (nextLoadTime > 0) {
             LOG.info(
-                    "Lookup table has refreshed after {} minute(s), 
refreshing",
-                    refreshInterval.toMinutes());
+                    "Lookup table has refreshed after {} second(s), 
refreshing",
+                    refreshInterval.toMillis() / 1000);
         }
 
         refresh();
diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/lookup/LookupRuntimeProviderFactory.java b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/lookup/LookupRuntimeProviderFactory.java
new file mode 100644
index 00000000..ffc9a256
--- /dev/null
+++ b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/lookup/LookupRuntimeProviderFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.lookup;
+
+import org.apache.flink.table.connector.source.LookupTableSource.LookupRuntimeProvider;
+import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
+
+/** Factory to create {@link LookupRuntimeProvider}. */
+public class LookupRuntimeProviderFactory {
+
+    public static LookupRuntimeProvider create(FileStoreLookupFunction function) {
+        return LookupFunctionProvider.of(new NewLookupFunction(function));
+    }
+}
diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/lookup/NewLookupFunction.java b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/lookup/NewLookupFunction.java
new file mode 100644
index 00000000..b5e17a29
--- /dev/null
+++ b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/lookup/NewLookupFunction.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.lookup;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.LookupFunction;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/** New {@link LookupFunction} for 1.16+; it supports Flink retry join. */
+public class NewLookupFunction extends LookupFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private final FileStoreLookupFunction function;
+
+    public NewLookupFunction(FileStoreLookupFunction function) {
+        this.function = function;
+    }
+
+    @Override
+    public void open(FunctionContext context) throws Exception {
+        function.open(context);
+    }
+
+    @Override
+    public Collection<RowData> lookup(RowData keyRow) throws IOException {
+        return function.lookup(keyRow);
+    }
+
+    @Override
+    public void close() throws Exception {
+        function.close();
+    }
+}
diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
index b120cec7..b2b0ae09 100644
--- a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
+++ b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
@@ -23,7 +23,6 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.LookupTableSource;
-import org.apache.flink.table.connector.source.TableFunctionProvider;
 import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DynamicTableFactory;
@@ -33,6 +32,7 @@ import org.apache.flink.table.store.CoreOptions.LogConsistency;
 import org.apache.flink.table.store.connector.FlinkConnectorOptions;
 import org.apache.flink.table.store.connector.TableStoreDataStreamScanProvider;
 import org.apache.flink.table.store.connector.lookup.FileStoreLookupFunction;
+import org.apache.flink.table.store.connector.lookup.LookupRuntimeProviderFactory;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.log.LogSourceProvider;
 import org.apache.flink.table.store.log.LogStoreTableFactory;
@@ -206,7 +206,7 @@ public class TableStoreSource extends FlinkTableSource
                         ? IntStream.range(0, table.schema().fields().size()).toArray()
                         : Projection.of(projectFields).toTopLevelIndexes();
         int[] joinKey = Projection.of(context.getKeys()).toTopLevelIndexes();
-        return TableFunctionProvider.of(
+        return LookupRuntimeProviderFactory.create(
                 new FileStoreLookupFunction(table, projection, joinKey, predicate));
     }
 }
diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
index 44f5e1ee..76023c8f 100644
--- a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
+++ b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
@@ -18,55 +18,40 @@
 
 package org.apache.flink.table.store.connector;
 
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.flink.table.store.connector.util.AbstractTestBase;
 import org.apache.flink.table.store.file.utils.BlockingIterator;
 import org.apache.flink.types.Row;
 
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.time.Duration;
+import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
 
-import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** ITCase for lookup join. */
-public class LookupJoinITCase extends AbstractTestBase {
-
-    private TableEnvironment env;
-
-    @BeforeEach
-    public void before() throws Exception {
-        env = TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
-        env.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, Duration.ofMillis(100));
-        env.getConfig()
-                .getConfiguration()
-                .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
-        String path = getTempDirPath();
-        env.executeSql(
-                String.format(
-                        "CREATE CATALOG my_catalog WITH ('type'='table-store', 
'warehouse'='%s')",
-                        path));
-        executeSql("USE CATALOG my_catalog");
-        executeSql("CREATE TABLE T (i INT, `proctime` AS PROCTIME())");
-        executeSql(
+public class LookupJoinITCase extends CatalogITCaseBase {
+
+    @Override
+    public List<String> ddl() {
+        return Arrays.asList(
+                "CREATE TABLE T (i INT, `proctime` AS PROCTIME())",
                 "CREATE TABLE DIM (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 
INT, k2 INT) WITH"
                         + " ('continuous.discovery-interval'='1 ms')");
     }
 
+    @Override
+    protected int defaultParallelism() {
+        return 1;
+    }
+
     @Test
     public void testLookupEmptyTable() throws Exception {
         String query =
                 "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM for 
system_time as of T.proctime AS D ON T.i = D.i";
-        BlockingIterator<Row, Row> iterator = BlockingIterator.of(env.executeSql(query).collect());
+        BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());
 
-        executeSql("INSERT INTO T VALUES (1), (2), (3)");
+        sql("INSERT INTO T VALUES (1), (2), (3)");
 
         List<Row> result = iterator.collect(3);
         assertThat(result)
@@ -75,9 +60,9 @@ public class LookupJoinITCase extends AbstractTestBase {
                         Row.of(2, null, null, null),
                         Row.of(3, null, null, null));
 
-        executeSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 
2222)");
+        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
         Thread.sleep(2000); // wait refresh
-        executeSql("INSERT INTO T VALUES (1), (2), (4)");
+        sql("INSERT INTO T VALUES (1), (2), (4)");
         result = iterator.collect(3);
         assertThat(result)
                 .containsExactlyInAnyOrder(
@@ -89,13 +74,13 @@ public class LookupJoinITCase extends AbstractTestBase {
 
     @Test
     public void testLookup() throws Exception {
-        executeSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 
2222)");
+        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
 
         String query =
                 "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM for 
system_time as of T.proctime AS D ON T.i = D.i";
-        BlockingIterator<Row, Row> iterator = BlockingIterator.of(env.executeSql(query).collect());
+        BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());
 
-        executeSql("INSERT INTO T VALUES (1), (2), (3)");
+        sql("INSERT INTO T VALUES (1), (2), (3)");
         List<Row> result = iterator.collect(3);
         assertThat(result)
                 .containsExactlyInAnyOrder(
@@ -103,9 +88,9 @@ public class LookupJoinITCase extends AbstractTestBase {
                         Row.of(2, 22, 222, 2222),
                         Row.of(3, null, null, null));
 
-        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 
3333)");
+        sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
         Thread.sleep(2000); // wait refresh
-        executeSql("INSERT INTO T VALUES (1), (2), (3), (4)");
+        sql("INSERT INTO T VALUES (1), (2), (3), (4)");
         result = iterator.collect(4);
         assertThat(result)
                 .containsExactlyInAnyOrder(
@@ -119,13 +104,13 @@ public class LookupJoinITCase extends AbstractTestBase {
 
     @Test
     public void testLookupWithLatest() throws Exception {
-        executeSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 
2222)");
+        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
         String query =
                 "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM /*+ 
OPTIONS('scan.mode'='latest') */"
                         + " for system_time as of T.proctime AS D ON T.i = 
D.i";
-        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(env.executeSql(query).collect());
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(sEnv.executeSql(query).collect());
 
-        executeSql("INSERT INTO T VALUES (1), (2), (3)");
+        sql("INSERT INTO T VALUES (1), (2), (3)");
         List<Row> result = iterator.collect(3);
         assertThat(result)
                 .containsExactlyInAnyOrder(
@@ -133,9 +118,9 @@ public class LookupJoinITCase extends AbstractTestBase {
                         Row.of(2, 22, 222, 2222),
                         Row.of(3, null, null, null));
 
-        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 
3333)");
+        sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
         Thread.sleep(2000); // wait refresh
-        executeSql("INSERT INTO T VALUES (1), (2), (3), (4)");
+        sql("INSERT INTO T VALUES (1), (2), (3), (4)");
         result = iterator.collect(4);
         assertThat(result)
                 .containsExactlyInAnyOrder(
@@ -149,21 +134,21 @@ public class LookupJoinITCase extends AbstractTestBase {
 
     @Test
     public void testLookupProjection() throws Exception {
-        executeSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 
2222)");
+        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
 
         String query =
                 "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as 
of T.proctime AS D ON T.i = D.i";
-        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(env.executeSql(query).collect());
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(sEnv.executeSql(query).collect());
 
-        executeSql("INSERT INTO T VALUES (1), (2), (3)");
+        sql("INSERT INTO T VALUES (1), (2), (3)");
         List<Row> result = iterator.collect(3);
         assertThat(result)
                 .containsExactlyInAnyOrder(
                         Row.of(1, 11, 111), Row.of(2, 22, 222), Row.of(3, null, null));
 
-        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 
3333)");
+        sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
         Thread.sleep(2000); // wait refresh
-        executeSql("INSERT INTO T VALUES (1), (2), (3), (4)");
+        sql("INSERT INTO T VALUES (1), (2), (3), (4)");
         result = iterator.collect(4);
         assertThat(result)
                 .containsExactlyInAnyOrder(
@@ -177,21 +162,21 @@ public class LookupJoinITCase extends AbstractTestBase {
 
     @Test
     public void testLookupFilterPk() throws Exception {
-        executeSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 
2222)");
+        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
 
         String query =
                 "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as 
of T.proctime AS D ON T.i = D.i AND D.i > 2";
-        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(env.executeSql(query).collect());
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(sEnv.executeSql(query).collect());
 
-        executeSql("INSERT INTO T VALUES (1), (2), (3)");
+        sql("INSERT INTO T VALUES (1), (2), (3)");
         List<Row> result = iterator.collect(3);
         assertThat(result)
                 .containsExactlyInAnyOrder(
                         Row.of(1, null, null), Row.of(2, null, null), Row.of(3, null, null));
 
-        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 
3333)");
+        sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
         Thread.sleep(2000); // wait refresh
-        executeSql("INSERT INTO T VALUES (1), (2), (3), (4)");
+        sql("INSERT INTO T VALUES (1), (2), (3), (4)");
         result = iterator.collect(4);
         assertThat(result)
                 .containsExactlyInAnyOrder(
@@ -205,21 +190,21 @@ public class LookupJoinITCase extends AbstractTestBase {
 
     @Test
     public void testLookupFilterSelect() throws Exception {
-        executeSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 
2222)");
+        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
 
         String query =
                 "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as 
of T.proctime AS D ON T.i = D.i AND D.k1 > 111";
-        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(env.executeSql(query).collect());
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(sEnv.executeSql(query).collect());
 
-        executeSql("INSERT INTO T VALUES (1), (2), (3)");
+        sql("INSERT INTO T VALUES (1), (2), (3)");
         List<Row> result = iterator.collect(3);
         assertThat(result)
                 .containsExactlyInAnyOrder(
                         Row.of(1, null, null), Row.of(2, 22, 222), Row.of(3, null, null));
 
-        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 
3333)");
+        sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
         Thread.sleep(2000); // wait refresh
-        executeSql("INSERT INTO T VALUES (1), (2), (3), (4)");
+        sql("INSERT INTO T VALUES (1), (2), (3), (4)");
         result = iterator.collect(4);
         assertThat(result)
                 .containsExactlyInAnyOrder(
@@ -233,21 +218,21 @@ public class LookupJoinITCase extends AbstractTestBase {
 
     @Test
     public void testLookupFilterUnSelect() throws Exception {
-        executeSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 
2222)");
+        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
 
         String query =
                 "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as 
of T.proctime AS D ON T.i = D.i AND D.k2 > 1111";
-        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(env.executeSql(query).collect());
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(sEnv.executeSql(query).collect());
 
-        executeSql("INSERT INTO T VALUES (1), (2), (3)");
+        sql("INSERT INTO T VALUES (1), (2), (3)");
         List<Row> result = iterator.collect(3);
         assertThat(result)
                 .containsExactlyInAnyOrder(
                         Row.of(1, null, null), Row.of(2, 22, 222), Row.of(3, null, null));
 
-        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 
3333)");
+        sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
         Thread.sleep(2000); // wait refresh
-        executeSql("INSERT INTO T VALUES (1), (2), (3), (4)");
+        sql("INSERT INTO T VALUES (1), (2), (3), (4)");
         result = iterator.collect(4);
         assertThat(result)
                 .containsExactlyInAnyOrder(
@@ -261,21 +246,21 @@ public class LookupJoinITCase extends AbstractTestBase {
 
     @Test
     public void testLookupFilterUnSelectAndUpdate() throws Exception {
-        executeSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 
2222)");
+        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
 
         String query =
                 "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as 
of T.proctime AS D ON T.i = D.i AND D.k2 < 4444";
-        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(env.executeSql(query).collect());
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(sEnv.executeSql(query).collect());
 
-        executeSql("INSERT INTO T VALUES (1), (2), (3)");
+        sql("INSERT INTO T VALUES (1), (2), (3)");
         List<Row> result = iterator.collect(3);
         assertThat(result)
                 .containsExactlyInAnyOrder(
                         Row.of(1, 11, 111), Row.of(2, 22, 222), Row.of(3, null, null));
 
-        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 
3333)");
+        sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
         Thread.sleep(2000); // wait refresh
-        executeSql("INSERT INTO T VALUES (1), (2), (3), (4)");
+        sql("INSERT INTO T VALUES (1), (2), (3), (4)");
         result = iterator.collect(4);
         assertThat(result)
                 .containsExactlyInAnyOrder(
@@ -289,14 +274,13 @@ public class LookupJoinITCase extends AbstractTestBase {
 
     @Test
     public void testNonPkLookup() throws Exception {
-        executeSql(
-                "INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 
2222), (3, 22, 333, 3333)");
+        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222), 
(3, 22, 333, 3333)");
 
         String query =
                 "SELECT D.i, T.i, D.k1, D.k2 FROM T LEFT JOIN DIM for 
system_time as of T.proctime AS D ON T.i = D.j";
-        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(env.executeSql(query).collect());
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(sEnv.executeSql(query).collect());
 
-        executeSql("INSERT INTO T VALUES (11), (22), (33)");
+        sql("INSERT INTO T VALUES (11), (22), (33)");
         List<Row> result = iterator.collect(4);
         assertThat(result)
                 .containsExactlyInAnyOrder(
@@ -305,9 +289,9 @@ public class LookupJoinITCase extends AbstractTestBase {
                         Row.of(3, 22, 333, 3333),
                         Row.of(null, 33, null, null));
 
-        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 
3333)");
+        sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
         Thread.sleep(2000); // wait refresh
-        executeSql("INSERT INTO T VALUES (11), (22), (33), (44)");
+        sql("INSERT INTO T VALUES (11), (22), (33), (44)");
         result = iterator.collect(4);
         assertThat(result)
                 .containsExactlyInAnyOrder(
@@ -321,22 +305,21 @@ public class LookupJoinITCase extends AbstractTestBase {
 
     @Test
     public void testNonPkLookupProjection() throws Exception {
-        executeSql(
-                "INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 
2222), (3, 22, 333, 3333)");
+        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222), 
(3, 22, 333, 3333)");
 
         String query =
                 "SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of 
T.proctime AS D ON T.i = D.j";
-        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(env.executeSql(query).collect());
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(sEnv.executeSql(query).collect());
 
-        executeSql("INSERT INTO T VALUES (11), (22), (33)");
+        sql("INSERT INTO T VALUES (11), (22), (33)");
         List<Row> result = iterator.collect(4);
         assertThat(result)
                 .containsExactlyInAnyOrder(
                         Row.of(11, 111), Row.of(22, 222), Row.of(22, 333), Row.of(33, null));
 
-        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 
3333)");
+        sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
         Thread.sleep(2000); // wait refresh
-        executeSql("INSERT INTO T VALUES (11), (22), (33), (44)");
+        sql("INSERT INTO T VALUES (11), (22), (33), (44)");
         result = iterator.collect(4);
         assertThat(result)
                 .containsExactlyInAnyOrder(
@@ -347,21 +330,20 @@ public class LookupJoinITCase extends AbstractTestBase {
 
     @Test
     public void testNonPkLookupFilterPk() throws Exception {
-        executeSql(
-                "INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 
2222), (3, 22, 333, 3333)");
+        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222), 
(3, 22, 333, 3333)");
 
         String query =
                 "SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of 
T.proctime AS D ON T.i = D.j AND D.i > 2";
-        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(env.executeSql(query).collect());
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(sEnv.executeSql(query).collect());
 
-        executeSql("INSERT INTO T VALUES (11), (22), (33)");
+        sql("INSERT INTO T VALUES (11), (22), (33)");
         List<Row> result = iterator.collect(3);
         assertThat(result)
                 .containsExactlyInAnyOrder(Row.of(11, null), Row.of(22, 333), Row.of(33, null));
 
-        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 
3333)");
+        sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
         Thread.sleep(2000); // wait refresh
-        executeSql("INSERT INTO T VALUES (11), (22), (33), (44)");
+        sql("INSERT INTO T VALUES (11), (22), (33), (44)");
         result = iterator.collect(4);
         assertThat(result)
                 .containsExactlyInAnyOrder(
@@ -372,22 +354,21 @@ public class LookupJoinITCase extends AbstractTestBase {
 
     @Test
     public void testNonPkLookupFilterSelect() throws Exception {
-        executeSql(
-                "INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 
2222), (3, 22, 333, 3333)");
+        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222), 
(3, 22, 333, 3333)");
 
         String query =
                 "SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of 
T.proctime AS D ON T.i = D.j AND D.k1 > 111";
-        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(env.executeSql(query).collect());
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(sEnv.executeSql(query).collect());
 
-        executeSql("INSERT INTO T VALUES (11), (22), (33)");
+        sql("INSERT INTO T VALUES (11), (22), (33)");
         List<Row> result = iterator.collect(4);
         assertThat(result)
                 .containsExactlyInAnyOrder(
                         Row.of(11, null), Row.of(22, 222), Row.of(22, 333), Row.of(33, null));
 
-        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 
3333)");
+        sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
         Thread.sleep(2000); // wait refresh
-        executeSql("INSERT INTO T VALUES (11), (22), (33), (44)");
+        sql("INSERT INTO T VALUES (11), (22), (33), (44)");
         result = iterator.collect(4);
         assertThat(result)
                 .containsExactlyInAnyOrder(
@@ -398,22 +379,21 @@ public class LookupJoinITCase extends AbstractTestBase {
 
     @Test
     public void testNonPkLookupFilterUnSelect() throws Exception {
-        executeSql(
-                "INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 
2222), (3, 22, 333, 3333)");
+        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222), 
(3, 22, 333, 3333)");
 
         String query =
                 "SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of 
T.proctime AS D ON T.i = D.j AND D.k2 > 1111";
-        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(env.executeSql(query).collect());
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(sEnv.executeSql(query).collect());
 
-        executeSql("INSERT INTO T VALUES (11), (22), (33)");
+        sql("INSERT INTO T VALUES (11), (22), (33)");
         List<Row> result = iterator.collect(4);
         assertThat(result)
                 .containsExactlyInAnyOrder(
                         Row.of(11, null), Row.of(22, 222), Row.of(22, 333), Row.of(33, null));
 
-        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 
3333)");
+        sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
         Thread.sleep(2000); // wait refresh
-        executeSql("INSERT INTO T VALUES (11), (22), (33), (44)");
+        sql("INSERT INTO T VALUES (11), (22), (33), (44)");
         result = iterator.collect(4);
         assertThat(result)
                 .containsExactlyInAnyOrder(
@@ -424,22 +404,21 @@ public class LookupJoinITCase extends AbstractTestBase {
 
     @Test
     public void testNonPkLookupFilterUnSelectAndUpdate() throws Exception {
-        executeSql(
-                "INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 
2222), (3, 22, 333, 3333)");
+        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222), 
(3, 22, 333, 3333)");
 
         String query =
                 "SELECT T.i, D.k1 FROM T LEFT JOIN DIM for system_time as of 
T.proctime AS D ON T.i = D.j AND D.k2 < 4444";
-        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(env.executeSql(query).collect());
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(sEnv.executeSql(query).collect());
 
-        executeSql("INSERT INTO T VALUES (11), (22), (33)");
+        sql("INSERT INTO T VALUES (11), (22), (33)");
         List<Row> result = iterator.collect(4);
         assertThat(result)
                 .containsExactlyInAnyOrder(
                         Row.of(11, 111), Row.of(22, 222), Row.of(22, 333), Row.of(33, null));
 
-        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 
3333)");
+        sql("INSERT INTO DIM VALUES (2, 44, 444, 4444), (3, 33, 333, 3333)");
         Thread.sleep(2000); // wait refresh
-        executeSql("INSERT INTO T VALUES (11), (22), (33), (44)");
+        sql("INSERT INTO T VALUES (11), (22), (33), (44)");
         result = iterator.collect(4);
         assertThat(result)
                 .containsExactlyInAnyOrder(
@@ -450,22 +429,22 @@ public class LookupJoinITCase extends AbstractTestBase {
 
     @Test
     public void testRepeatRefresh() throws Exception {
-        executeSql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 
2222)");
+        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
 
         String query =
                 "SELECT T.i, D.j, D.k1 FROM T LEFT JOIN DIM for system_time as 
of T.proctime AS D ON T.i = D.i";
-        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(env.executeSql(query).collect());
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(sEnv.executeSql(query).collect());
 
-        executeSql("INSERT INTO T VALUES (1), (2), (3)");
+        sql("INSERT INTO T VALUES (1), (2), (3)");
         List<Row> result = iterator.collect(3);
         assertThat(result)
                 .containsExactlyInAnyOrder(
                         Row.of(1, 11, 111), Row.of(2, 22, 222), Row.of(3, null, null));
 
-        executeSql("INSERT INTO DIM VALUES (2, 44, 444, 4444)");
-        executeSql("INSERT INTO DIM VALUES (3, 33, 333, 3333)");
+        sql("INSERT INTO DIM VALUES (2, 44, 444, 4444)");
+        sql("INSERT INTO DIM VALUES (3, 33, 333, 3333)");
         Thread.sleep(2000); // wait refresh
-        executeSql("INSERT INTO T VALUES (1), (2), (3), (4)");
+        sql("INSERT INTO T VALUES (1), (2), (3), (4)");
         result = iterator.collect(4);
         assertThat(result)
                 .containsExactlyInAnyOrder(
@@ -479,12 +458,12 @@ public class LookupJoinITCase extends AbstractTestBase {
 
     @Test
     public void testLookupPartialUpdateIllegal() throws Exception {
-        executeSql(
+        sql(
                 "CREATE TABLE DIM2 (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 
INT, k2 INT) WITH"
                         + " 
('merge-engine'='partial-update','continuous.discovery-interval'='1 ms')");
         String query =
                 "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM2 for 
system_time as of T.proctime AS D ON T.i = D.i";
-        assertThatThrownBy(() -> env.executeSql(query))
+        assertThatThrownBy(() -> sEnv.executeSql(query))
                 .hasRootCauseMessage(
                         "Partial update continuous reading is not supported. "
                                 + "You can use full compaction changelog 
producer to support streaming reading.");
@@ -492,28 +471,46 @@ public class LookupJoinITCase extends AbstractTestBase {
 
     @Test
     public void testLookupPartialUpdate() throws Exception {
-        executeSql(
+        sql(
                 "CREATE TABLE DIM2 (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 
INT, k2 INT) WITH"
                         + " ('merge-engine'='partial-update',"
                         + " 'changelog-producer'='full-compaction',"
                         + " 'changelog-producer.compaction-interval'='1 s',"
                         + " 'continuous.discovery-interval'='10 ms')");
-        executeSql("INSERT INTO DIM2 VALUES (1, CAST(NULL AS INT), 111, 
CAST(NULL AS INT))");
+        sql("INSERT INTO DIM2 VALUES (1, CAST(NULL AS INT), 111, CAST(NULL AS 
INT))");
         String query =
                 "SELECT T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM2 for 
system_time as of T.proctime AS D ON T.i = D.i";
-        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(env.executeSql(query).collect());
-        executeSql("INSERT INTO T VALUES (1)");
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(sEnv.executeSql(query).collect());
+        sql("INSERT INTO T VALUES (1)");
         assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of(1, null, 111, null));
 
-        executeSql("INSERT INTO DIM2 VALUES (1, 11, CAST(NULL AS INT), 1111)");
+        sql("INSERT INTO DIM2 VALUES (1, 11, CAST(NULL AS INT), 1111)");
         Thread.sleep(2000); // wait refresh
-        executeSql("INSERT INTO T VALUES (1)");
+        sql("INSERT INTO T VALUES (1)");
         assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of(1, 11, 111, 1111));
 
         iterator.close();
     }
 
-    private void executeSql(String sql) throws ExecutionException, InterruptedException {
-        env.executeSql(sql).await();
+    @Test
+    public void testRetryLookup() throws Exception {
+        sql("INSERT INTO DIM VALUES (1, 11, 111, 1111), (2, 22, 222, 2222)");
+
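+        // Flink 1.16 retryable lookup join (FLIP-234): on a lookup miss
+        // ('retry-predicate'='lookup_miss'), retry with a fixed 1s delay up
+        // to 60 times instead of emitting the null-padded row immediately.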
+        String query =
+                "SELECT /*+ LOOKUP('table'='DIM', 
'retry-predicate'='lookup_miss',"
+                        + " 'retry-strategy'='fixed_delay', 
'fixed-delay'='1s','max-attempts'='60') */"
+                        + " T.i, D.j, D.k1, D.k2 FROM T LEFT JOIN DIM for 
system_time as of T.proctime AS D ON T.i = D.i";
+        BlockingIterator<Row, Row> iterator = 
BlockingIterator.of(sEnv.executeSql(query).collect());
+
+        sql("INSERT INTO T VALUES (1), (2), (3)");
+        Thread.sleep(2000); // wait so the lookup for key 3 misses first and exercises the retry path
+        sql("INSERT INTO DIM VALUES (3, 33, 333, 3333)");
+        assertThat(iterator.collect(3))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111, 1111),
+                        Row.of(2, 22, 222, 2222),
+                        Row.of(3, 33, 333, 3333));
+
+        iterator.close();
     }
 }
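
For reference, the sql(...) calls above replace the removed private
executeSql(...) helper and come from CatalogITCaseBase, whose body is not
shown here. A minimal sketch of what it presumably does, reusing the sEnv
environment the tests already reference (an assumption, not the actual
base-class source):

    // Hypothetical sketch: format the statement, run it on the shared
    // environment, and block until the job finishes, mirroring the deleted
    // env.executeSql(sql).await() helper above.
    protected void sql(String statement, Object... args) throws Exception {
        sEnv.executeSql(String.format(statement, args)).await();
    }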
