This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git

commit 26655e5e8864511d20208ebf6d8a2d0314a4c4b2
Author: Joao Boto <[email protected]>
AuthorDate: Fri May 24 17:22:07 2024 +0200

    [FLINK-35370] Add temp module to test backward compatibility
---
 .github/workflows/backwards_compatibility.yml      | 103 ++++
 .../pom.xml                                        |  86 +++
 .../backward/compatibility/DataStreamSinkTest.java | 125 +++++
 .../compatibility/DataStreamSourceTest.java        | 199 +++++++
 .../compatibility/DynamicTableSinkTest.java        | 327 +++++++++++
 .../compatibility/DynamicTableSourceTest.java      | 605 +++++++++++++++++++++
 .../src/test/resources/log4j2-test.properties      |  28 +
 7 files changed, 1473 insertions(+)

diff --git a/.github/workflows/backwards_compatibility.yml 
b/.github/workflows/backwards_compatibility.yml
new file mode 100644
index 00000000..139b3d9e
--- /dev/null
+++ b/.github/workflows/backwards_compatibility.yml
@@ -0,0 +1,103 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# We need to specify repo related information here since Apache INFRA doesn't differentiate
+# between several workflows with the same names while preparing a report for GHA usage
+# https://infra-reports.apache.org/#ghactions
+name: Flink Connector JDBC CI Backwards Compatibility
+on: [push, pull_request]
+concurrency:
+  group: ${{ github.workflow }}-${{ github.ref }}
+  cancel-in-progress: true
+jobs:
+  backwards_compatibility_test:
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        flink: [1.18-SNAPSHOT, 1.19-SNAPSHOT]
+        jdk: [8, 11, 17]
+
+    env:
+      MVN_CONNECTION_OPTIONS: -Dhttp.keepAlive=false 
-Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120
+      FLINK_CACHE_DIR: "/tmp/cache/flink"
+      MVN_BUILD_OUTPUT_FILE: "/tmp/mvn_build_output.out"
+      MVN_VALIDATION_DIR: "/tmp/flink-validation-deployment"
+
+    steps:
+      - name: Check out repository code
+        uses: actions/checkout@v4
+
+      - name: Set JDK
+        uses: actions/setup-java@v4
+        with:
+          java-version: ${{ matrix.jdk }}
+          distribution: 'temurin'
+          cache: 'maven'
+
+      - name: Set Maven 3.8.6
+        uses: stCarolas/setup-maven@v5
+        with:
+          maven-version: 3.8.6
+
+      - name: "Determine Flink binary url"
+        # Picks the download location for the Flink distribution matching the matrix
+        # entry and exports both `binary_url` and `cache_binary` to GITHUB_ENV so that
+        # subsequent steps can read them through the `env` context.
+        run: |
+          binary_url=""
+          if [[ "${{ matrix.flink }}" = *-SNAPSHOT ]]; then
+            # Snapshot versions are fetched from the nightly bucket and flagged as non-cacheable.
+            binary_url=https://s3.amazonaws.com/flink-nightly/flink-${{ matrix.flink }}-bin-scala_2.12.tgz
+            cache_binary=false
+          else
+            # Any other version is fetched from the Apache archive and flagged as cacheable.
+            binary_url=https://archive.apache.org/dist/flink/flink-${{ matrix.flink }}/flink-${{ matrix.flink }}-bin-scala_2.12.tgz
+            cache_binary=true
+          fi
+          echo "binary_url=$binary_url" >> ${GITHUB_ENV}
+          # Without this export `env.cache_binary` is empty in the steps that follow.
+          echo "cache_binary=$cache_binary" >> ${GITHUB_ENV}
+
+      - name: "Print Flink binary url / caching"
+        run: echo "${{ env.binary_url }} / caching=${{ env.cache_binary }}"
+
+      - name: Create cache dirs
+        run: mkdir -p ${{ env.FLINK_CACHE_DIR }}
+
+      - name: Download Flink binary
+        working-directory: ${{ env.FLINK_CACHE_DIR }}
+        run: wget -q -c ${{ env.binary_url }} -O - | tar -xz
+
+      - name: Install JDBC Libs
+        run: |
+          set -o pipefail
+
+          mvn clean install -U -B --no-transfer-progress -Dflink.version=${{ 
matrix.flink }} \
+            -DskipTests \
+            -DaltDeploymentRepository=validation_repository::default::file:${{ 
env.MVN_VALIDATION_DIR }} \
+            -Dscala-2.12 \
+            -DdistDir=${{ env.FLINK_CACHE_DIR }}/flink-${{ matrix.flink }} \
+            ${{ env.MVN_CONNECTION_OPTIONS }} \
+            -Dlog4j.configurationFile=file://$(pwd)/tools/ci/log4j.properties \
+            | tee ${{ env.MVN_BUILD_OUTPUT_FILE }}
+
+      - name: Test backwards compatibility
+        # Runs the backward-compatibility test module against the connector artifacts
+        # installed by the previous step, using the Flink version of the matrix entry.
+        working-directory: ./flink-connector-jdbc-backward-compatibility
+        run: |
+          set -o pipefail
+
+          # This step runs inside the module directory, so the log4j configuration is
+          # addressed through the workspace root instead of $(pwd); it is the same file
+          # the install step references from the repository root.
+          mvn clean test -U -B --no-transfer-progress -Dflink.version=${{ matrix.flink }} \
+            -DaltDeploymentRepository=validation_repository::default::file:${{ env.MVN_VALIDATION_DIR }} \
+            -Dscala-2.12 \
+            -DdistDir=${{ env.FLINK_CACHE_DIR }}/flink-${{ matrix.flink }} \
+            ${{ env.MVN_CONNECTION_OPTIONS }} \
+            -Dlog4j.configurationFile=file://${{ github.workspace }}/tools/ci/log4j.properties \
+            | tee ${{ env.MVN_BUILD_OUTPUT_FILE }}
\ No newline at end of file
diff --git a/flink-connector-jdbc-backward-compatibility/pom.xml 
b/flink-connector-jdbc-backward-compatibility/pom.xml
new file mode 100644
index 00000000..ac8c9874
--- /dev/null
+++ b/flink-connector-jdbc-backward-compatibility/pom.xml
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-connector-jdbc-parent</artifactId>
+        <version>3.2-SNAPSHOT</version>
+    </parent>
+
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-connector-jdbc-backward-compatibility</artifactId>
+    <version>3.2-SNAPSHOT</version>
+    <name>Flink : Connectors : JDBC : Backward Compatibility</name>
+    <packaging>jar</packaging>
+
+    <properties>
+        <surefire.module.config>
+            --add-opens=java.base/java.util=ALL-UNNAMED
+            --add-opens=java.base/java.lang=ALL-UNNAMED
+        </surefire.module.config>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_2.12</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_2.12</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-jdbc</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-jdbc</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>42.7.3</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git 
a/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DataStreamSinkTest.java
 
