This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 73520ca19e76d0895c38ec956250cb588eca740c
Author: Jark Wu <[email protected]>
AuthorDate: Mon May 18 17:48:25 2020 +0800

    [FLINK-17798][connector/jdbc] Align the behavior between the new and legacy 
JDBC table source
    
    This closes #12221
---
 flink-connectors/flink-connector-jdbc/pom.xml      |  2 +-
 .../jdbc/table/JdbcDynamicTableSource.java         | 48 ++++++++++---------
 .../table/JdbcDynamicTableSourceSinkFactory.java   |  7 +--
 .../jdbc/table/JdbcDynamicTableSourceITCase.java   | 45 ++++++++++--------
 ...ctionITCase.java => JdbcLookupTableITCase.java} | 43 +++++++++--------
 .../connector/jdbc/table/JdbcTablePlanTest.java    | 54 ++++++++++++++++++++++
 .../jdbc/table/JdbcTableSourceITCase.java          |  1 -
 .../connector/jdbc/table/JdbcTablePlanTest.xml     | 35 ++++++++++++++
 8 files changed, 168 insertions(+), 67 deletions(-)

diff --git a/flink-connectors/flink-connector-jdbc/pom.xml 
b/flink-connectors/flink-connector-jdbc/pom.xml
index d259ce3..be107af 100644
--- a/flink-connectors/flink-connector-jdbc/pom.xml
+++ b/flink-connectors/flink-connector-jdbc/pom.xml
@@ -94,7 +94,7 @@ under the License.
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+                       
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
                        <version>${project.version}</version>
                        <type>test-jar</type>
                        <scope>test</scope>
diff --git 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
index 248ffe1..21a80a2 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java
@@ -32,37 +32,35 @@ import 
org.apache.flink.table.connector.source.InputFormatProvider;
 import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.TableFunctionProvider;
+import 
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.TableSchemaUtils;
 import org.apache.flink.util.Preconditions;
 
-import java.util.Arrays;
 import java.util.Objects;
 
 /**
  * A {@link DynamicTableSource} for JDBC.
  */
 @Internal
-public class JdbcDynamicTableSource implements ScanTableSource, 
LookupTableSource {
+public class JdbcDynamicTableSource implements ScanTableSource, 
LookupTableSource, SupportsProjectionPushDown {
 
        private final JdbcOptions options;
        private final JdbcReadOptions readOptions;
        private final JdbcLookupOptions lookupOptions;
-       private final TableSchema schema;
-       private final int[] selectFields;
+       private TableSchema physicalSchema;
        private final String dialectName;
 
        public JdbcDynamicTableSource(
                        JdbcOptions options,
                        JdbcReadOptions readOptions,
                        JdbcLookupOptions lookupOptions,
-                       TableSchema schema,
-                       int[] selectFields) {
+                       TableSchema physicalSchema) {
                this.options = options;
                this.readOptions = readOptions;
                this.lookupOptions = lookupOptions;
-               this.schema = schema;
-               this.selectFields = selectFields;
+               this.physicalSchema = physicalSchema;
                this.dialectName = options.getDialect().dialectName();
        }
 
@@ -74,15 +72,15 @@ public class JdbcDynamicTableSource implements 
ScanTableSource, LookupTableSourc
                        int[] innerKeyArr = context.getKeys()[i];
                        Preconditions.checkArgument(innerKeyArr.length == 1,
                                "JDBC only support non-nested look up keys");
-                       keyNames[i] = schema.getFieldNames()[innerKeyArr[0]];
+                       keyNames[i] = 
physicalSchema.getFieldNames()[innerKeyArr[0]];
                }
-               final RowType rowType = (RowType) 
schema.toRowDataType().getLogicalType();
+               final RowType rowType = (RowType) 
physicalSchema.toRowDataType().getLogicalType();
 
                return TableFunctionProvider.of(new JdbcRowDataLookupFunction(
                        options,
                        lookupOptions,
-                       schema.getFieldNames(),
-                       schema.getFieldDataTypes(),
+                       physicalSchema.getFieldNames(),
+                       physicalSchema.getFieldDataTypes(),
                        keyNames,
                        rowType));
        }
@@ -101,7 +99,7 @@ public class JdbcDynamicTableSource implements 
ScanTableSource, LookupTableSourc
                }
                final JdbcDialect dialect = options.getDialect();
                String query = dialect.getSelectFromStatement(
-                       options.getTableName(), schema.getFieldNames(), new 
String[0]);
+                       options.getTableName(), physicalSchema.getFieldNames(), 
new String[0]);
                if (readOptions.getPartitionColumnName().isPresent()) {
                        long lowerBound = 
readOptions.getPartitionLowerBound().get();
                        long upperBound = 
readOptions.getPartitionUpperBound().get();
@@ -113,10 +111,10 @@ public class JdbcDynamicTableSource implements 
ScanTableSource, LookupTableSourc
                                " BETWEEN ? AND ?";
                }
                builder.setQuery(query);
