This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b5bcb22f28ace028f824cef4512aaf90ec18b69a
Author: Jark Wu <[email protected]>
AuthorDate: Mon May 18 17:48:25 2020 +0800

    [FLINK-17798][connector/jdbc] Align the behavior between the new and legacy JDBC table source

    This closes #12221
---
 flink-connectors/flink-connector-jdbc/pom.xml      |  2 +-
 .../jdbc/table/JdbcDynamicTableSource.java         | 48 ++++++++++---------
 .../table/JdbcDynamicTableSourceSinkFactory.java   |  7 +--
 .../jdbc/table/JdbcDynamicTableSourceITCase.java   | 45 ++++++++++--------
 ...ctionITCase.java => JdbcLookupTableITCase.java} | 43 +++++++++--------
 .../connector/jdbc/table/JdbcTablePlanTest.java    | 54 ++++++++++++++++++++++
 .../jdbc/table/JdbcTableSourceITCase.java          |  1 -
 .../connector/jdbc/table/JdbcTablePlanTest.xml     | 35 ++++++++++++++
 8 files changed, 168 insertions(+), 67 deletions(-)

diff --git a/flink-connectors/flink-connector-jdbc/pom.xml b/flink-connectors/flink-connector-jdbc/pom.xml
index a959546..29467e2 100644
--- a/flink-connectors/flink-connector-jdbc/pom.xml
+++ b/flink-connectors/flink-connector-jdbc/pom.xml
@@ -94,7 +94,7 @@ under the License.

 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+			<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
index 248ffe1..21a80a2 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
@@ -32,37 +32,35 @@ import org.apache.flink.table.connector.source.InputFormatProvider;
 import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.TableFunctionProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.TableSchemaUtils;
 import org.apache.flink.util.Preconditions;
 
-import java.util.Arrays;
 import java.util.Objects;
 
 /**
  * A {@link DynamicTableSource} for JDBC.
  */
 @Internal
-public class JdbcDynamicTableSource implements ScanTableSource, LookupTableSource {
+public class JdbcDynamicTableSource implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown {
 
 	private final JdbcOptions options;
 	private final JdbcReadOptions readOptions;
 	private final JdbcLookupOptions lookupOptions;
-	private final TableSchema schema;
-	private final int[] selectFields;
+	private TableSchema physicalSchema;
 	private final String dialectName;
 
 	public JdbcDynamicTableSource(
 			JdbcOptions options,
 			JdbcReadOptions readOptions,
 			JdbcLookupOptions lookupOptions,
-			TableSchema schema,
-			int[] selectFields) {
+			TableSchema physicalSchema) {
 		this.options = options;
 		this.readOptions = readOptions;
 		this.lookupOptions = lookupOptions;
-		this.schema = schema;
-		this.selectFields = selectFields;
+		this.physicalSchema = physicalSchema;
 		this.dialectName = options.getDialect().dialectName();
 	}
 
@@ -74,15 +72,15 @@ public class JdbcDynamicTableSource implements ScanTableSource, LookupTableSourc
 			int[] innerKeyArr = context.getKeys()[i];
 			Preconditions.checkArgument(innerKeyArr.length == 1,
 				"JDBC only support non-nested look up keys");
-			keyNames[i] = schema.getFieldNames()[innerKeyArr[0]];
+			keyNames[i] = physicalSchema.getFieldNames()[innerKeyArr[0]];
 		}
-		final RowType rowType = (RowType) schema.toRowDataType().getLogicalType();
+		final RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
 
 		return TableFunctionProvider.of(new JdbcRowDataLookupFunction(
 			options,
 			lookupOptions,
-			schema.getFieldNames(),
-			schema.getFieldDataTypes(),
+			physicalSchema.getFieldNames(),
+			physicalSchema.getFieldDataTypes(),
 			keyNames,
 			rowType));
 	}
@@ -101,7 +99,7 @@ public class JdbcDynamicTableSource implements ScanTableSource, LookupTableSourc
 		}
 		final JdbcDialect dialect = options.getDialect();
 		String query = dialect.getSelectFromStatement(
-			options.getTableName(), schema.getFieldNames(), new String[0]);
+			options.getTableName(), physicalSchema.getFieldNames(), new String[0]);
 		if (readOptions.getPartitionColumnName().isPresent()) {
 			long lowerBound = readOptions.getPartitionLowerBound().get();
 			long upperBound = readOptions.getPartitionUpperBound().get();
@@ -113,10 +111,10 @@ public class JdbcDynamicTableSource implements ScanTableSource, LookupTableSourc
 				" BETWEEN ? AND ?";
 		}
 		builder.setQuery(query);
-		final RowType rowType = (RowType) schema.toRowDataType().getLogicalType();
+		final RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
 		builder.setRowConverter(dialect.getRowConverter(rowType));
 		builder.setRowDataTypeInfo((TypeInformation<RowData>) runtimeProviderContext
-			.createTypeInformation(schema.toRowDataType()));
+			.createTypeInformation(physicalSchema.toRowDataType()));
 
 		return InputFormatProvider.of(builder.build());
 	}
@@ -127,8 +125,19 @@ public class JdbcDynamicTableSource implements ScanTableSource, LookupTableSourc
 	}
 
 	@Override
+	public boolean supportsNestedProjection() {
+		// JDBC doesn't support nested projection
+		return false;
+	}
+
+	@Override
+	public void applyProjection(int[][] projectedFields) {
+		this.physicalSchema = TableSchemaUtils.projectSchema(physicalSchema, projectedFields);
+	}
+
+	@Override
 	public DynamicTableSource copy() {
-		return new JdbcDynamicTableSource(options, readOptions, lookupOptions, schema, selectFields);
+		return new JdbcDynamicTableSource(options, readOptions, lookupOptions, physicalSchema);
 	}
 
 	@Override
@@ -148,15 +157,12 @@ public class JdbcDynamicTableSource implements ScanTableSource, LookupTableSourc
 		return Objects.equals(options, that.options) &&
 			Objects.equals(readOptions, that.readOptions) &&
 			Objects.equals(lookupOptions, that.lookupOptions) &&
-			Objects.equals(schema, that.schema) &&
-			Arrays.equals(selectFields, that.selectFields) &&
+			Objects.equals(physicalSchema, that.physicalSchema) &&
 			Objects.equals(dialectName, that.dialectName);
 	}
 
 	@Override
 	public int hashCode() {
-		int result = Objects.hash(options, readOptions, lookupOptions, schema, dialectName);
-		result = 31 * result + Arrays.hashCode(selectFields);
-		return result;
+		return Objects.hash(options, readOptions, lookupOptions, physicalSchema, dialectName);
 	}
 }
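For context (not part of the patch): SupportsProjectionPushDown hands the source nested index paths, and since supportsNestedProjection() returns false here, every inner array is a single top-level field index, which TableSchemaUtils.projectSchema uses to narrow the physical schema. A minimal sketch built on the same utility the patch imports; the schema and main wrapper are illustrative only:

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.utils.TableSchemaUtils;

    public class ProjectSchemaSketch {
        public static void main(String[] args) {
            // Hypothetical physical schema, mirroring columns used in the tests below.
            TableSchema schema = TableSchema.builder()
                .field("id", DataTypes.BIGINT())
                .field("timestamp6_col", DataTypes.TIMESTAMP(6))
                .field("decimal_col", DataTypes.DECIMAL(10, 4))
                .build();
            // Projection paths as the planner passes them to applyProjection(int[][]);
            // each inner array has length 1 because nested projection is unsupported.
            int[][] projectedFields = {{2}, {0}};
            TableSchema projected = TableSchemaUtils.projectSchema(schema, projectedFields);
            System.out.println(projected); // only decimal_col and id remain, in that order
        }
    }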
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceSinkFactory.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceSinkFactory.java
index 28a129d..930a1b0 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceSinkFactory.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceSinkFactory.java
@@ -173,16 +173,11 @@ public class JdbcDynamicTableSourceSinkFactory implements DynamicTableSourceFact
 		helper.validate();
 		validateConfigOptions(config);
 		TableSchema physicalSchema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
-		int[] selectFields = new int[physicalSchema.getFieldNames().length];
-		for (int i = 0; i < selectFields.length; i++) {
-			selectFields[i] = i;
-		}
 		return new JdbcDynamicTableSource(
 			getJdbcOptions(helper.getOptions()),
 			getJdbcReadOptions(helper.getOptions()),
 			getJdbcLookupOptions(helper.getOptions()),
-			physicalSchema,
-			selectFields);
+			physicalSchema);
 	}
 
 	private JdbcOptions getJdbcOptions(ReadableConfig readableConfig) {
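A note on the hunk above (not part of the patch): the removed loop built an identity projection, selectFields = [0, 1, ..., n-1], over the physical schema. Assuming the old factory method's scope, where physicalSchema is a TableSchema local, the loop was equivalent to:

    // Identity projection over all physical fields, as the removed loop computed.
    int[] selectFields = java.util.stream.IntStream
            .range(0, physicalSchema.getFieldCount())
            .toArray();

With projection push-down now handled through SupportsProjectionPushDown, the source no longer needs this seed projection at all.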
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
index 6f93307..48be89e 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
@@ -22,10 +22,12 @@ import org.apache.flink.connector.jdbc.JdbcTestBase;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
-import org.apache.flink.table.runtime.utils.StreamITCase;
+import org.apache.flink.table.planner.runtime.utils.StreamTestSink;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
 
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -34,8 +36,12 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertEquals;
 
 /**
  * ITCase for {@link JdbcDynamicTableSource}.
@@ -79,6 +85,7 @@ public class JdbcDynamicTableSourceITCase extends AbstractTestBase {
 			Statement stat = conn.createStatement()) {
 			stat.executeUpdate("DROP TABLE " + INPUT_TABLE);
 		}
+		StreamTestSink.clear();
 	}
 
 	@Test
@@ -106,16 +113,17 @@ public class JdbcDynamicTableSourceITCase extends AbstractTestBase {
 			")"
 		);
 
-		StreamITCase.clear();
-		tEnv.toAppendStream(tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE), Row.class)
-			.addSink(new StreamITCase.StringSink<>());
-		env.execute();
-
+		Iterator<Row> collected = tEnv.executeSql("SELECT * FROM " + INPUT_TABLE).collect();
+		List<String> result = Lists.newArrayList(collected).stream()
+			.map(Row::toString)
+			.sorted()
+			.collect(Collectors.toList());
 		List<String> expected =
-			Arrays.asList(
+			Stream.of(
 				"1,2020-01-01T15:35:00.123456,2020-01-01T15:35:00.123456789,15:35,1.175E-37,1.79769E308,100.1234",
-				"2,2020-01-01T15:36:01.123456,2020-01-01T15:36:01.123456789,15:36:01,-1.175E-37,-1.79769E308,101.1234");
-		StreamITCase.compareWithList(expected);
+				"2,2020-01-01T15:36:01.123456,2020-01-01T15:36:01.123456789,15:36:01,-1.175E-37,-1.79769E308,101.1234")
+			.sorted().collect(Collectors.toList());
+		assertEquals(expected, result);
 	}
 
 	@Test
@@ -147,15 +155,16 @@ public class JdbcDynamicTableSourceITCase extends AbstractTestBase {
 			")"
 		);
 
-		StreamITCase.clear();
-		tEnv.toAppendStream(tEnv.sqlQuery("SELECT id,timestamp6_col,decimal_col FROM " + INPUT_TABLE), Row.class)
-			.addSink(new StreamITCase.StringSink<>());
-		env.execute();
-
+		Iterator<Row> collected = tEnv.executeSql("SELECT id,timestamp6_col,decimal_col FROM " + INPUT_TABLE).collect();
+		List<String> result = Lists.newArrayList(collected).stream()
+			.map(Row::toString)
+			.sorted()
+			.collect(Collectors.toList());
 		List<String> expected =
-			Arrays.asList(
+			Stream.of(
 				"1,2020-01-01T15:35:00.123456,100.1234",
-				"2,2020-01-01T15:36:01.123456,101.1234");
-		StreamITCase.compareWithList(expected);
+				"2,2020-01-01T15:36:01.123456,101.1234")
+			.sorted().collect(Collectors.toList());
+		assertEquals(expected, result);
 	}
 }
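The collect-and-sort pattern introduced above (executeSql(...).collect(), stringify, sort, assertEquals) recurs in the lookup test below. A hypothetical standalone helper capturing it, with names of our choosing rather than the patch's:

    import org.apache.flink.table.api.TableResult;
    import org.apache.flink.types.Row;

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;

    final class CollectUtil {
        // Drains a TableResult into sorted string form for order-insensitive asserts.
        static List<String> collectSorted(TableResult result) {
            List<String> rows = new ArrayList<>();
            Iterator<Row> it = result.collect();
            while (it.hasNext()) {
                rows.add(it.next().toString());
            }
            Collections.sort(rows);
            return rows;
        }
    }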
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupFunctionITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTableITCase.java
similarity index 86%
rename from flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupFunctionITCase.java
rename to flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTableITCase.java
index 8d40cdd..793ea9d 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupFunctionITCase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTableITCase.java
@@ -22,17 +22,17 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.connector.jdbc.JdbcTestFixture;
 import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
 import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
-import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
-import org.apache.flink.table.runtime.utils.StreamITCase;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
 
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -46,16 +46,20 @@ import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.connector.jdbc.JdbcTestFixture.DERBY_EBOOKSHOP_DB;
 import static org.apache.flink.table.api.Expressions.$;
+import static org.junit.Assert.assertEquals;
 
 /**
- * IT case for {@link JdbcLookupFunction}.
+ * IT case for lookup source of JDBC connector.
  */
 @RunWith(Parameterized.class)
-public class JdbcLookupFunctionITCase extends AbstractTestBase {
+public class JdbcLookupTableITCase extends AbstractTestBase {
 
 	public static final String DB_URL = "jdbc:derby:memory:lookup";
 	public static final String LOOKUP_TABLE = "lookup_table";
@@ -63,7 +67,7 @@ public class JdbcLookupFunctionITCase extends AbstractTestBase {
 	private final String tableFactory;
 	private final boolean useCache;
 
-	public JdbcLookupFunctionITCase(String tableFactory, boolean useCache) {
+	public JdbcLookupTableITCase(String tableFactory, boolean useCache) {
 		this.useCache = useCache;
 		this.tableFactory = tableFactory;
 	}
@@ -143,16 +147,20 @@ public class JdbcLookupFunctionITCase extends AbstractTestBase {
 	}
 
 	@Test
-	public void test() throws Exception {
+	public void testLookup() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
 
-		StreamITCase.clear();
+		Iterator<Row> collected;
 		if ("legacyFactory".equals(tableFactory)) {
-			useLegacyTableFactory(env, tEnv);
+			collected = useLegacyTableFactory(env, tEnv);
 		} else {
-			useDynamicTableFactory(env, tEnv);
+			collected = useDynamicTableFactory(env, tEnv);
 		}
+		List<String> result = Lists.newArrayList(collected).stream()
+			.map(Row::toString)
+			.sorted()
+			.collect(Collectors.toList());
 
 		List<String> expected = new ArrayList<>();
 		expected.add("1,1,11-c1-v1,11-c2-v1");
@@ -162,11 +170,12 @@ public class JdbcLookupFunctionITCase extends AbstractTestBase {
 		expected.add("2,3,null,23-c2");
 		expected.add("2,5,25-c1,25-c2");
 		expected.add("3,8,38-c1,38-c2");
+		Collections.sort(expected);
 
-		StreamITCase.compareWithList(expected);
+		assertEquals(expected, result);
 	}
 
-	private void useLegacyTableFactory(StreamExecutionEnvironment env, StreamTableEnvironment tEnv) throws Exception {
+	private Iterator<Row> useLegacyTableFactory(StreamExecutionEnvironment env, StreamTableEnvironment tEnv) throws Exception {
 		Table t = tEnv.fromDataStream(env.fromCollection(Arrays.asList(
 			new Tuple2<>(1, "1"),
 			new Tuple2<>(1, "1"),
@@ -195,13 +204,10 @@ public class JdbcLookupFunctionITCase extends AbstractTestBase {
 		String sqlQuery = "SELECT id1, id2, comment1, comment2 FROM T, " +
 			"LATERAL TABLE(jdbcLookup(id1, id2)) AS S(l_id1, l_id2, comment1, comment2)";
-		Table result = tEnv.sqlQuery(sqlQuery);
-		DataStream<Row> resultSet = tEnv.toAppendStream(result, Row.class);
-		resultSet.addSink(new StreamITCase.StringSink<>());
-		env.execute();
+		return tEnv.executeSql(sqlQuery).collect();
 	}
 
-	private void useDynamicTableFactory(StreamExecutionEnvironment env, StreamTableEnvironment tEnv) throws Exception {
+	private Iterator<Row> useDynamicTableFactory(StreamExecutionEnvironment env, StreamTableEnvironment tEnv) throws Exception {
 		Table t = tEnv.fromDataStream(env.fromCollection(Arrays.asList(
 			new Tuple2<>(1, "1"),
 			new Tuple2<>(1, "1"),
@@ -229,9 +235,6 @@ public class JdbcLookupFunctionITCase extends AbstractTestBase {
 		String sqlQuery = "SELECT source.id1, source.id2, L.comment1, L.comment2 FROM T AS source " +
 			"JOIN lookup for system_time as of source.proctime AS L " +
 			"ON source.id1 = L.id1 and source.id2 = L.id2";
-		Table result = tEnv.sqlQuery(sqlQuery);
-		DataStream<Row> resultSet = tEnv.toAppendStream(result, Row.class);
-		resultSet.addSink(new StreamITCase.StringSink<>());
-		env.execute();
+		return tEnv.executeSql(sqlQuery).collect();
 	}
 }
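The DDL that registers the lookup table on the dynamic-factory path sits outside the visible hunks. A plausible sketch, consistent with the DB_URL and LOOKUP_TABLE constants in this test; the column types and option values are assumptions, not quoted from the patch:

    // Hypothetical DDL for the dynamic-factory lookup path.
    tEnv.executeSql(
        "CREATE TABLE lookup (" +
        "  id1 INT," +
        "  id2 VARCHAR," +
        "  comment1 VARCHAR," +
        "  comment2 VARCHAR" +
        ") WITH (" +
        "  'connector' = 'jdbc'," +
        "  'url' = 'jdbc:derby:memory:lookup'," +
        "  'table-name' = 'lookup_table'" +
        ")");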
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
new file mode 100644
index 0000000..4efcb47
--- /dev/null
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Test;
+
+/**
+ * Plan tests for JDBC connector, for example, testing projection push down.
+ */
+public class JdbcTablePlanTest extends TableTestBase {
+
+	private final StreamTableTestUtil util = streamTestUtil(new TableConfig());
+
+	@Test
+	public void testProjectionPushDown() {
+		util.tableEnv().executeSql(
+			"CREATE TABLE jdbc (" +
+				"id BIGINT," +
+				"timestamp6_col TIMESTAMP(6)," +
+				"timestamp9_col TIMESTAMP(9)," +
+				"time_col TIME," +
+				"real_col FLOAT," +
+				"double_col DOUBLE," +
+				"decimal_col DECIMAL(10, 4)" +
+				") WITH (" +
+				"  'connector'='jdbc'," +
+				"  'url'='jdbc:derby:memory:test'," +
+				"  'table-name'='test_table'" +
+				")"
+		);
+		util.verifyPlan("SELECT decimal_col, timestamp9_col, id FROM jdbc");
+	}
+
+}
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java
index 277191c..8115696 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java
@@ -44,7 +44,6 @@ import java.util.stream.StreamSupport;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertThat;
-
 /**
  * ITCase for {@link JdbcTableSource}.
  */
See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +<Root> + <TestCase name="testProjectionPushDown"> + <Resource name="sql"> + <![CDATA[SELECT decimal_col, timestamp9_col, id FROM jdbc]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(decimal_col=[$6], timestamp9_col=[$2], id=[$0]) ++- LogicalTableScan(table=[[default_catalog, default_database, jdbc]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +TableSourceScan(table=[[default_catalog, default_database, jdbc, project=[decimal_col, timestamp9_col, id]]], fields=[decimal_col, timestamp9_col, id]) +]]> + </Resource> + </TestCase> +</Root>