b/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DataStreamSinkTest.java
new file mode 100644
index 00000000..30f97812
--- /dev/null
+++ 
b/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DataStreamSinkTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.backward.compatibility;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
+import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.JdbcSink;
+import org.apache.flink.connector.jdbc.databases.postgres.PostgresTestBase;
+import org.apache.flink.connector.jdbc.testutils.TableManaged;
+import org.apache.flink.connector.jdbc.testutils.tables.templates.BooksTable;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for backward compatibility. */
+public class DataStreamSinkTest implements PostgresTestBase {
+
+    private static final BooksTable TEST_TABLE = new BooksTable("SinkTable");
+
+    private static final List<BooksTable.BookEntry> BOOKS =
+            Arrays.stream(TEST_DATA)
+                    .map(
+                            book ->
+                                    new BooksTable.BookEntry(
+                                            book.id, book.title, book.author, 
book.price, book.qty))
+                    .collect(Collectors.toList());
+
+    @Override
+    public List<TableManaged> getManagedTables() {
+        return Collections.singletonList(TEST_TABLE);
+    }
+
+    @RegisterExtension
+    static final MiniClusterExtension MINI_CLUSTER =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(2)
+                            .setNumberSlotsPerTaskManager(2)
+                            .build());
+
+    @Test
+    public void testAtLeastOnce() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(new 
RestartStrategies.NoRestartStrategyConfiguration());
+        env.setParallelism(1);
+
+        assertResult(new ArrayList<>());
+
+        env.fromCollection(BOOKS)
+                .sinkTo(
+                        JdbcSink.<BooksTable.BookEntry>builder()
+                                .withQueryStatement(
+                                        TEST_TABLE.getInsertIntoQuery(),
+                                        TEST_TABLE.getStatementBuilder())
+                                .buildAtLeastOnce(
+                                        new 
JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
+                                                
.withUrl(getMetadata().getJdbcUrl())
+                                                
.withUsername(getMetadata().getUsername())
+                                                
.withPassword(getMetadata().getPassword())
+                                                
.withDriverName(getMetadata().getDriverClass())
+                                                .build()));
+        env.execute();
+
+        assertResult(BOOKS);
+    }
+
+    @Test
+    public void testExactlyOnce() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(new 
RestartStrategies.NoRestartStrategyConfiguration());
+        env.setParallelism(1);
+
+        assertResult(new ArrayList<>());
+
+        env.fromCollection(BOOKS)
+                .sinkTo(
+                        JdbcSink.<BooksTable.BookEntry>builder()
+                                .withQueryStatement(
+                                        TEST_TABLE.getInsertIntoQuery(),
+                                        TEST_TABLE.getStatementBuilder())
+                                .withExecutionOptions(
+                                        
JdbcExecutionOptions.builder().withMaxRetries(0).build())
+                                .buildExactlyOnce(
+                                        JdbcExactlyOnceOptions.defaults(),
+                                        getMetadata().getXaSourceSupplier()));
+        env.execute();
+
+        assertResult(BOOKS);
+    }
+
+    private void assertResult(List<BooksTable.BookEntry> expected) throws 
SQLException {
+        
assertThat(TEST_TABLE.selectAllTable(getMetadata().getConnection())).isEqualTo(expected);
+    }
+}
diff --git 
a/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DataStreamSourceTest.java
 