-               final RowType rowType = (RowType) 
schema.toRowDataType().getLogicalType();
+               final RowType rowType = (RowType) 
physicalSchema.toRowDataType().getLogicalType();
                builder.setRowConverter(dialect.getRowConverter(rowType));
                builder.setRowDataTypeInfo((TypeInformation<RowData>) 
runtimeProviderContext
-                       .createTypeInformation(schema.toRowDataType()));
+                       .createTypeInformation(physicalSchema.toRowDataType()));
 
                return InputFormatProvider.of(builder.build());
        }
@@ -127,8 +125,19 @@ public class JdbcDynamicTableSource implements 
ScanTableSource, LookupTableSourc
        }
 
        @Override
+       public boolean supportsNestedProjection() {
+               // JDBC doesn't support nested projection
+               return false;
+       }
+
+       @Override
+       public void applyProjection(int[][] projectedFields) {
+               this.physicalSchema = 
TableSchemaUtils.projectSchema(physicalSchema, projectedFields);
+       }
+
+       @Override
        public DynamicTableSource copy() {
-               return new JdbcDynamicTableSource(options, readOptions, 
lookupOptions, schema, selectFields);
+               return new JdbcDynamicTableSource(options, readOptions, 
lookupOptions, physicalSchema);
        }
 
        @Override
@@ -148,15 +157,12 @@ public class JdbcDynamicTableSource implements 
ScanTableSource, LookupTableSourc
                return Objects.equals(options, that.options) &&
                        Objects.equals(readOptions, that.readOptions) &&
                        Objects.equals(lookupOptions, that.lookupOptions) &&
-                       Objects.equals(schema, that.schema) &&
-                       Arrays.equals(selectFields, that.selectFields) &&
+                       Objects.equals(physicalSchema, that.physicalSchema) &&
                        Objects.equals(dialectName, that.dialectName);
        }
 
        @Override
        public int hashCode() {
-               int result = Objects.hash(options, readOptions, lookupOptions, 
schema, dialectName);
-               result = 31 * result + Arrays.hashCode(selectFields);
-               return result;
+               return Objects.hash(options, readOptions, lookupOptions, 
physicalSchema, dialectName);
        }
 }
diff --git 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceSinkFactory.java
 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceSinkFactory.java
