This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-jdbc.git
commit 26655e5e8864511d20208ebf6d8a2d0314a4c4b2 Author: Joao Boto <[email protected]> AuthorDate: Fri May 24 17:22:07 2024 +0200 [FLINK-35370] Add temp module to test backward compatibility --- .github/workflows/backwards_compatibility.yml | 103 ++++ .../pom.xml | 86 +++ .../backward/compatibility/DataStreamSinkTest.java | 125 +++++ .../compatibility/DataStreamSourceTest.java | 199 +++++++ .../compatibility/DynamicTableSinkTest.java | 327 +++++++++++ .../compatibility/DynamicTableSourceTest.java | 605 +++++++++++++++++++++ .../src/test/resources/log4j2-test.properties | 28 + 7 files changed, 1473 insertions(+) diff --git a/.github/workflows/backwards_compatibility.yml b/.github/workflows/backwards_compatibility.yml new file mode 100644 index 00000000..139b3d9e --- /dev/null +++ b/.github/workflows/backwards_compatibility.yml @@ -0,0 +1,103 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +# We need to specify repo related information here since Apache INFRA doesn't differentiate +# between several workflows with the same names while preparing a report for GHA usage +# https://infra-reports.apache.org/#ghactions +name: Flink Connector JDBC CI Backwards Compatibility +on: [push, pull_request] +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true +jobs: + backwards_compatibility_test: + runs-on: ubuntu-latest + strategy: + matrix: + flink: [1.18-SNAPSHOT, 1.19-SNAPSHOT] + jdk: [8, 11, 17] + + env: + MVN_CONNECTION_OPTIONS: -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120 + FLINK_CACHE_DIR: "/tmp/cache/flink" + MVN_BUILD_OUTPUT_FILE: "/tmp/mvn_build_output.out" + MVN_VALIDATION_DIR: "/tmp/flink-validation-deployment" + + steps: + - name: Check out repository code + uses: actions/checkout@v4 + + - name: Set JDK + uses: actions/setup-java@v4 + with: + java-version: ${{ matrix.jdk }} + distribution: 'temurin' + cache: 'maven' + + - name: Set Maven 3.8.6 + uses: stCarolas/setup-maven@v5 + with: + maven-version: 3.8.6 + + - name: "Determine Flink binary url" + run: | + binary_url="" + if [[ "${{ matrix.flink }}" = *-SNAPSHOT ]]; then + binary_url=https://s3.amazonaws.com/flink-nightly/flink-${{ matrix.flink }}-bin-scala_2.12.tgz + cache_binary=false + else + binary_url=https://archive.apache.org/dist/flink/flink-${{ matrix.flink }}/flink-${{ matrix.flink }}-bin-scala_2.12.tgz + cache_binary=true + fi + { echo "binary_url=$binary_url"; echo "cache_binary=$cache_binary"; } >> ${GITHUB_ENV} # export both; the next step reads env.cache_binary + + - name: "Print Flink binary url / caching" + run: echo "${{ env.binary_url }} / caching=${{ env.cache_binary }}" + + - name: Create cache dirs + run: mkdir -p ${{ env.FLINK_CACHE_DIR }} + + - name: Download Flink binary + working-directory: ${{ env.FLINK_CACHE_DIR }} + run: wget -q -c ${{ env.binary_url }} -O - | tar -xz + + - name:
Install JDBC Libs + run: | + set -o pipefail + + mvn clean install -U -B --no-transfer-progress -Dflink.version=${{ matrix.flink }} \ + -DskipTests \ + -DaltDeploymentRepository=validation_repository::default::file:${{ env.MVN_VALIDATION_DIR }} \ + -Dscala-2.12 \ + -DdistDir=${{ env.FLINK_CACHE_DIR }}/flink-${{ matrix.flink }} \ + ${{ env.MVN_CONNECTION_OPTIONS }} \ + -Dlog4j.configurationFile=file://$(pwd)/tools/ci/log4j.properties \ + | tee ${{ env.MVN_BUILD_OUTPUT_FILE }} + + - name: Test backwards compatibility + working-directory: ./flink-connector-jdbc-backward-compatibility + run: | + set -o pipefail + + mvn clean test -U -B --no-transfer-progress -Dflink.version=${{ matrix.flink }} \ + -DaltDeploymentRepository=validation_repository::default::file:${{ env.MVN_VALIDATION_DIR }} \ + -Dscala-2.12 \ + -DdistDir=${{ env.FLINK_CACHE_DIR }}/flink-${{ matrix.flink }} \ + ${{ env.MVN_CONNECTION_OPTIONS }} \ + -Dlog4j.configurationFile=file://${{ github.workspace }}/tools/ci/log4j.properties \ + | tee ${{ env.MVN_BUILD_OUTPUT_FILE }} \ No newline at end of file diff --git a/flink-connector-jdbc-backward-compatibility/pom.xml b/flink-connector-jdbc-backward-compatibility/pom.xml new file mode 100644 index 00000000..ac8c9874 --- /dev/null +++ b/flink-connector-jdbc-backward-compatibility/pom.xml @@ -0,0 +1,86 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-jdbc-parent</artifactId> + <version>3.2-SNAPSHOT</version> + </parent> + + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-jdbc-backward-compatibility</artifactId> + <version>3.2-SNAPSHOT</version> + <name>Flink : Connectors : JDBC : Backward Compatibility</name> + <packaging>jar</packaging> +
<properties> + <surefire.module.config> + --add-opens=java.base/java.util=ALL-UNNAMED + --add-opens=java.base/java.lang=ALL-UNNAMED + </surefire.module.config> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-java</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner_2.12</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner_2.12</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-common</artifactId> + <version>${flink.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-jdbc</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-jdbc</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>postgresql</artifactId> + <version>${testcontainers.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.postgresql</groupId> + <artifactId>postgresql</artifactId> + <version>42.7.3</version> + <scope>test</scope> + </dependency> + </dependencies> + +</project> \ No newline at end of file diff --git 
a/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DataStreamSinkTest.java b/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DataStreamSinkTest.java new file mode 100644 index 00000000..30f97812 --- /dev/null +++ b/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DataStreamSinkTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.jdbc.backward.compatibility; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.connector.jdbc.JdbcConnectionOptions; +import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions; +import org.apache.flink.connector.jdbc.JdbcExecutionOptions; +import org.apache.flink.connector.jdbc.JdbcSink; +import org.apache.flink.connector.jdbc.databases.postgres.PostgresTestBase; +import org.apache.flink.connector.jdbc.testutils.TableManaged; +import org.apache.flink.connector.jdbc.testutils.tables.templates.BooksTable; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.test.junit5.MiniClusterExtension; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for backward compatibility. 
*/ +public class DataStreamSinkTest implements PostgresTestBase { + + private static final BooksTable TEST_TABLE = new BooksTable("SinkTable"); + + private static final List<BooksTable.BookEntry> BOOKS = + Arrays.stream(TEST_DATA) + .map( + book -> + new BooksTable.BookEntry( + book.id, book.title, book.author, book.price, book.qty)) + .collect(Collectors.toList()); + + @Override + public List<TableManaged> getManagedTables() { + return Collections.singletonList(TEST_TABLE); + } + + @RegisterExtension + static final MiniClusterExtension MINI_CLUSTER = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(2) + .setNumberSlotsPerTaskManager(2) + .build()); + + @Test + public void testAtLeastOnce() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration()); + env.setParallelism(1); + + assertResult(new ArrayList<>()); + + env.fromCollection(BOOKS) + .sinkTo( + JdbcSink.<BooksTable.BookEntry>builder() + .withQueryStatement( + TEST_TABLE.getInsertIntoQuery(), + TEST_TABLE.getStatementBuilder()) + .buildAtLeastOnce( + new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() + .withUrl(getMetadata().getJdbcUrl()) + .withUsername(getMetadata().getUsername()) + .withPassword(getMetadata().getPassword()) + .withDriverName(getMetadata().getDriverClass()) + .build())); + env.execute(); + + assertResult(BOOKS); + } + + @Test + public void testExactlyOnce() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration()); + env.setParallelism(1); + + assertResult(new ArrayList<>()); + + env.fromCollection(BOOKS) + .sinkTo( + JdbcSink.<BooksTable.BookEntry>builder() + .withQueryStatement( + TEST_TABLE.getInsertIntoQuery(), + TEST_TABLE.getStatementBuilder()) + .withExecutionOptions( 
+ JdbcExecutionOptions.builder().withMaxRetries(0).build()) + .buildExactlyOnce( + JdbcExactlyOnceOptions.defaults(), + getMetadata().getXaSourceSupplier())); + env.execute(); + + assertResult(BOOKS); + } + + private void assertResult(List<BooksTable.BookEntry> expected) throws SQLException { + assertThat(TEST_TABLE.selectAllTable(getMetadata().getConnection())).isEqualTo(expected); + } +} diff --git a/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DataStreamSourceTest.java b/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DataStreamSourceTest.java new file mode 100644 index 00000000..3ac60f25 --- /dev/null +++ b/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DataStreamSourceTest.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.jdbc.backward.compatibility; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.connector.jdbc.JdbcTestFixture; +import org.apache.flink.connector.jdbc.databases.postgres.PostgresTestBase; +import org.apache.flink.connector.jdbc.source.JdbcSource; +import org.apache.flink.connector.jdbc.source.reader.extractor.ResultExtractor; +import org.apache.flink.connector.jdbc.split.JdbcGenericParameterValuesProvider; +import org.apache.flink.connector.jdbc.testutils.TableManaged; +import org.apache.flink.connector.jdbc.testutils.tables.templates.BooksTable; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.test.junit5.MiniClusterExtension; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.io.Serializable; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Types; +import java.util.Collections; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedDeque; + +import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for backward compatibility. 
*/ +public class DataStreamSourceTest implements PostgresTestBase { + public static Queue<JdbcTestFixture.TestEntry> collectedRecords; + private static final BooksTable TEST_TABLE = new BooksTable("SourceTable"); + + protected final ResultExtractor<JdbcTestFixture.TestEntry> extractor = + resultSet -> + new JdbcTestFixture.TestEntry( + resultSet.getInt("id"), + resultSet.getString("title"), + resultSet.getString("author"), + // Avoid the 'null -> 0.0d' bug on calling 'getDouble' + (Double) resultSet.getObject("price"), + resultSet.getInt("qty")); + + @Override + public List<TableManaged> getManagedTables() { + return Collections.singletonList(TEST_TABLE); + } + + @RegisterExtension + static final MiniClusterExtension MINI_CLUSTER = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(2) + .setNumberSlotsPerTaskManager(2) + .build()); + + @BeforeEach + void init() throws SQLException { + collectedRecords = new ConcurrentLinkedDeque<>(); + try (Connection conn = getMetadata().getConnection(); + PreparedStatement ps = conn.prepareStatement(TEST_TABLE.getInsertIntoQuery())) { + for (int i = 0; i < TEST_DATA.length; i++) { + ps.setInt(1, TEST_DATA[i].id); + ps.setString(2, TEST_DATA[i].title); + ps.setString(3, TEST_DATA[i].author); + if (TEST_DATA[i].price == null) { + ps.setNull(4, Types.DOUBLE); + } else { + ps.setDouble(4, TEST_DATA[i].price); + } + ps.setInt(5, TEST_DATA[i].qty); + ps.execute(); + } + } + } + + @Test + void testReadWithoutParallelismWithoutParamsProvider() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration()); + env.setParallelism(1); + JdbcSource<JdbcTestFixture.TestEntry> jdbcSource = + JdbcSource.<JdbcTestFixture.TestEntry>builder() + .setTypeInformation(TypeInformation.of(JdbcTestFixture.TestEntry.class)) + .setSql(TEST_TABLE.getSelectAllQuery()) + 
.setDBUrl(getMetadata().getJdbcUrl()) + .setUsername(getMetadata().getUsername()) + .setPassword(getMetadata().getPassword()) + .setDriverName(getMetadata().getDriverClass()) + .setResultExtractor(extractor) + .build(); + env.fromSource(jdbcSource, WatermarkStrategy.noWatermarks(), "TestSource") + .addSink(new TestingSinkFunction()); + env.execute(); + assertThat(collectedRecords).containsExactlyInAnyOrder(TEST_DATA); + } + + @Test + void testReadWithoutParallelismWithParamsProvider() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration()); + env.setParallelism(1); + JdbcSource<JdbcTestFixture.TestEntry> jdbcSource = + JdbcSource.<JdbcTestFixture.TestEntry>builder() + .setTypeInformation(TypeInformation.of(JdbcTestFixture.TestEntry.class)) + .setSql(TEST_TABLE.getSelectByIdBetweenQuery()) + .setJdbcParameterValuesProvider( + new JdbcGenericParameterValuesProvider( + new Serializable[][] {{1001, 1005}, {1006, 1010}})) + .setDBUrl(getMetadata().getJdbcUrl()) + .setUsername(getMetadata().getUsername()) + .setPassword(getMetadata().getPassword()) + .setDriverName(getMetadata().getDriverClass()) + .setResultExtractor(extractor) + .build(); + env.fromSource(jdbcSource, WatermarkStrategy.noWatermarks(), "TestSource") + .addSink(new TestingSinkFunction()); + env.execute(); + assertThat(collectedRecords).containsExactlyInAnyOrder(TEST_DATA); + } + + @Test + void testReadWithParallelismWithoutParamsProvider() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration()); + env.setParallelism(2); + JdbcSource<JdbcTestFixture.TestEntry> jdbcSource = + JdbcSource.<JdbcTestFixture.TestEntry>builder() + .setTypeInformation(TypeInformation.of(JdbcTestFixture.TestEntry.class)) + .setSql(TEST_TABLE.getSelectAllQuery()) + 
.setDBUrl(getMetadata().getJdbcUrl()) + .setUsername(getMetadata().getUsername()) + .setPassword(getMetadata().getPassword()) + .setDriverName(getMetadata().getDriverClass()) + .setResultExtractor(extractor) + .build(); + env.fromSource(jdbcSource, WatermarkStrategy.noWatermarks(), "TestSource") + .addSink(new TestingSinkFunction()); + env.execute(); + assertThat(collectedRecords).containsExactlyInAnyOrder(TEST_DATA); + } + + @Test + void testReadWithParallelismWithParamsProvider() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration()); + env.setParallelism(2); + JdbcSource<JdbcTestFixture.TestEntry> jdbcSource = + JdbcSource.<JdbcTestFixture.TestEntry>builder() + .setTypeInformation(TypeInformation.of(JdbcTestFixture.TestEntry.class)) + .setSql(TEST_TABLE.getSelectByIdBetweenQuery()) + .setJdbcParameterValuesProvider( + new JdbcGenericParameterValuesProvider( + new Serializable[][] {{1001, 1005}, {1006, 1010}})) + .setDBUrl(getMetadata().getJdbcUrl()) + .setUsername(getMetadata().getUsername()) + .setPassword(getMetadata().getPassword()) + .setDriverName(getMetadata().getDriverClass()) + .setResultExtractor(extractor) + .build(); + env.fromSource(jdbcSource, WatermarkStrategy.noWatermarks(), "TestSource") + .addSink(new TestingSinkFunction()); + env.execute(); + assertThat(collectedRecords).containsExactlyInAnyOrder(TEST_DATA); + } + + /** A sink function to collect the records. 
*/ + static class TestingSinkFunction implements SinkFunction<JdbcTestFixture.TestEntry> { + + @Override + public void invoke(JdbcTestFixture.TestEntry value, Context context) throws Exception { + collectedRecords.add(value); + } + } +} diff --git a/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DynamicTableSinkTest.java b/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DynamicTableSinkTest.java new file mode 100644 index 00000000..c926e6bf --- /dev/null +++ b/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DynamicTableSinkTest.java @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.jdbc.backward.compatibility; + +import org.apache.flink.connector.jdbc.databases.postgres.PostgresTestBase; +import org.apache.flink.connector.jdbc.testutils.TableManaged; +import org.apache.flink.connector.jdbc.testutils.tables.TableRow; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.table.planner.runtime.utils.TestData; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.types.Row; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.math.BigDecimal; +import java.sql.Timestamp; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType; +import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field; +import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.pkField; +import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.tableRow; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for backward compatibility. 
*/ +public class DynamicTableSinkTest implements PostgresTestBase { + + @RegisterExtension + static final MiniClusterExtension MINI_CLUSTER = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(2) + .setNumberSlotsPerTaskManager(2) + .build()); + + protected final TableRow upsertOutputTable = createUpsertOutputTable(); + protected final TableRow appendOutputTable = createAppendOutputTable(); + protected final TableRow batchOutputTable = createBatchOutputTable(); + protected final TableRow userOutputTable = createUserOutputTable(); + + protected TableRow createUpsertOutputTable() { + return tableRow( + "dynamicSinkForUpsert", + pkField("cnt", DataTypes.BIGINT().notNull()), + field("lencnt", DataTypes.BIGINT().notNull()), + pkField("cTag", DataTypes.INT().notNull()), + field("ts", dbType("TIMESTAMP"), DataTypes.TIMESTAMP())); + } + + protected TableRow createAppendOutputTable() { + return tableRow( + "dynamicSinkForAppend", + field("id", DataTypes.INT().notNull()), + field("num", DataTypes.BIGINT().notNull()), + field("ts", dbType("TIMESTAMP"), DataTypes.TIMESTAMP())); + } + + protected TableRow createBatchOutputTable() { + return tableRow( + "dynamicSinkForBatch", + field("NAME", DataTypes.VARCHAR(20).notNull()), + field("SCORE", DataTypes.BIGINT().notNull())); + } + + protected TableRow createUserOutputTable() { + return tableRow( + "USER_TABLE", + pkField("user_id", DataTypes.VARCHAR(20).notNull()), + field("user_name", DataTypes.VARCHAR(20).notNull()), + field("email", DataTypes.VARCHAR(255)), + field("balance", DataTypes.DECIMAL(18, 2)), + field("balance2", DataTypes.DECIMAL(18, 2))); + } + + @Override + public List<TableManaged> getManagedTables() { + return Arrays.asList( + upsertOutputTable, appendOutputTable, batchOutputTable, userOutputTable); + } + + @AfterEach + void afterEach() { + TestValuesTableFactory.clearAllData(); + } + + protected List<Row> testUserData() { + return Arrays.asList( + Row.of( +
"user1", + "Tom", + "tom123@gmail.com", + new BigDecimal("8.10"), + new BigDecimal("16.20")), + Row.of( + "user3", + "Bailey", + "bailey@qq.com", + new BigDecimal("9.99"), + new BigDecimal("19.98")), + Row.of( + "user4", + "Tina", + "tina@gmail.com", + new BigDecimal("11.30"), + new BigDecimal("22.60"))); + } + + protected List<Row> testData() { + return Arrays.asList( + Row.of(1, 1L, "Hi", Timestamp.valueOf("1970-01-01 00:00:00.001")), + Row.of(2, 2L, "Hello", Timestamp.valueOf("1970-01-01 00:00:00.002")), + Row.of(3, 2L, "Hello world", Timestamp.valueOf("1970-01-01 00:00:00.003")), + Row.of( + 4, + 3L, + "Hello world, how are you?", + Timestamp.valueOf("1970-01-01 00:00:00.004")), + Row.of(5, 3L, "I am fine.", Timestamp.valueOf("1970-01-01 00:00:00.005")), + Row.of(6, 3L, "Luke Skywalker", Timestamp.valueOf("1970-01-01 00:00:00.006")), + Row.of(7, 4L, "Comment#1", Timestamp.valueOf("1970-01-01 00:00:00.007")), + Row.of(8, 4L, "Comment#2", Timestamp.valueOf("1970-01-01 00:00:00.008")), + Row.of(9, 4L, "Comment#3", Timestamp.valueOf("1970-01-01 00:00:00.009")), + Row.of(10, 4L, "Comment#4", Timestamp.valueOf("1970-01-01 00:00:00.010")), + Row.of(11, 5L, "Comment#5", Timestamp.valueOf("1970-01-01 00:00:00.011")), + Row.of(12, 5L, "Comment#6", Timestamp.valueOf("1970-01-01 00:00:00.012")), + Row.of(13, 5L, "Comment#7", Timestamp.valueOf("1970-01-01 00:00:00.013")), + Row.of(14, 5L, "Comment#8", Timestamp.valueOf("1970-01-01 00:00:00.014")), + Row.of(15, 5L, "Comment#9", Timestamp.valueOf("1970-01-01 00:00:00.015")), + Row.of(16, 6L, "Comment#10", Timestamp.valueOf("1970-01-01 00:00:00.016")), + Row.of(17, 6L, "Comment#11", Timestamp.valueOf("1970-01-01 00:00:00.017")), + Row.of(18, 6L, "Comment#12", Timestamp.valueOf("1970-01-01 00:00:00.018")), + Row.of(19, 6L, "Comment#13", Timestamp.valueOf("1970-01-01 00:00:00.019")), + Row.of(20, 6L, "Comment#14", Timestamp.valueOf("1970-01-01 00:00:00.020")), + Row.of(21, 6L, "Comment#15", Timestamp.valueOf("1970-01-01
00:00:00.021"))); + } + + protected Map<Integer, Row> testDataMap() { + return testData().stream() + .collect(Collectors.toMap(r -> r.getFieldAs(0), Function.identity())); + } + + private void createTestDataTempView(StreamTableEnvironment tEnv, String viewName) { + Table table = tEnv.fromValues(testData()).as("id", "num", "text", "ts"); + + tEnv.createTemporaryView(viewName, table); + } + + @Test + protected void testUpsert() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().enableObjectReuse(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + String viewName = "testData"; + createTestDataTempView(tEnv, viewName); + + String tableName = "upsertSink"; + tEnv.executeSql( + upsertOutputTable.getCreateQueryForFlink( + getMetadata(), + tableName, + Arrays.asList( + "'sink.buffer-flush.max-rows' = '2'", + "'sink.buffer-flush.interval' = '0'", + "'sink.max-retries' = '0'"))); + + tEnv.executeSql( + String.format( + "INSERT INTO %s " + + " SELECT cnt, COUNT(len) AS lencnt, cTag, MAX(ts) AS ts " + + " FROM ( " + + " SELECT len, COUNT(id) as cnt, cTag, MAX(ts) AS ts " + + " FROM (SELECT id, CHAR_LENGTH(text) AS len, (CASE WHEN id > 0 THEN 1 ELSE 0 END) cTag, ts FROM %s) " + + " GROUP BY len, cTag " + + " ) " + + " GROUP BY cnt, cTag", + tableName, viewName)) + .await(); + + Map<Integer, Row> mapTestData = testDataMap(); + assertThat(upsertOutputTable.selectAllTable(getMetadata())) + .containsExactlyInAnyOrder( + Row.of(1L, 5L, 1, mapTestData.get(6).getField(3)), + Row.of(7L, 1L, 1, mapTestData.get(21).getField(3)), + Row.of(9L, 1L, 1, mapTestData.get(15).getField(3))); + } + + @Test + void testAppend() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().enableObjectReuse(); + env.getConfig().setParallelism(1); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + String viewName = 
"testData"; + createTestDataTempView(tEnv, viewName); + + String tableName = "appendSink"; + tEnv.executeSql(appendOutputTable.getCreateQueryForFlink(getMetadata(), tableName)); + + Set<Integer> searchIds = new HashSet<>(Arrays.asList(2, 10, 20)); + tEnv.executeSql( + String.format( + "INSERT INTO %s SELECT id, num, ts FROM %s WHERE id IN (%s)", + tableName, + viewName, + searchIds.stream() + .map(Object::toString) + .collect(Collectors.joining(",")))) + .await(); + + List<Row> tableRows = appendOutputTable.selectAllTable(getMetadata()); + assertThat(tableRows.size()).isEqualTo(3); + + Map<Integer, Row> mapTestData = testDataMap(); + assertThat(tableRows) + .containsExactlyInAnyOrderElementsOf( + searchIds.stream() + .map(mapTestData::get) + .map(d -> Row.of(d.getField(0), d.getField(1), d.getField(3))) + .collect(Collectors.toList())); + } + + @Test + void testBatchSink() throws Exception { + TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode()); + + String tableName = "batchSink"; + tEnv.executeSql( + batchOutputTable.getCreateQueryForFlink( + getMetadata(), + tableName, + Arrays.asList( + "'sink.buffer-flush.max-rows' = '2'", + "'sink.buffer-flush.interval' = '300ms'", + "'sink.max-retries' = '4'"))); + + TableResult tableResult = + tEnv.executeSql( + String.format( + "INSERT INTO %s " + + " SELECT user_name, score " + + " FROM (VALUES (1, 'Bob'), (22, 'Tom'), (42, 'Kim'), (42, 'Kim'), (1, 'Bob')) " + + " AS UserCountTable(score, user_name) ", + tableName)); + tableResult.await(); + + assertThat(batchOutputTable.selectAllTable(getMetadata())) + .containsExactlyInAnyOrder( + Row.of("Bob", 1L), + Row.of("Tom", 22L), + Row.of("Kim", 42L), + Row.of("Kim", 42L), + Row.of("Bob", 1L)); + } + + @Test + protected void testReadingFromChangelogSource() throws Exception { + TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build()); + String dataId = TestValuesTableFactory.registerData(TestData.userChangelog()); + 
+ String userTableLogs = "user_logs"; + tEnv.executeSql( + String.format( + "CREATE TABLE %s ( " + + " user_id STRING, " + + " user_name STRING, " + + " email STRING, " + + " balance DECIMAL(18,2), " + + " balance2 AS balance * 2 " + + ") WITH ( " + + " 'connector' = 'values', " + + " 'data-id' = '%s', " + + " 'changelog-mode' = 'I,UA,UB,D' " + + ")", + userTableLogs, dataId)); + + String userTableSink = "user_sink"; + tEnv.executeSql( + userOutputTable.getCreateQueryForFlink( + getMetadata(), + userTableSink, + Arrays.asList( + "'sink.buffer-flush.max-rows' = '2'", + "'sink.buffer-flush.interval' = '0'"))); + + tEnv.executeSql( + String.format( + "INSERT INTO %s SELECT * FROM %s", userTableSink, userTableLogs)) + .await(); + + assertThat(userOutputTable.selectAllTable(getMetadata())) + .containsExactlyInAnyOrderElementsOf(testUserData()); + } +} diff --git a/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DynamicTableSourceTest.java b/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DynamicTableSourceTest.java new file mode 100644 index 00000000..abf92aba --- /dev/null +++ b/flink-connector-jdbc-backward-compatibility/src/test/java/org/apache/flink/connector/jdbc/backward/compatibility/DynamicTableSourceTest.java @@ -0,0 +1,639 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.backward.compatibility; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.jdbc.databases.postgres.PostgresTestBase; +import org.apache.flink.connector.jdbc.testutils.TableManaged; +import org.apache.flink.connector.jdbc.testutils.tables.TableRow; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.connector.source.lookup.cache.LookupCache; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.table.planner.runtime.utils.StreamTestSink; +import org.apache.flink.table.runtime.functions.table.lookup.LookupCacheManager; +import org.apache.flink.table.test.lookup.cache.LookupCacheAssert; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.types.Row; +import org.apache.flink.util.CollectionUtil; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.math.BigDecimal; +import java.sql.Connection; +import java.sql.SQLException; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.temporal.ChronoUnit; +import java.time.temporal.TemporalUnit; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.dbType; +import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field; +import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.tableRow; +import static org.assertj.core.api.Assertions.assertThat; + +/** Backward compatibility tests for the JDBC dynamic table source (scans and lookup joins). */
+public class DynamicTableSourceTest implements PostgresTestBase { + + @RegisterExtension + static final MiniClusterExtension MINI_CLUSTER_RESOURCE = + new MiniClusterExtension( + new MiniClusterResourceConfiguration.Builder() + .setNumberTaskManagers(1) + .setConfiguration(new Configuration()) + .build()); + + /** DDL of the 'values' probe-side source (id, name); %s takes the registered data id. */ + public static final String CREATE_TABLE_WITH_NAME_STATEMENT = + "CREATE TABLE value_source ( " + + " `id` BIGINT, " + + " `name` STRING, " + + " `proctime` AS PROCTIME()" + + ") WITH (" + + " 'connector' = 'values', " + + " 'data-id' = '%s'" + + ")"; + + /** Same as {@link #CREATE_TABLE_WITH_NAME_STATEMENT} with an additional nickname column. */ + public static final String CREATE_TABLE_WITH_NAME_AND_NICKNAME_STATEMENT = + "CREATE TABLE value_source ( " + + " `id` BIGINT, " + + " `name` STRING, " + + " `nickname` STRING, " + + " `proctime` AS PROCTIME()" + + ") WITH (" + + " 'connector' = 'values', " + + " 'data-id' = '%s'" + + ")"; + + private final TableRow inputTable = createInputTable(); + + // Recreated by beforeEach() so that every test starts from a fresh environment. + public static StreamExecutionEnvironment env; + public static TableEnvironment tEnv; + + /** Defines the JDBC table returned by {@link #getManagedTables()} and read by every test. */ + protected TableRow createInputTable() { + return tableRow( + "jdbDynamicTableSource", + field("id", DataTypes.BIGINT().notNull()), + field("decimal_col", DataTypes.DECIMAL(10, 4)), + field("timestamp6_col", DataTypes.TIMESTAMP(6)), + // other fields + field("real_col", dbType("REAL"), DataTypes.FLOAT()), + field("double_col", dbType("DOUBLE PRECISION"), DataTypes.DOUBLE()), + field("time_col", dbType("TIME"), DataTypes.TIME())); + } + + @Override + public List<TableManaged> getManagedTables() { + return Collections.singletonList(inputTable); + } + + /** Rows inserted into the input table before each test, in the table's column order. */ + protected List<Row> getTestData() { + return Arrays.asList( + Row.of( + 1L, + BigDecimal.valueOf(100.1234), + LocalDateTime.parse("2020-01-01T15:35:00.123456"), + 1.175E-37F, + 1.79769E308D, + LocalTime.parse("15:35")), + Row.of( + 2L, + BigDecimal.valueOf(101.1234), + LocalDateTime.parse("2020-01-01T15:36:01.123456"), + -1.175E-37F, + -1.79769E308, + LocalTime.parse("15:36:01"))); + } + + /** Seeds the input table with {@link #getTestData()} and creates a fresh table environment. */ + @BeforeEach + void beforeEach() throws SQLException {
+ try (Connection conn = getMetadata().getConnection()) { + inputTable.insertIntoTableValues(conn, getTestData()); + } + env = StreamExecutionEnvironment.getExecutionEnvironment(); + tEnv = StreamTableEnvironment.create(env); + } + + @AfterEach + void afterEach() { + StreamTestSink.clear(); + } + + @Test + void testJdbcSource() { + String testTable = "testTable"; + tEnv.executeSql(inputTable.getCreateQueryForFlink(getMetadata(), testTable)); + + List<Row> collected = executeQuery("SELECT * FROM " + testTable); + + assertThat(collected).containsExactlyInAnyOrderElementsOf(getTestData()); + } + + @Test + void testProject() { + String testTable = "testTable"; + tEnv.executeSql( + inputTable.getCreateQueryForFlink( + getMetadata(), + testTable, + Arrays.asList( + "'scan.partition.column'='id'", + "'scan.partition.num'='2'", + "'scan.partition.lower-bound'='0'", + "'scan.partition.upper-bound'='100'"))); + + String fields = String.join(",", Arrays.copyOfRange(inputTable.getTableFields(), 0, 3)); + List<Row> collected = executeQuery(String.format("SELECT %s FROM %s", fields, testTable)); + + List<Row> expected = + getTestData().stream() + .map(row -> Row.of(row.getField(0), row.getField(1), row.getField(2))) + .collect(Collectors.toList()); + + assertThat(collected).containsExactlyInAnyOrderElementsOf(expected); + } + + @Test + public void testLimit() { + String testTable = "testTable"; + tEnv.executeSql( + inputTable.getCreateQueryForFlink( + getMetadata(), + testTable, + Arrays.asList( + "'scan.partition.column'='id'", + "'scan.partition.num'='2'", + "'scan.partition.lower-bound'='1'", + "'scan.partition.upper-bound'='2'"))); + + List<Row> collected = executeQuery("SELECT * FROM " + testTable + " LIMIT 1"); + + assertThat(collected).hasSize(1); + assertThat(getTestData()) + .as("The actual output is not a subset of the expected set.") + .containsAll(collected); + } + + @Test + public void testFilter() { + String testTable = "testTable";
+ tEnv.executeSql(inputTable.getCreateQueryForFlink(getMetadata(), testTable)); + + // create a partitioned table to ensure no regression + String partitionedTable = "PARTITIONED_TABLE"; + tEnv.executeSql( + inputTable.getCreateQueryForFlink( + getMetadata(), + partitionedTable, + Arrays.asList( + "'scan.partition.column'='id'", + "'scan.partition.num'='1'", + "'scan.partition.lower-bound'='1'", + "'scan.partition.upper-bound'='1'"))); + + // we create a VIEW here to test column remapping, i.e. whether filter push-down works + // when a view that depends on our source table renames its columns + tEnv.executeSql( + String.format( + "CREATE VIEW FAKE_TABLE (idx, %s) as (SELECT * from %s )", + Arrays.stream(inputTable.getTableFields()) + .filter(f -> !f.equals("id")) + .collect(Collectors.joining(",")), + testTable)); + + Row onlyRow1 = + getTestData().stream() + .filter(row -> row.getFieldAs(0).equals(1L)) + .findAny() + .orElseThrow(NullPointerException::new); + + Row onlyRow2 = + getTestData().stream() + .filter(row -> row.getFieldAs(0).equals(2L)) + .findAny() + .orElseThrow(NullPointerException::new); + + List<Row> twoRows = getTestData(); + + // test simple filter + assertThat(executeQuery("SELECT * FROM FAKE_TABLE WHERE idx = 1")) + .containsExactly(onlyRow1); + + // test TIMESTAMP filter + assertThat( + executeQuery( + "SELECT * FROM FAKE_TABLE " + + "WHERE timestamp6_col > TIMESTAMP '2020-01-01 15:35:00'" + + " AND timestamp6_col < TIMESTAMP '2020-01-01 15:35:01'")) + .containsExactly(onlyRow1); + + // test the IN operator + assertThat( + executeQuery( + "SELECT * FROM FAKE_TABLE WHERE 1 = idx AND decimal_col IN (100.1234, 101.1234)")) + .containsExactly(onlyRow1); + + // test mixing AND and OR operator + assertThat( + executeQuery( + "SELECT * FROM FAKE_TABLE WHERE idx = 1 AND decimal_col = 100.1234 OR decimal_col = 101.1234")) + .containsExactlyInAnyOrderElementsOf(twoRows); + + // test mixing AND/OR with parentheses, and swapped operands in an equality
+ assertThat( + executeQuery( + "SELECT * FROM FAKE_TABLE WHERE (2 = idx AND decimal_col = 100.1234) OR decimal_col = 101.1234")) + .containsExactly(onlyRow2); + + // test greater-than, to make sure we didn't break predicates that cannot be pushed down + assertThat( + executeQuery( + "SELECT * FROM FAKE_TABLE WHERE idx = 2 AND decimal_col > 100 OR decimal_col = 101.123")) + .containsExactly(onlyRow2); + + // One more test of parentheses + assertThat( + executeQuery( + "SELECT * FROM FAKE_TABLE WHERE 2 = idx AND (decimal_col = 100.1234 OR decimal_col = 102.1234)")) + .isEmpty(); + + assertThat( + executeQuery( + "SELECT * FROM " + + partitionedTable + + " WHERE id = 2 AND decimal_col > 100 OR decimal_col = 101.123")) + .isEmpty(); + + assertThat( + executeQuery( + "SELECT * FROM " + + partitionedTable + + " WHERE 1 = id AND decimal_col IN (100.1234, 101.1234)")) + .containsExactly(onlyRow1); + } + + @ParameterizedTest + @EnumSource(Caching.class) + void testLookupJoin(Caching caching) { + + String selectStatement = + "SELECT S.id, S.name, D.id, D.timestamp6_col, D.decimal_col FROM value_source" + + " AS S JOIN jdbc_lookup for system_time as of S.proctime AS D ON S.id = D.id"; + List<Row> expectedResultSetRows = + Arrays.asList( + Row.of( + 1L, + "Alice", + 1L, + truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")), + BigDecimal.valueOf(100.1234)), + Row.of( + 1L, + "Alice", + 1L, + truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")), + BigDecimal.valueOf(100.1234)), + Row.of( + 2L, + "Bob", + 2L, + truncateTime(LocalDateTime.parse("2020-01-01T15:36:01.123456")), + BigDecimal.valueOf(101.1234))); + + RowData key1 = GenericRowData.of(1L); + RowData value1 = + GenericRowData.of( + 1L, + DecimalData.fromBigDecimal(BigDecimal.valueOf(100.1234), 10, 4), + TimestampData.fromLocalDateTime( + truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")))); + + RowData key2 = GenericRowData.of(2L); + RowData value2 = + GenericRowData.of( + 2L,
+ DecimalData.fromBigDecimal(BigDecimal.valueOf(101.1234), 10, 4), + TimestampData.fromLocalDateTime( + truncateTime(LocalDateTime.parse("2020-01-01T15:36:01.123456")))); + + RowData key3 = GenericRowData.of(3L); + + Map<RowData, Collection<RowData>> expectedCachedEntries = new HashMap<>(); + expectedCachedEntries.put(key1, Collections.singletonList(value1)); + expectedCachedEntries.put(key2, Collections.singletonList(value2)); + expectedCachedEntries.put(key3, Collections.emptyList()); + + lookupTableTest( + caching, + sampleTableData(), + CREATE_TABLE_WITH_NAME_STATEMENT, + selectStatement, + expectedResultSetRows, + expectedCachedEntries); + } + + /** + * Creates the JDBC lookup table and a 'values' probe-side source, runs the lookup join and + * checks the result and, when caching is enabled, the lookup cache contents. + * + * @param caching whether the JDBC lookup table is declared with lookup cache options + * @param dataToRegister rows served by the 'values' probe-side source + * @param createTableStatement probe-side DDL with one %s placeholder for the data id + * @param selectStatement the lookup-join query to run + * @param expectedResultSetRows expected join output; the size and membership are checked + * @param expectedCachedEntries expected cache contents; only checked with caching enabled + */ + private void lookupTableTest( + Caching caching, + Collection<Row> dataToRegister, + String createTableStatement, + String selectStatement, + List<Row> expectedResultSetRows, + Map<RowData, Collection<RowData>> expectedCachedEntries) { + // Create JDBC lookup table + List<String> cachingOptions = Collections.emptyList(); + if (caching == Caching.ENABLE_CACHE) { + cachingOptions = + Arrays.asList( + "'lookup.cache.max-rows' = '100'", "'lookup.cache.ttl' = '10min'"); + } + tEnv.executeSql( + inputTable.getCreateQueryForFlink(getMetadata(), "jdbc_lookup", cachingOptions)); + + // Create and prepare a value source + String dataId = TestValuesTableFactory.registerData(dataToRegister); + tEnv.executeSql(String.format(createTableStatement, dataId)); + + if (caching == Caching.ENABLE_CACHE) { + LookupCacheManager.keepCacheOnRelease(true); + } + + // Execute lookup join + try { + List<Row> collected = executeQuery(selectStatement); + int expectedSize = expectedResultSetRows.size(); + + // check we got the expected number of rows + assertThat(collected) + .as("Actual output is not size " + expectedSize) + .hasSize(expectedSize) + .as("The actual output is not a subset of the expected set") + .containsAll(expectedResultSetRows); + + if (caching == Caching.ENABLE_CACHE) { + validateCachedValues(expectedCachedEntries); + } + } finally {
+ if (caching == Caching.ENABLE_CACHE) { + LookupCacheManager.getInstance().checkAllReleased(); + LookupCacheManager.getInstance().clear(); + LookupCacheManager.keepCacheOnRelease(false); + } + } + } + + @ParameterizedTest + @EnumSource(Caching.class) + void testLookupJoinWithFilter(Caching caching) { + List<Row> expectedResultSetRows = + Arrays.asList( + Row.of( + 2L, + "Bob", + 2L, + truncateTime(LocalDateTime.parse("2020-01-01T15:36:01.123456")), + BigDecimal.valueOf(101.1234))); + + RowData key2 = GenericRowData.of(2L); + RowData value2 = + GenericRowData.of( + 2L, + DecimalData.fromBigDecimal(BigDecimal.valueOf(101.1234), 10, 4), + TimestampData.fromLocalDateTime( + truncateTime(LocalDateTime.parse("2020-01-01T15:36:01.123456")))); + + Map<RowData, Collection<RowData>> expectedCachedEntries = new HashMap<>(); + expectedCachedEntries.put(key2, Collections.singletonList(value2)); + + lookupTableTest( + caching, + sampleTableData(), + CREATE_TABLE_WITH_NAME_STATEMENT, + "SELECT S.id, S.name, D.id, D.timestamp6_col, D.decimal_col FROM value_source" + + " AS S JOIN jdbc_lookup for system_time as of S.proctime AS D ON " + + "S.id = D.id AND S.name = 'Bob'", + expectedResultSetRows, + expectedCachedEntries); + } + + /** Probe-side rows (id, name): id 1 appears twice and id 3 has no match in the input table. */ + private static List<Row> sampleTableData() { + return Arrays.asList( + Row.of(1L, "Alice"), Row.of(1L, "Alice"), Row.of(2L, "Bob"), Row.of(3L, "Charlie")); + } + + /** Probe-side rows (id, name, nickname) used by the multi-column lookup join tests. */ + private static List<Row> sampleTableDataWithNickNames() { + return Arrays.asList( + Row.of(1L, "Alice", "ABC"), + Row.of(1L, "Alice", "ADD"), + Row.of(2L, "Bob", "BGH"), + Row.of(3L, "Charlie", "CHJ")); + } + + @ParameterizedTest + @EnumSource(Caching.class) + void testLookupJoinWithMultipleFilters(Caching caching) { + + List<Row> expectedResultSetRows = + Arrays.asList( + Row.of( + 1L, + "Alice", + "ADD", + 1L, + truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")), + BigDecimal.valueOf(100.1234))); + + RowData key1 = GenericRowData.of(1L); + RowData value1 =
+ GenericRowData.of( + 1L, + DecimalData.fromBigDecimal(BigDecimal.valueOf(100.1234), 10, 4), + TimestampData.fromLocalDateTime( + truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")))); + + Map<RowData, Collection<RowData>> expectedCachedEntries = new HashMap<>(); + expectedCachedEntries.put(key1, Collections.singletonList(value1)); + + lookupTableTest( + caching, + sampleTableDataWithNickNames(), + CREATE_TABLE_WITH_NAME_AND_NICKNAME_STATEMENT, + "SELECT S.id, S.name, S.nickname, D.id, D.timestamp6_col, D.decimal_col FROM value_source" + + " AS S JOIN jdbc_lookup for system_time as of S.proctime AS D ON " + + "S.id = D.id AND S.name = 'Alice' AND S.nickname = 'ADD'", + expectedResultSetRows, + expectedCachedEntries); + } + + @ParameterizedTest + @EnumSource(Caching.class) + void testLookupJoinWithLikeFilter(Caching caching) { + + List<Row> expectedResultSetRows = + Arrays.asList( + Row.of( + 1L, + "Alice", + "ABC", + 1L, + truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")), + BigDecimal.valueOf(100.1234))); + + RowData key1 = GenericRowData.of(1L); + RowData value1 = + GenericRowData.of( + 1L, + DecimalData.fromBigDecimal(BigDecimal.valueOf(100.1234), 10, 4), + TimestampData.fromLocalDateTime( + truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")))); + + Map<RowData, Collection<RowData>> expectedCachedEntries = new HashMap<>(); + expectedCachedEntries.put(key1, Collections.singletonList(value1)); + + lookupTableTest( + caching, + sampleTableDataWithNickNames(), + CREATE_TABLE_WITH_NAME_AND_NICKNAME_STATEMENT, + "SELECT S.id, S.name, S.nickname, D.id, D.timestamp6_col, D.decimal_col FROM value_source" + + " AS S JOIN jdbc_lookup for system_time as of S.proctime AS D ON " + + "S.id = D.id AND S.name LIKE 'Al%' AND S.nickname = 'ABC' ", + expectedResultSetRows, + expectedCachedEntries); + } + + @ParameterizedTest
+ @EnumSource(Caching.class) + void testLookupJoinWithORFilter(Caching caching) { + + List<Row> expectedResultSetRows = + Arrays.asList( + Row.of( + 1L, + "Alice", + "ABC", + 1L, + truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")), + BigDecimal.valueOf(100.1234)), + Row.of( + 2L, + "Bob", + "BGH", + 2L, + truncateTime(LocalDateTime.parse("2020-01-01T15:36:01.123456")), + BigDecimal.valueOf(101.1234))); + + RowData key1 = GenericRowData.of(1L); + RowData value1 = + GenericRowData.of( + 1L, + DecimalData.fromBigDecimal(BigDecimal.valueOf(100.1234), 10, 4), + TimestampData.fromLocalDateTime( + truncateTime(LocalDateTime.parse("2020-01-01T15:35:00.123456")))); + + RowData key2 = GenericRowData.of(2L); + RowData value2 = + GenericRowData.of( + 2L, + DecimalData.fromBigDecimal(BigDecimal.valueOf(101.1234), 10, 4), + TimestampData.fromLocalDateTime( + truncateTime(LocalDateTime.parse("2020-01-01T15:36:01.123456")))); + + Map<RowData, Collection<RowData>> expectedCachedEntries = new HashMap<>(); + expectedCachedEntries.put(key1, Collections.singletonList(value1)); + expectedCachedEntries.put(key2, Collections.singletonList(value2)); + + lookupTableTest( + caching, + sampleTableDataWithNickNames(), + CREATE_TABLE_WITH_NAME_AND_NICKNAME_STATEMENT, + "SELECT S.id, S.name, S.nickname, D.id, D.timestamp6_col, D.decimal_col FROM value_source" + + " AS S JOIN jdbc_lookup for system_time as of S.proctime AS D ON " + + "S.id = D.id AND (S.name = 'Bob' OR S.nickname = 'ABC')", + expectedResultSetRows, + expectedCachedEntries); + } + + /** Precision to which expected TIMESTAMP values are truncated; subclasses may override it. */ + protected TemporalUnit timestampPrecision() { + return ChronoUnit.MICROS; + } + + /** Truncates {@code value} to {@link #timestampPrecision()}. */ + private LocalDateTime truncateTime(LocalDateTime value) { + return value.truncatedTo(timestampPrecision()); + } + + /** Runs {@code query} on {@link #tEnv} and collects all result rows into a list. */ + private List<Row> executeQuery(String query) { + return CollectionUtil.iteratorToList(tEnv.executeSql(query).collect()); + } + + /** Asserts that a single shared lookup cache holds exactly the expected entries. */ + private
void validateCachedValues(Map<RowData, Collection<RowData>> expectedCachedEntries) { + // Validate cache + Map<String, LookupCacheManager.RefCountedCache> managedCaches = + LookupCacheManager.getInstance().getManagedCaches(); + assertThat(managedCaches).as("There should be only 1 shared cache registered").hasSize(1); + LookupCache cache = managedCaches.get(managedCaches.keySet().iterator().next()).getCache(); + // JDBC supports projection push-down, so cached rows only hold the projected columns + LookupCacheAssert.assertThat(cache).containsExactlyEntriesOf(expectedCachedEntries); + } + + /** Whether the JDBC lookup table is declared with lookup cache options. */ + private enum Caching { + ENABLE_CACHE, + DISABLE_CACHE + } +} diff --git a/flink-connector-jdbc-backward-compatibility/src/test/resources/log4j2-test.properties b/flink-connector-jdbc-backward-compatibility/src/test/resources/log4j2-test.properties new file mode 100644 index 00000000..5a383778 --- /dev/null +++ b/flink-connector-jdbc-backward-compatibility/src/test/resources/log4j2-test.properties @@ -0,0 +1,28 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+################################################################################ + +# Set root logger level to WARN to avoid flooding build logs +# set it manually to INFO for debugging purposes +rootLogger.level = WARN +rootLogger.appenderRef.test.ref = TestLogger + +appender.testlogger.name = TestLogger +appender.testlogger.type = CONSOLE +appender.testlogger.target = SYSTEM_ERR +appender.testlogger.layout.type = PatternLayout +appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