b/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DataStreamSourceTest.java
new file mode 100644
index 00000000..3ac60f25
--- /dev/null
+++ 
b/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DataStreamSourceTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.backward.compatibility;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.jdbc.JdbcTestFixture;
+import org.apache.flink.connector.jdbc.databases.postgres.PostgresTestBase;
+import org.apache.flink.connector.jdbc.source.JdbcSource;
+import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor;
+import 
org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider;
+import org.apache.flink.connector.jdbc.testutils.TableManaged;
+import org.apache.flink.connector.jdbc.testutils.tables.templates.BooksTable;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for backward compatibility. */
+public class DataStreamSourceTest implements PostgresTestBase {
+    public static Queue<JdbcTestFixture.TestEntry> collectedRecords;
+    private static final BooksTable TEST_TABLE = new BooksTable("SourceTable");
+
+    protected final ResultExtractor<JdbcTestFixture.TestEntry> extractor =
+            resultSet ->
+                    new JdbcTestFixture.TestEntry(
+                            resultSet.getInt("id"),
+                            resultSet.getString("title"),
+                            resultSet.getString("author"),
+                            // Avoid the 'null -> 0.0d' bug on calling 
'getDouble'
+                            (Double) resultSet.getObject("price"),
+                            resultSet.getInt("qty"));
+
+    @Override
+    public List<TableManaged> getManagedTables() {
+        return Collections.singletonList(TEST_TABLE);
+    }
+
+    @RegisterExtension
+    static final MiniClusterExtension MINI_CLUSTER =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(2)
+                            .setNumberSlotsPerTaskManager(2)
+                            .build());
+
+    @BeforeEach
+    void init() throws SQLException {
+        collectedRecords = new ConcurrentLinkedDeque<>();
+        try (Connection conn = getMetadata().getConnection();
+                PreparedStatement ps = 
conn.prepareStatement(TEST_TABLE.getInsertIntoQuery())) {
+            for (int i = 0; i < TEST_DATA.length; i++) {
+                ps.setInt(1, TEST_DATA[i].id);
+                ps.setString(2, TEST_DATA[i].title);
+                ps.setString(3, TEST_DATA[i].author);
+                if (TEST_DATA[i].price == null) {
+                    ps.setNull(4, Types.DOUBLE);
+                } else {
+                    ps.setDouble(4, TEST_DATA[i].price);
+                }
+                ps.setInt(5, TEST_DATA[i].qty);
+                ps.execute();
+            }
+        }
+    }
+
+    @Test
+    void testReadWithoutParallelismWithoutParamsProvider() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(new 
RestartStrategies.NoRestartStrategyConfiguration());
+        env.setParallelism(1);
+        JdbcSource<JdbcTestFixture.TestEntry> jdbcSource =
+                JdbcSource.<JdbcTestFixture.TestEntry>builder()
+                        
.setTypeInformation(TypeInformation.of(JdbcTestFixture.TestEntry.class))
+                        .setSql(TEST_TABLE.getSelectAllQuery())
+                        .setDBUrl(getMetadata().getJdbcUrl())
+                        .setUsername(getMetadata().getUsername())
+                        .setPassword(getMetadata().getPassword())
+                        .setDriverName(getMetadata().getDriverClass())
+                        .setResultExtractor(extractor)
+                        .build();
+        env.fromSource(jdbcSource, WatermarkStrategy.noWatermarks(), 
"TestSource")
+                .addSink(new TestingSinkFunction());
+        env.execute();
+        assertThat(collectedRecords).containsExactlyInAnyOrder(TEST_DATA);
+    }
+
+    @Test
+    void testReadWithoutParallelismWithParamsProvider() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(new 
RestartStrategies.NoRestartStrategyConfiguration());
+        env.setParallelism(1);
+        JdbcSource<JdbcTestFixture.TestEntry> jdbcSource =
+                JdbcSource.<JdbcTestFixture.TestEntry>builder()
+                        
.setTypeInformation(TypeInformation.of(JdbcTestFixture.TestEntry.class))
+                        .setSql(TEST_TABLE.getSelectByIdBetweenQuery())
+                        .setJdbcParameterValuesProvider(
+                                new JdbcGenericParameterValuesProvider(
+                                        new Serializable[][] {{1001, 1005}, 
{1006, 1010}}))
+                        .setDBUrl(getMetadata().getJdbcUrl())
+                        .setUsername(getMetadata().getUsername())
+                        .setPassword(getMetadata().getPassword())
+                        .setDriverName(getMetadata().getDriverClass())
+                        .setResultExtractor(extractor)
+                        .build();
+        env.fromSource(jdbcSource, WatermarkStrategy.noWatermarks(), 
"TestSource")
+                .addSink(new TestingSinkFunction());
+        env.execute();
+        assertThat(collectedRecords).containsExactlyInAnyOrder(TEST_DATA);
+    }
+
+    @Test
+    void testReadWithParallelismWithoutParamsProvider() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(new 
RestartStrategies.NoRestartStrategyConfiguration());
+        env.setParallelism(2);
+        JdbcSource<JdbcTestFixture.TestEntry> jdbcSource =
+                JdbcSource.<JdbcTestFixture.TestEntry>builder()
+                        
.setTypeInformation(TypeInformation.of(JdbcTestFixture.TestEntry.class))
+                        .setSql(TEST_TABLE.getSelectAllQuery())
+                        .setDBUrl(getMetadata().getJdbcUrl())
+                        .setUsername(getMetadata().getUsername())
+                        .setPassword(getMetadata().getPassword())
+                        .setDriverName(getMetadata().getDriverClass())
+                        .setResultExtractor(extractor)
+                        .build();
+        env.fromSource(jdbcSource, WatermarkStrategy.noWatermarks(), 
"TestSource")
+                .addSink(new TestingSinkFunction());
+        env.execute();
+        assertThat(collectedRecords).containsExactlyInAnyOrder(TEST_DATA);
+    }
+
+    @Test
+    void testReadWithParallelismWithParamsProvider() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRestartStrategy(new 
RestartStrategies.NoRestartStrategyConfiguration());
+        env.setParallelism(2);
+        JdbcSource<JdbcTestFixture.TestEntry> jdbcSource =
+                JdbcSource.<JdbcTestFixture.TestEntry>builder()
+                        
.setTypeInformation(TypeInformation.of(JdbcTestFixture.TestEntry.class))
+                        .setSql(TEST_TABLE.getSelectByIdBetweenQuery())
+                        .setJdbcParameterValuesProvider(
+                                new JdbcGenericParameterValuesProvider(
+                                        new Serializable[][] {{1001, 1005}, 
{1006, 1010}}))
+                        .setDBUrl(getMetadata().getJdbcUrl())
+                        .setUsername(getMetadata().getUsername())
+                        .setPassword(getMetadata().getPassword())
+                        .setDriverName(getMetadata().getDriverClass())
+                        .setResultExtractor(extractor)
+                        .build();
+        env.fromSource(jdbcSource, WatermarkStrategy.noWatermarks(), 
"TestSource")
+                .addSink(new TestingSinkFunction());
+        env.execute();
+        assertThat(collectedRecords).containsExactlyInAnyOrder(TEST_DATA);
+    }
+
+    /** A sink function to collect the records. */
+    static class TestingSinkFunction implements 
SinkFunction<JdbcTestFixture.TestEntry> {
+
+        @Override
+        public void invoke(JdbcTestFixture.TestEntry value, Context context) 
throws Exception {
+            collectedRecords.add(value);
+        }
+    }
+}
diff --git 
a/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DynamicTableSinkTest.java
 