index 28a129d..930a1b0 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceSinkFactory.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceSinkFactory.java
@@ -173,16 +173,11 @@ public class JdbcDynamicTableSourceSinkFactory implements 
DynamicTableSourceFact
                helper.validate();
                validateConfigOptions(config);
                TableSchema physicalSchema = 
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
-               int[] selectFields = new 
int[physicalSchema.getFieldNames().length];
-               for (int i = 0; i < selectFields.length; i++) {
-                       selectFields[i] = i;
-               }
                return new JdbcDynamicTableSource(
                        getJdbcOptions(helper.getOptions()),
                        getJdbcReadOptions(helper.getOptions()),
                        getJdbcLookupOptions(helper.getOptions()),
-                       physicalSchema,
-                       selectFields);
+                       physicalSchema);
        }
 
        private JdbcOptions getJdbcOptions(ReadableConfig readableConfig) {
diff --git 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
index 6f93307..48be89e 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSourceITCase.java
@@ -22,10 +22,12 @@ import org.apache.flink.connector.jdbc.JdbcTestBase;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
-import org.apache.flink.table.runtime.utils.StreamITCase;
+import org.apache.flink.table.planner.runtime.utils.StreamTestSink;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
 
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -34,8 +36,12 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertEquals;
 
 /**
  * ITCase for {@link JdbcDynamicTableSource}.
@@ -79,6 +85,7 @@ public class JdbcDynamicTableSourceITCase extends 
AbstractTestBase {
                        Statement stat = conn.createStatement()) {
                        stat.executeUpdate("DROP TABLE " + INPUT_TABLE);
                }
+               StreamTestSink.clear();
        }
 
        @Test
@@ -106,16 +113,17 @@ public class JdbcDynamicTableSourceITCase extends 
AbstractTestBase {
                                ")"
                );
 
-               StreamITCase.clear();
-               tEnv.toAppendStream(tEnv.sqlQuery("SELECT * FROM " + 
INPUT_TABLE), Row.class)
-                       .addSink(new StreamITCase.StringSink<>());
-               env.execute();
-
+               Iterator<Row> collected = tEnv.executeSql("SELECT * FROM " + 
INPUT_TABLE).collect();
+               List<String> result = Lists.newArrayList(collected).stream()
+                       .map(Row::toString)
+                       .sorted()
+                       .collect(Collectors.toList());
                List<String> expected =
-                       Arrays.asList(
+                       Stream.of(
                                
"1,2020-01-01T15:35:00.123456,2020-01-01T15:35:00.123456789,15:35,1.175E-37,1.79769E308,100.1234",
-                               
"2,2020-01-01T15:36:01.123456,2020-01-01T15:36:01.123456789,15:36:01,-1.175E-37,-1.79769E308,101.1234");
-               StreamITCase.compareWithList(expected);
+                               
"2,2020-01-01T15:36:01.123456,2020-01-01T15:36:01.123456789,15:36:01,-1.175E-37,-1.79769E308,101.1234")
+                       .sorted().collect(Collectors.toList());
+               assertEquals(expected, result);
        }
 
        @Test
@@ -147,15 +155,16 @@ public class JdbcDynamicTableSourceITCase extends 
AbstractTestBase {
                                ")"
                );
 
-               StreamITCase.clear();
-               tEnv.toAppendStream(tEnv.sqlQuery("SELECT 
id,timestamp6_col,decimal_col FROM " + INPUT_TABLE), Row.class)
-                       .addSink(new StreamITCase.StringSink<>());
-               env.execute();
-
+               Iterator<Row> collected = tEnv.executeSql("SELECT 
id,timestamp6_col,decimal_col FROM " + INPUT_TABLE).collect();
+               List<String> result = Lists.newArrayList(collected).stream()
+                       .map(Row::toString)
+                       .sorted()
+                       .collect(Collectors.toList());
                List<String> expected =
-                       Arrays.asList(
+                       Stream.of(
                                "1,2020-01-01T15:35:00.123456,100.1234",
-                               "2,2020-01-01T15:36:01.123456,101.1234");
-               StreamITCase.compareWithList(expected);
+                               "2,2020-01-01T15:36:01.123456,101.1234")
+                               .sorted().collect(Collectors.toList());
+               assertEquals(expected, result);
        }
 }
diff --git 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupFunctionITCase.java
 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTableITCase.java
similarity index 86%
rename from 
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupFunctionITCase.java
rename to 
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTableITCase.java
index 8d40cdd..793ea9d 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupFunctionITCase.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcLookupTableITCase.java
@@ -22,17 +22,17 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.connector.jdbc.JdbcTestFixture;
 import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
 import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
-import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
-import org.apache.flink.table.runtime.utils.StreamITCase;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
 
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -46,16 +46,20 @@ import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static 
org.apache.flink.connector.jdbc.JdbcTestFixture.DERBY_EBOOKSHOP_DB;
 import static org.apache.flink.table.api.Expressions.$;
+import static org.junit.Assert.assertEquals;
 
 /**
- * IT case for {@link JdbcLookupFunction}.
+ * IT case for lookup source of JDBC connector.
  */
 @RunWith(Parameterized.class)
-public class JdbcLookupFunctionITCase extends AbstractTestBase {
+public class JdbcLookupTableITCase extends AbstractTestBase {
 
        public static final String DB_URL = "jdbc:derby:memory:lookup";
        public static final String LOOKUP_TABLE = "lookup_table";
@@ -63,7 +67,7 @@ public class JdbcLookupFunctionITCase extends 
AbstractTestBase {
        private final String tableFactory;
        private final boolean useCache;
 
-       public JdbcLookupFunctionITCase(String tableFactory, boolean useCache) {
+       public JdbcLookupTableITCase(String tableFactory, boolean useCache) {
                this.useCache = useCache;
                this.tableFactory = tableFactory;
        }
@@ -143,16 +147,20 @@ public class JdbcLookupFunctionITCase extends 
AbstractTestBase {
        }
 
        @Test
-       public void test() throws Exception {
+       public void testLookup() throws Exception {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                StreamTableEnvironment tEnv = 
StreamTableEnvironment.create(env);
-               StreamITCase.clear();
 
+               Iterator<Row> collected;
                if ("legacyFactory".equals(tableFactory)) {
-                       useLegacyTableFactory(env, tEnv);
+                       collected = useLegacyTableFactory(env, tEnv);
                } else {
-                       useDynamicTableFactory(env, tEnv);
+                       collected = useDynamicTableFactory(env, tEnv);
                }
+               List<String> result = Lists.newArrayList(collected).stream()
+                       .map(Row::toString)
+                       .sorted()
+                       .collect(Collectors.toList());
 
                List<String> expected = new ArrayList<>();
                expected.add("1,1,11-c1-v1,11-c2-v1");
@@ -162,11 +170,12 @@ public class JdbcLookupFunctionITCase extends 
AbstractTestBase {
                expected.add("2,3,null,23-c2");
                expected.add("2,5,25-c1,25-c2");
                expected.add("3,8,38-c1,38-c2");
+               Collections.sort(expected);
 
-               StreamITCase.compareWithList(expected);
+               assertEquals(expected, result);
        }
 
-       private void useLegacyTableFactory(StreamExecutionEnvironment env, 
StreamTableEnvironment tEnv) throws Exception {
+       private Iterator<Row> useLegacyTableFactory(StreamExecutionEnvironment 
env, StreamTableEnvironment tEnv) throws Exception {
                Table t = tEnv.fromDataStream(env.fromCollection(Arrays.asList(
                        new Tuple2<>(1, "1"),
                        new Tuple2<>(1, "1"),
@@ -195,13 +204,10 @@ public class JdbcLookupFunctionITCase extends 
AbstractTestBase {
 
                String sqlQuery = "SELECT id1, id2, comment1, comment2 FROM T, 
" +
                        "LATERAL TABLE(jdbcLookup(id1, id2)) AS S(l_id1, l_id2, 
comment1, comment2)";
-               Table result = tEnv.sqlQuery(sqlQuery);
-               DataStream<Row> resultSet = tEnv.toAppendStream(result, 
Row.class);
-               resultSet.addSink(new StreamITCase.StringSink<>());
-               env.execute();
+               return tEnv.executeSql(sqlQuery).collect();
        }
 
-       private void useDynamicTableFactory(StreamExecutionEnvironment env, 
StreamTableEnvironment tEnv) throws Exception {
+       private Iterator<Row> useDynamicTableFactory(StreamExecutionEnvironment 
env, StreamTableEnvironment tEnv) throws Exception {
                Table t = tEnv.fromDataStream(env.fromCollection(Arrays.asList(
                        new Tuple2<>(1, "1"),
                        new Tuple2<>(1, "1"),
@@ -229,9 +235,6 @@ public class JdbcLookupFunctionITCase extends 
AbstractTestBase {
                String sqlQuery = "SELECT source.id1, source.id2, L.comment1, 
L.comment2 FROM T AS source " +
                        "JOIN lookup for system_time as of source.proctime AS L 
" +
                        "ON source.id1 = L.id1 and source.id2 = L.id2";
-               Table result = tEnv.sqlQuery(sqlQuery);
-               DataStream<Row> resultSet = tEnv.toAppendStream(result, 
Row.class);
-               resultSet.addSink(new StreamITCase.StringSink<>());
-               env.execute();
+               return tEnv.executeSql(sqlQuery).collect();
        }
 }
diff --git 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
new file mode 100644
index 0000000..4efcb47
--- /dev/null
+++ 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.table;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Test;
+
+/**
+ * Plan tests for JDBC connector, for example, testing projection push down.
+ */
+public class JdbcTablePlanTest extends TableTestBase {
+
+       private final StreamTableTestUtil util = streamTestUtil(new 
TableConfig());
+
+       @Test
+       public void testProjectionPushDown() {
+               util.tableEnv().executeSql(
+                       "CREATE TABLE jdbc (" +
+                               "id BIGINT," +
+                               "timestamp6_col TIMESTAMP(6)," +
+                               "timestamp9_col TIMESTAMP(9)," +
+                               "time_col TIME," +
+                               "real_col FLOAT," +
+                               "double_col DOUBLE," +
+                               "decimal_col DECIMAL(10, 4)" +
+                               ") WITH (" +
+                               "  'connector'='jdbc'," +
+                               "  'url'='jdbc:derby:memory:test'," +
+                               "  'table-name'='test_table'" +
+                               ")"
+               );
+               util.verifyPlan("SELECT decimal_col, timestamp9_col, id FROM 
jdbc");
+       }
+
+}
diff --git 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java
 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java
index 277191c..8115696 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java
@@ -44,7 +44,6 @@ import java.util.stream.StreamSupport;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertThat;
 
-
 /**
  * ITCase for {@link JdbcTableSource}.
  */
diff --git 
a/flink-connectors/flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml
 
b/flink-connectors/flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml
new file mode 100644
index 0000000..9219fc8
--- /dev/null
+++ 
b/flink-connectors/flink-connector-jdbc/src/test/resources/org/apache/flink/connector/jdbc/table/JdbcTablePlanTest.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testProjectionPushDown">
+    <Resource name="sql">
+      <![CDATA[SELECT decimal_col, timestamp9_col, id FROM jdbc]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(decimal_col=[$6], timestamp9_col=[$2], id=[$0])
++- LogicalTableScan(table=[[default_catalog, default_database, jdbc]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+TableSourceScan(table=[[default_catalog, default_database, jdbc, 
project=[decimal_col, timestamp9_col, id]]], fields=[decimal_col, 
timestamp9_col, id])
+]]>
+    </Resource>
+  </TestCase>
+</Root>

Reply via email to