b/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DynamicTableSinkTest.java
new file mode 100644
index 00000000..c926e6bf
--- /dev/null
+++ 
b/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DynamicTableSinkTest.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.backward.compatibility;
+
+import org.apache.flink.connector.jdbc.databases.postgres.PostgresTestBase;
+import org.apache.flink.connector.jdbc.testutils.TableManaged;
+import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.runtime.utils.TestData;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType;
+import static 
org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field;
+import static 
org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.pkField;
+import static 
org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.tableRow;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for backward compatibility. */
+public class DynamicTableSinkTest implements PostgresTestBase {
+
+    @RegisterExtension
+    static final MiniClusterExtension MINI_CLUSTER =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(2)
+                            .setNumberSlotsPerTaskManager(2)
+                            .build());
+
+    protected final TableRow upsertOutputTable = createUpsertOutputTable();
+    protected final TableRow appendOutputTable = createAppendOutputTable();
+    protected final TableRow batchOutputTable = createBatchOutputTable();
+    protected final TableRow userOutputTable = createUserOutputTable();
+
+    protected TableRow createUpsertOutputTable() {
+        return tableRow(
+                "dynamicSinkForUpsert",
+                pkField("cnt", DataTypes.BIGINT().notNull()),
+                field("lencnt", DataTypes.BIGINT().notNull()),
+                pkField("cTag", DataTypes.INT().notNull()),
+                field("ts", dbType("TIMESTAMP"), DataTypes.TIMESTAMP()));
+    }
+
+    protected TableRow createAppendOutputTable() {
+        return tableRow(
+                "dynamicSinkForAppend",
+                field("id", DataTypes.INT().notNull()),
+                field("num", DataTypes.BIGINT().notNull()),
+                field("ts", dbType("TIMESTAMP"), DataTypes.TIMESTAMP()));
+    }
+
+    protected TableRow createBatchOutputTable() {
+        return tableRow(
+                "dynamicSinkForBatch",
+                field("NAME", DataTypes.VARCHAR(20).notNull()),
+                field("SCORE", DataTypes.BIGINT().notNull()));
+    }
+
+    protected TableRow createUserOutputTable() {
+        return tableRow(
+                "USER_TABLE",
+                pkField("user_id", DataTypes.VARCHAR(20).notNull()),
+                field("user_name", DataTypes.VARCHAR(20).notNull()),
+                field("email", DataTypes.VARCHAR(255)),
+                field("balance", DataTypes.DECIMAL(18, 2)),
+                field("balance2", DataTypes.DECIMAL(18, 2)));
+    }
+
+    @Override
+    public List<TableManaged> getManagedTables() {
+        return Arrays.asList(
+                upsertOutputTable, appendOutputTable, batchOutputTable, 
userOutputTable);
+    }
+
+    @AfterEach
+    void afterEach() {
+        TestValuesTableFactory.clearAllData();
+    }
+
+    protected List<Row> testUserData() {
+        return Arrays.asList(
+                Row.of(
+                        "user1",
+                        "Tom",
+                        "[email protected]",
+                        new BigDecimal("8.10"),
+                        new BigDecimal("16.20")),
+                Row.of(
+                        "user3",
+                        "Bailey",
+                        "[email protected]",
+                        new BigDecimal("9.99"),
+                        new BigDecimal("19.98")),
+                Row.of(
+                        "user4",
+                        "Tina",
+                        "[email protected]",
+                        new BigDecimal("11.30"),
+                        new BigDecimal("22.60")));
+    }
+
+    protected List<Row> testData() {
+        return Arrays.asList(
+                Row.of(1, 1L, "Hi", Timestamp.valueOf("1970-01-01 
00:00:00.001")),
+                Row.of(2, 2L, "Hello", Timestamp.valueOf("1970-01-01 
00:00:00.002")),
+                Row.of(3, 2L, "Hello world", Timestamp.valueOf("1970-01-01 
00:00:00.003")),
+                Row.of(
+                        4,
+                        3L,
+                        "Hello world, how are you?",
+                        Timestamp.valueOf("1970-01-01 00:00:00.004")),
+                Row.of(5, 3L, "I am fine.", Timestamp.valueOf("1970-01-01 
00:00:00.005")),
+                Row.of(6, 3L, "Luke Skywalker", Timestamp.valueOf("1970-01-01 
00:00:00.006")),
+                Row.of(7, 4L, "Comment#1", Timestamp.valueOf("1970-01-01 
00:00:00.007")),
+                Row.of(8, 4L, "Comment#2", Timestamp.valueOf("1970-01-01 
00:00:00.008")),
+                Row.of(9, 4L, "Comment#3", Timestamp.valueOf("1970-01-01 
00:00:00.009")),
+                Row.of(10, 4L, "Comment#4", Timestamp.valueOf("1970-01-01 
00:00:00.010")),
+                Row.of(11, 5L, "Comment#5", Timestamp.valueOf("1970-01-01 
00:00:00.011")),
+                Row.of(12, 5L, "Comment#6", Timestamp.valueOf("1970-01-01 
00:00:00.012")),
+                Row.of(13, 5L, "Comment#7", Timestamp.valueOf("1970-01-01 
00:00:00.013")),
+                Row.of(14, 5L, "Comment#8", Timestamp.valueOf("1970-01-01 
00:00:00.014")),
+                Row.of(15, 5L, "Comment#9", Timestamp.valueOf("1970-01-01 
00:00:00.015")),
+                Row.of(16, 6L, "Comment#10", Timestamp.valueOf("1970-01-01 
00:00:00.016")),
+                Row.of(17, 6L, "Comment#11", Timestamp.valueOf("1970-01-01 
00:00:00.017")),
+                Row.of(18, 6L, "Comment#12", Timestamp.valueOf("1970-01-01 
00:00:00.018")),
+                Row.of(19, 6L, "Comment#13", Timestamp.valueOf("1970-01-01 
00:00:00.019")),
+                Row.of(20, 6L, "Comment#14", Timestamp.valueOf("1970-01-01 
00:00:00.020")),
+                Row.of(21, 6L, "Comment#15", Timestamp.valueOf("1970-01-01 
00:00:00.021")));
+    }
+
+    /** Indexes the rows of {@link #testData()} by their first field, the integer id. */
+    protected Map<Integer, Row> testDataMap() {
+        Function<Row, Integer> idOf = row -> row.getFieldAs(0);
+        List<Row> rows = testData();
+        return rows.stream().collect(Collectors.toMap(idOf, Function.identity()));
+    }
+
+    /**
+     * Registers the rows of {@link #testData()} as a temporary view with the columns {@code id},
+     * {@code num}, {@code text} and {@code ts}.
+     *
+     * @param tEnv table environment in which the view is created
+     * @param viewName name under which the view is registered
+     */
+    private void createTestDataTempView(StreamTableEnvironment tEnv, String viewName) {
+        Table rawValues = tEnv.fromValues(testData());
+        Table namedColumns = rawValues.as("id", "num", "text", "ts");
+        tEnv.createTemporaryView(viewName, namedColumns);
+    }
+
+    @Test
+    protected void testUpsert() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().enableObjectReuse();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        String viewName = "testData";
+        createTestDataTempView(tEnv, viewName);
+
+        String tableName = "upsertSink";
+        tEnv.executeSql(
+                upsertOutputTable.getCreateQueryForFlink(
+                        getMetadata(),
+                        tableName,
+                        Arrays.asList(
+                                "'sink.buffer-flush.max-rows' = '2'",
+                                "'sink.buffer-flush.interval' = '0'",
+                                "'sink.max-retries' = '0'")));
+
+        tEnv.executeSql(
+                        String.format(
+                                "INSERT INTO %s "
+                                        + " SELECT cnt, COUNT(len) AS lencnt, 
cTag, MAX(ts) AS ts "
+                                        + " FROM ( "
+                                        + "  SELECT len, COUNT(id) as cnt, 
cTag, MAX(ts) AS ts "
+                                        + "  FROM (SELECT id, 
CHAR_LENGTH(text) AS len, (CASE WHEN id > 0 THEN 1 ELSE 0 END) cTag, ts FROM 
%s) "
+                                        + "  GROUP BY len, cTag "
+                                        + " ) "
+                                        + " GROUP BY cnt, cTag",
+                                tableName, viewName))
+                .await();
+
+        Map<Integer, Row> mapTestData = testDataMap();
+        assertThat(upsertOutputTable.selectAllTable(getMetadata()))
+                .containsExactlyInAnyOrder(
+                        Row.of(1L, 5L, 1, mapTestData.get(6).getField(3)),
+                        Row.of(7L, 1L, 1, mapTestData.get(21).getField(3)),
+                        Row.of(9L, 1L, 1, mapTestData.get(15).getField(3)));
+    }
+
+    /**
+     * Inserts a filtered subset of the test data (ids 2, 10 and 20) into the append-only sink
+     * table and verifies that exactly those rows, projected to (id, num, ts), were written.
+     */
+    @Test
+    void testAppend() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().enableObjectReuse();
+        env.getConfig().setParallelism(1);
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        String viewName = "testData";
+        createTestDataTempView(tEnv, viewName);
+
+        String tableName = "appendSink";
+        tEnv.executeSql(appendOutputTable.getCreateQueryForFlink(getMetadata(), tableName));
+
+        Set<Integer> searchIds = new HashSet<>(Arrays.asList(2, 10, 20));
+        tEnv.executeSql(
+                        String.format(
+                                "INSERT INTO %s SELECT id, num, ts FROM %s WHERE id IN (%s)",
+                                tableName,
+                                viewName,
+                                searchIds.stream()
+                                        .map(Object::toString)
+                                        .collect(Collectors.joining(","))))
+                .await();
+
+        List<Row> tableRows = appendOutputTable.selectAllTable(getMetadata());
+        // hasSize reports the actual rows on failure, and the expected count follows searchIds
+        // instead of a hard-coded number.
+        assertThat(tableRows).hasSize(searchIds.size());
+
+        Map<Integer, Row> mapTestData = testDataMap();
+        assertThat(tableRows)
+                .containsExactlyInAnyOrderElementsOf(
+                        searchIds.stream()
+                                .map(mapTestData::get)
+                                // the sink table has no "text" column, so field 2 is dropped
+                                .map(d -> Row.of(d.getField(0), d.getField(1), d.getField(3)))
+                                .collect(Collectors.toList()));
+    }
+
+    @Test
+    void testBatchSink() throws Exception {
+        TableEnvironment tEnv = 
TableEnvironment.create(EnvironmentSettings.inBatchMode());
+
+        String tableName = "batchSink";
+        tEnv.executeSql(
+                batchOutputTable.getCreateQueryForFlink(
+                        getMetadata(),
+                        tableName,
+                        Arrays.asList(
+                                "'sink.buffer-flush.max-rows' = '2'",
+                                "'sink.buffer-flush.interval' = '300ms'",
+                                "'sink.max-retries' = '4'")));
+
+        TableResult tableResult =
+                tEnv.executeSql(
+                        String.format(
+                                "INSERT INTO %s "
+                                        + " SELECT user_name, score "
+                                        + " FROM (VALUES (1, 'Bob'), (22, 
'Tom'), (42, 'Kim'), (42, 'Kim'), (1, 'Bob')) "
+                                        + " AS UserCountTable(score, 
user_name) ",
+                                tableName));
+        tableResult.await();
+
+        assertThat(batchOutputTable.selectAllTable(getMetadata()))
+                .containsExactlyInAnyOrder(
+                        Row.of("Bob", 1L),
+                        Row.of("Tom", 22L),
+                        Row.of("Kim", 42L),
+                        Row.of("Kim", 42L),
+                        Row.of("Bob", 1L));
+    }
+
+    @Test
+    protected void testReadingFromChangelogSource() throws Exception {
+        TableEnvironment tEnv = 
TableEnvironment.create(EnvironmentSettings.newInstance().build());
+        String dataId = 
TestValuesTableFactory.registerData(TestData.userChangelog());
+
+        String userTableLogs = "user_logs";
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE %s ( "
+                                + "  user_id STRING, "
+                                + "  user_name STRING, "
+                                + "  email STRING, "
+                                + "  balance DECIMAL(18,2), "
+                                + "  balance2 AS balance * 2 "
+                                + ") WITH ( "
+                                + " 'connector' = 'values', "
+                                + " 'data-id' = '%s', "
+                                + " 'changelog-mode' = 'I,UA,UB,D' "
+                                + ")",
+                        userTableLogs, dataId));
+
+        String userTableSink = "user_sink";
+        tEnv.executeSql(
+                userOutputTable.getCreateQueryForFlink(
+                        getMetadata(),
+                        userTableSink,
+                        Arrays.asList(
+                                "'sink.buffer-flush.max-rows' = '2'",
+                                "'sink.buffer-flush.interval' = '0'")));
+
+        tEnv.executeSql(
+                        String.format(
+                                "INSERT INTO %s SELECT * FROM %s", 
userTableSink, userTableLogs))
+                .await();
+
+        assertThat(userOutputTable.selectAllTable(getMetadata()))
+                .containsExactlyInAnyOrderElementsOf(testUserData());
+    }
+}
diff --git a/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DynamicTableSourceTest.java b/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DynamicTableSourceTest.java
new file mode 100644
index 00000000..abf92aba
--- /dev/null
+++ b/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DynamicTableSourceTest.java
@@ -0,0 +1,605 @@
+package org.apache.flink.connector.jdbc.backward.compatibility;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.jdbc.databases.postgres.PostgresTestBase;
+import org.apache.flink.connector.jdbc.testutils.TableManaged;
+import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.runtime.utils.StreamTestSink;
+import 
org.apache.flink.table.runtime.functions.table.lookup.LookupCacheManager;
+import org.apache.flink.table.test.lookup.cache.LookupCacheAssert;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.temporal.ChronoUnit;
+import java.time.temporal.TemporalUnit;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType;
+import static 
org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field;
+import static 
org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.tableRow;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for backward compatibility. */
+public class DynamicTableSourceTest implements PostgresTestBase {
+
+    @RegisterExtension
+    static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setConfiguration(new Configuration())
+                            .build());
+
+    public static final String CREATE_TABLE_WITH_NAME_STATEMENT =
+            "CREATE TABLE value_source ( "
+                    + " `id` BIGINT, "
+                    + " `name` STRING, "
+                    + " `proctime` AS PROCTIME()"
+                    + ") WITH ("
+                    + " 'connector' = 'values', "
+                    + " 'data-id' = '%s'"
+                    + ")";
+
+    public static final String CREATE_TABLE_WITH_NAME_AND_NICKNAME_STATEMENT =
+            "CREATE TABLE value_source ( "
+                    + " `id` BIGINT, "
+                    + " `name` STRING, "
+                    + " `nickname` STRING, "
+                    + " `proctime` AS PROCTIME()"
+                    + ") WITH ("
+                    + " 'connector' = 'values', "
+                    + " 'data-id' = '%s'"
+                    + ")";
+
+    private final TableRow inputTable = createInputTable();
+
+    public static StreamExecutionEnvironment env;
+    public static TableEnvironment tEnv;
+
+    protected TableRow createInputTable() {
+        return tableRow(
+                "jdbDynamicTableSource",
+                field("id", DataTypes.BIGINT().notNull()),
+                field("decimal_col", DataTypes.DECIMAL(10, 4)),
+                field("timestamp6_col", DataTypes.TIMESTAMP(6)),
+                // other fields
+                field("real_col", dbType("REAL"), DataTypes.FLOAT()),
+                field("double_col", dbType("DOUBLE PRECISION"), 
DataTypes.DOUBLE()),
+                field("time_col", dbType("TIME"), DataTypes.TIME()));
+    }
+
+    @Override
+    public List<TableManaged> getManagedTables() {
+        return Collections.singletonList(inputTable);
+    }
+
+    protected List<Row> getTestData() {
+        return Arrays.asList(
+                Row.of(
+                        1L,
+                        BigDecimal.valueOf(100.1234),
+                        LocalDateTime.parse("2020-01-01T15:35:00.123456"),
+                        1.175E-37F,
+                        1.79769E308D,
+                        LocalTime.parse("15:35")),
+                Row.of(
+                        2L,
+                        BigDecimal.valueOf(101.1234),
+                        LocalDateTime.parse("2020-01-01T15:36:01.123456"),
+                        -1.175E-37F,
+                        -1.79769E308,
+                        LocalTime.parse("15:36:01")));
+    }
+
+    @BeforeEach
+    void beforeEach() throws SQLException {
+        try (Connection conn = getMetadata().getConnection()) {
+            inputTable.insertIntoTableValues(conn, getTestData());
+        }
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        tEnv = StreamTableEnvironment.create(env);
+    }
+
+    /** Clears the static test helpers so state from one test cannot leak into the next. */
+    @AfterEach
+    void afterEach() {
+        StreamTestSink.clear();
+        // The lookup-join tests register rows through TestValuesTableFactory.registerData;
+        // clear them as DynamicTableSinkTest#afterEach does.
+        TestValuesTableFactory.clearAllData();
+    }
+
+    /** Scans the whole JDBC table and expects every row inserted in {@code beforeEach} back. */
+    @Test
+    void testJdbcSource() {
+        String flinkTable = "testTable";
+        String createTableDdl = inputTable.getCreateQueryForFlink(getMetadata(), flinkTable);
+        tEnv.executeSql(createTableDdl);
+
+        List<Row> actualRows = executeQuery("SELECT * FROM " + flinkTable);
+        List<Row> expectedRows = getTestData();
+
+        assertThat(actualRows).containsExactlyInAnyOrderElementsOf(expectedRows);
+    }
+
+    @Test
+    void testProject() {
+        String testTable = "testTable";
+        tEnv.executeSql(
+                inputTable.getCreateQueryForFlink(
+                        getMetadata(),
+                        testTable,
+                        Arrays.asList(
+                                "'scan.partition.column'='id'",
+                                "'scan.partition.num'='2'",
+                                "'scan.partition.lower-bound'='0'",
+                                "'scan.partition.upper-bound'='100'")));
+
+        String fields = String.join(",", 
Arrays.copyOfRange(inputTable.getTableFields(), 0, 3));
+        List<Row> collected = executeQuery(String.format("SELECT %s FROM %s", 
fields, testTable));
+
+        List<Row> expected =
+                getTestData().stream()
+                        .map(row -> Row.of(row.getField(0), row.getField(1), 
row.getField(2)))
+                        .collect(Collectors.toList());
+
+        assertThat(collected).containsExactlyInAnyOrderElementsOf(expected);
+    }
+
+    @Test
+    public void testLimit() {
+        String testTable = "testTable";
+        tEnv.executeSql(
+                inputTable.getCreateQueryForFlink(
+                        getMetadata(),
+                        testTable,
+                        Arrays.asList(
+                                "'scan.partition.column'='id'",
+                                "'scan.partition.num'='2'",
+                                "'scan.partition.lower-bound'='1'",
+                                "'scan.partition.upper-bound'='2'")));
+
+        List<Row> collected = executeQuery("SELECT * FROM " + testTable + " 
LIMIT 1");
+
+        assertThat(collected).hasSize(1);
+        assertThat(getTestData())
+                .as("The actual output is not a subset of the expected set.")
+                .containsAll(collected);
+    }
+
+    @Test
+    public void testFilter() {
+        String testTable = "testTable";
+        tEnv.executeSql(inputTable.getCreateQueryForFlink(getMetadata(), 
testTable));
+
+        // create a partitioned table to ensure no regression
+        String partitionedTable = "PARTITIONED_TABLE";
+        tEnv.executeSql(
+                inputTable.getCreateQueryForFlink(
+                        getMetadata(),
+                        partitionedTable,
+                        Arrays.asList(
+                                "'scan.partition.column'='id'",
+                                "'scan.partition.num'='1'",
+                                "'scan.partition.lower-bound'='1'",
+                                "'scan.partition.upper-bound'='1'")));
+
+        // we create a VIEW here to test column remapping, i.e. would filter push-down still work
+        // if we create a view that depends on our source table
+        tEnv.executeSql(
+                String.format(
+                        "CREATE VIEW FAKE_TABLE (idx, %s) as (SELECT * from %s 
)",
+                        Arrays.stream(inputTable.getTableFields())
+                                .filter(f -> !f.equals("id"))
+                                .collect(Collectors.joining(",")),
+                        testTable));
+
+        Row onlyRow1 =
+                getTestData().stream()
+                        .filter(row -> row.getFieldAs(0).equals(1L))
+                        .findAny()
+                        .orElseThrow(NullPointerException::new);
+
+        Row onlyRow2 =
+                getTestData().stream()
+                        .filter(row -> row.getFieldAs(0).equals(2L))
+                        .findAny()
+                        .orElseThrow(NullPointerException::new);
+
+        List<Row> twoRows = getTestData();
+
+        // test simple filter
+        assertThat(executeQuery("SELECT * FROM FAKE_TABLE WHERE idx = 1"))
+                .containsExactly(onlyRow1);
+
+        // test TIMESTAMP filter
+        assertThat(
+                        executeQuery(
+                                "SELECT * FROM FAKE_TABLE "
+                                        + "WHERE timestamp6_col > TIMESTAMP 
'2020-01-01 15:35:00'"
+                                        + "  AND timestamp6_col < TIMESTAMP 
'2020-01-01 15:35:01'"))
+                .containsExactly(onlyRow1);
+
+        // test the IN operator
+        assertThat(
+                        executeQuery(
+                                "SELECT * FROM FAKE_TABLE WHERE 1 = idx AND 
decimal_col IN (100.1234, 101.1234)"))
+                .containsExactly(onlyRow1);
+
+        // test mixing AND and OR operator
+        assertThat(
+                        executeQuery(
+                                "SELECT * FROM FAKE_TABLE WHERE idx = 1 AND 
decimal_col = 100.1234 OR decimal_col = 101.1234"))
+                .containsExactlyInAnyOrderElementsOf(twoRows);
+
+        // test mixing AND/OR with parentheses, and swapping the operands of the equality
+        // expression
+        assertThat(
+                        executeQuery(
+                                "SELECT * FROM FAKE_TABLE WHERE (2 = idx AND 
decimal_col = 100.1234) OR decimal_col = 101.1234"))
+                .containsExactly(onlyRow2);
+
+        // test greater-than, just to make sure we didn't break anything that we cannot push down
+        assertThat(
+                        executeQuery(
+                                "SELECT * FROM FAKE_TABLE WHERE idx = 2 AND 
decimal_col > 100 OR decimal_col = 101.123"))
+                .containsExactly(onlyRow2);
+
+        // One more test of parenthesis
+        assertThat(
+                        executeQuery(
+                                "SELECT * FROM FAKE_TABLE WHERE 2 = idx AND 
(decimal_col = 100.1234 OR decimal_col = 102.1234)"))
+                .isEmpty();
+
+        assertThat(
+                        executeQuery(
+                                "SELECT * FROM "
+                                        + partitionedTable
+                                        + " WHERE id = 2 AND decimal_col > 100 
OR decimal_col = 101.123"))
+                .isEmpty();
+
+        assertThat(
+                        executeQuery(
+                                "SELECT * FROM "
+                                        + partitionedTable
+                                        + " WHERE 1 = id AND decimal_col IN 
(100.1234, 101.1234)"))
+                .containsExactly(onlyRow1);
+    }
+
+    @ParameterizedTest
+    @EnumSource(Caching.class)
+    void testLookupJoin(Caching caching) {
+
+        String selectStatement =
+                "SELECT S.id, S.name, D.id, D.timestamp6_col, D.decimal_col 
FROM value_source"
+                        + " AS S JOIN jdbc_lookup for system_time as of 
S.proctime AS D ON S.id = D.id";
+        List<Row> expectedResultSetRows =
+                Arrays.asList(
+                        Row.of(
+                                1L,
+                                "Alice",
+                                1L,
+                                
truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")),
+                                BigDecimal.valueOf(100.1234)),
+                        Row.of(
+                                1L,
+                                "Alice",
+                                1L,
+                                
truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")),
+                                BigDecimal.valueOf(100.1234)),
+                        Row.of(
+                                2L,
+                                "Bob",
+                                2L,
+                                
truncateTime(LocalDateTime.parse("2020-01-01T15:36:01.123456")),
+                                BigDecimal.valueOf(101.1234)));
+
+        RowData key1 = GenericRowData.of(1L);
+        RowData value1 =
+                GenericRowData.of(
+                        1L,
+                        
DecimalData.fromBigDecimal(BigDecimal.valueOf(100.1234), 10, 4),
+                        TimestampData.fromLocalDateTime(
+                                
truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456"))));
+
+        RowData key2 = GenericRowData.of(2L);
+        RowData value2 =
+                GenericRowData.of(
+                        2L,
+                        
DecimalData.fromBigDecimal(BigDecimal.valueOf(101.1234), 10, 4),
+                        TimestampData.fromLocalDateTime(
+                                
truncateTime(LocalDateTime.parse("2020-01-01T15:36:01.123456"))));
+
+        RowData key3 = GenericRowData.of(3L);
+
+        Map<RowData, Collection<RowData>> expectedCachedEntries = new 
HashMap<>();
+        expectedCachedEntries.put(key1, Collections.singletonList(value1));
+        expectedCachedEntries.put(key2, Collections.singletonList(value2));
+        expectedCachedEntries.put(key3, Collections.emptyList());
+
+        lookupTableTest(
+                caching,
+                sampleTableData(),
+                CREATE_TABLE_WITH_NAME_STATEMENT,
+                selectStatement,
+                expectedResultSetRows,
+                expectedCachedEntries);
+    }
+
+    private void lookupTableTest(
+            Caching caching,
+            Collection<Row> dataToRegister,
+            String createTableStatement,
+            String selectStatement,
+            List<Row> expectedResultSetRows,
+            Map<RowData, Collection<RowData>> expectedCachedEntries) {
+        // Create JDBC lookup table
+        List<String> cachingOptions = Collections.emptyList();
+        if (caching == Caching.ENABLE_CACHE) {
+            cachingOptions =
+                    Arrays.asList(
+                            "'lookup.cache.max-rows' = '100'", 
"'lookup.cache.ttl' = '10min'");
+        }
+        tEnv.executeSql(
+                inputTable.getCreateQueryForFlink(getMetadata(), 
"jdbc_lookup", cachingOptions));
+
+        // Create and prepare a value source
+        String dataId = TestValuesTableFactory.registerData(dataToRegister);
+        tEnv.executeSql(String.format(createTableStatement, dataId));
+
+        if (caching == Caching.ENABLE_CACHE) {
+            LookupCacheManager.keepCacheOnRelease(true);
+        }
+
+        // Execute lookup join
+        try {
+            List<Row> collected = executeQuery(selectStatement);
+            int expectedSize = expectedResultSetRows.size();
+
+            // check we got the expected number of rows
+            assertThat(collected)
+                    .as("Actual output is not size " + expectedSize)
+                    .hasSize(expectedSize)
+                    .as("The actual output is not a subset of the expected 
set")
+                    .containsAll(expectedResultSetRows);
+
+            if (caching == Caching.ENABLE_CACHE) {
+                validateCachedValues(expectedCachedEntries);
+            }
+        } finally {
+            if (caching == Caching.ENABLE_CACHE) {
+                LookupCacheManager.getInstance().checkAllReleased();
+                LookupCacheManager.getInstance().clear();
+                LookupCacheManager.keepCacheOnRelease(false);
+            }
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(Caching.class)
+    void testLookupJoinWithFilter(Caching caching) {
+        List<Row> expectedResultSetRows =
+                Arrays.asList(
+                        Row.of(
+                                2L,
+                                "Bob",
+                                2L,
+                                
truncateTime(LocalDateTime.parse("2020-01-01T15:36:01.123456")),
+                                BigDecimal.valueOf(101.1234)));
+
+        RowData key2 = GenericRowData.of(2L);
+        RowData value2 =
+                GenericRowData.of(
+                        2L,
+                        
DecimalData.fromBigDecimal(BigDecimal.valueOf(101.1234), 10, 4),
+                        TimestampData.fromLocalDateTime(
+                                
truncateTime(LocalDateTime.parse("2020-01-01T15:36:01.123456"))));
+
+        Map<RowData, Collection<RowData>> expectedCachedEntries = new 
HashMap<>();
+        expectedCachedEntries.put(key2, Collections.singletonList(value2));
+
+        lookupTableTest(
+                caching,
+                sampleTableData(),
+                CREATE_TABLE_WITH_NAME_STATEMENT,
+                "SELECT S.id, S.name, D.id, D.timestamp6_col, D.decimal_col 
FROM value_source"
+                        + " AS S JOIN jdbc_lookup for system_time as of 
S.proctime AS D ON "
+                        + "S.id = D.id AND S.name = \'Bob\'",
+                expectedResultSetRows,
+                expectedCachedEntries);
+    }
+
+    private static List<Row> sampleTableData() {
+        return Arrays.asList(
+                Row.of(1L, "Alice"), Row.of(1L, "Alice"), Row.of(2L, "Bob"), 
Row.of(3L, "Charlie"));
+    }
+
+    private static List<Row> sampleTableDataWithNickNames() {
+        return Arrays.asList(
+                Row.of(1L, "Alice", "ABC"),
+                Row.of(1L, "Alice", "ADD"),
+                Row.of(2L, "Bob", "BGH"),
+                Row.of(3L, "Charlie", "CHJ"));
+    }
+
+    @ParameterizedTest
+    @EnumSource(Caching.class)
+    void testLookupJoinWithMultipleFilters(Caching caching) {
+
+        List<Row> expectedResultSetRows =
+                Arrays.asList(
+                        Row.of(
+                                1L,
+                                "Alice",
+                                "ADD",
+                                1L,
+                                
truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")),
+                                BigDecimal.valueOf(100.1234)));
+
+        RowData key1 = GenericRowData.of(1L);
+        RowData value1 =
+                GenericRowData.of(
+                        1L,
+                        
DecimalData.fromBigDecimal(BigDecimal.valueOf(100.1234), 10, 4),
+                        TimestampData.fromLocalDateTime(
+                                
truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456"))));
+
+        Map<RowData, Collection<RowData>> expectedCachedEntries = new 
HashMap<>();
+        expectedCachedEntries.put(key1, Collections.singletonList(value1));
+
+        lookupTableTest(
+                caching,
+                sampleTableDataWithNickNames(),
+                CREATE_TABLE_WITH_NAME_AND_NICKNAME_STATEMENT,
+                "SELECT S.id, S.name, S.nickname, D.id, D.timestamp6_col, 
D.decimal_col FROM value_source"
+                        + " AS S JOIN jdbc_lookup for system_time as of 
S.proctime AS D ON "
+                        + "S.id = D.id AND S.name = 'Alice' AND S.nickname = 
'ADD'",
+                expectedResultSetRows,
+                expectedCachedEntries);
+    }
+
+    @ParameterizedTest
+    @EnumSource(Caching.class)
+    void testLookupJoinWithLikeFilter(Caching caching) {
+
+        List<Row> expectedResultSetRows =
+                Arrays.asList(
+                        Row.of(
+                                1L,
+                                "Alice",
+                                "ABC",
+                                1L,
+                                
truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")),
+                                BigDecimal.valueOf(100.1234)));
+
+        RowData key1 = GenericRowData.of(1L);
+        RowData value1 =
+                GenericRowData.of(
+                        1L,
+                        
DecimalData.fromBigDecimal(BigDecimal.valueOf(100.1234), 10, 4),
+                        TimestampData.fromLocalDateTime(
+                                
truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456"))));
+
+        Map<RowData, Collection<RowData>> expectedCachedEntries = new 
HashMap<>();
+        expectedCachedEntries.put(key1, Collections.singletonList(value1));
+
+        lookupTableTest(
+                caching,
+                Arrays.asList(
+                        Row.of(1L, "Alice", "ABC"),
+                        Row.of(1L, "Alice", "ADD"),
+                        Row.of(2L, "Bob", "BGH"),
+                        Row.of(3L, "Charlie", "CHJ")),
+                CREATE_TABLE_WITH_NAME_AND_NICKNAME_STATEMENT,
+                "SELECT S.id, S.name, S.nickname, D.id, D.timestamp6_col, 
D.decimal_col FROM value_source"
+                        + " AS S JOIN jdbc_lookup for system_time as of 
S.proctime AS D ON "
+                        + "S.id = D.id AND S.name LIKE 'Al%' AND S.nickname = 
'ABC' ",
+                expectedResultSetRows,
+                expectedCachedEntries);
+    }
+
+    @ParameterizedTest
+    @EnumSource(Caching.class)
+    void testLookupJoinWithORFilter(Caching caching) {
+
+        List<Row> expectedResultSetRows =
+                Arrays.asList(
+                        Row.of(
+                                1L,
+                                "Alice",
+                                "ABC",
+                                1L,
+                                
truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")),
+                                BigDecimal.valueOf(100.1234)),
+                        Row.of(
+                                2L,
+                                "Bob",
+                                "BGH",
+                                2L,
+                                
truncateTime(LocalDateTime.parse("2020-01-01T15:36:01.123456")),
+                                BigDecimal.valueOf(101.1234)));
+
+        RowData key1 = GenericRowData.of(1L);
+        RowData value1 =
+                GenericRowData.of(
+                        1L,
+                        
DecimalData.fromBigDecimal(BigDecimal.valueOf(100.1234), 10, 4),
+                        TimestampData.fromLocalDateTime(
+                                
truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456"))));
+
+        RowData key2 = GenericRowData.of(2L);
+        RowData value2 =
+                GenericRowData.of(
+                        2L,
+                        
DecimalData.fromBigDecimal(BigDecimal.valueOf(101.1234), 10, 4),
+                        TimestampData.fromLocalDateTime(
+                                
truncateTime(LocalDateTime.parse("2020-01-01T15:36:01.123456"))));
+
+        Map<RowData, Collection<RowData>> expectedCachedEntries = new 
HashMap<>();
+        expectedCachedEntries.put(key1, Collections.singletonList(value1));
+        expectedCachedEntries.put(key2, Collections.singletonList(value2));
+
+        lookupTableTest(
+                caching,
+                Arrays.asList(
+                        Row.of(1L, "Alice", "ABC"),
+                        Row.of(1L, "Alice", "ADD"),
+                        Row.of(2L, "Bob", "BGH"),
+                        Row.of(3L, "Charlie", "CHJ")),
+                CREATE_TABLE_WITH_NAME_AND_NICKNAME_STATEMENT,
+                "SELECT S.id, S.name, S.nickname, D.id, D.timestamp6_col, 
D.decimal_col FROM value_source"
+                        + " AS S JOIN jdbc_lookup for system_time as of 
S.proctime AS D ON "
+                        + "S.id = D.id AND (S.name = \'Bob\' OR S.nickname = 
\'ABC\')",
+                expectedResultSetRows,
+                expectedCachedEntries);
+    }
+
+    protected TemporalUnit timestampPrecision() {
+        return ChronoUnit.MICROS;
+    }
+
+    private LocalDateTime truncateTime(LocalDateTime value) {
+        return value.truncatedTo(timestampPrecision());
+    }
+
+    private List<Row> executeQuery(String query) {
+        return CollectionUtil.iteratorToList(tEnv.executeSql(query).collect());
+    }
+
+    private void validateCachedValues(Map<RowData, Collection<RowData>> 
expectedCachedEntries) {
+        // Validate cache
+        Map<String, LookupCacheManager.RefCountedCache> managedCaches =
+                LookupCacheManager.getInstance().getManagedCaches();
+        assertThat(managedCaches).as("There should be only 1 shared cache 
registered").hasSize(1);
+        LookupCache cache = 
managedCaches.get(managedCaches.keySet().iterator().next()).getCache();
+        // JDBC supports projection push-down, so the cached row has already been 
projected
+        
LookupCacheAssert.assertThat(cache).containsExactlyEntriesOf(expectedCachedEntries);
+    }
+
+    private enum Caching {
+        ENABLE_CACHE,
+        DISABLE_CACHE
+    }
+}
diff --git 
a/flink-connector-jdbc-backward-compatibility/src/test/resources/log4j2-test.properties
 
b/flink-connector-jdbc-backward-compatibility/src/test/resources/log4j2-test.properties
new file mode 100644
index 00000000..5a383778
--- /dev/null
+++ 
b/flink-connector-jdbc-backward-compatibility/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to WARN to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = WARN
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n

Reply via email to