wuchong commented on a change in pull request #10745: 
[FLINK-15445][connectors/jdbc] JDBC Table Source didn't work for Type…
URL: https://github.com/apache/flink/pull/10745#discussion_r376198345
 
 

 ##########
 File path: 
flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceITCase.java
 ##########
 @@ -18,93 +18,167 @@
 
 package org.apache.flink.api.java.io.jdbc;
 
-import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.runtime.utils.StreamITCase;
+import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
 
-import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
-import java.util.ArrayList;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
 import java.util.List;
 
+
 /**
- * IT case for {@link JDBCTableSource}.
+ * ITCase for {@link JDBCTableSource}.
  */
-public class JDBCTableSourceITCase extends JDBCTestBase {
-
-       private static final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-       private static final EnvironmentSettings bsSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
-       private static final StreamTableEnvironment tEnv = 
StreamTableEnvironment.create(env, bsSettings);
-
-       static final String TABLE_SOURCE_SQL = "CREATE TABLE books (" +
-               " id int, " +
-               " title varchar, " +
-               " author varchar, " +
-               " price double, " +
-               " qty int " +
-               ") with (" +
-               " 'connector.type' = 'jdbc', " +
-               " 'connector.url' = 'jdbc:derby:memory:ebookshop', " +
-               " 'connector.table' = 'books', " +
-               " 'connector.driver' = 'org.apache.derby.jdbc.EmbeddedDriver' " 
+
-               ")";
-
-       @BeforeClass
-       public static void createTable() {
-               tEnv.sqlUpdate(TABLE_SOURCE_SQL);
+public class JDBCTableSourceITCase extends AbstractTestBase {
+
+       public static final String DRIVER_CLASS = 
"org.apache.derby.jdbc.EmbeddedDriver";
+       public static final String DB_URL = "jdbc:derby:memory:test";
+       public static final String INPUT_TABLE = "jdbcSource";
+
+       @Before
+       public void before() throws ClassNotFoundException, SQLException {
+               System.setProperty("derby.stream.error.field", 
JDBCTestBase.class.getCanonicalName() + ".DEV_NULL");
+               Class.forName(DRIVER_CLASS);
+
+               try (
+                       Connection conn = DriverManager.getConnection(DB_URL + 
";create=true");
+                       Statement statement = conn.createStatement()) {
+                       statement.executeUpdate("CREATE TABLE " + INPUT_TABLE + 
" (" +
+                                       "id BIGINT NOT NULL," +
+                                       "timestamp6_col TIMESTAMP, " +
+                                       "timestamp9_col TIMESTAMP, " +
+                                       "time_col TIME, " +
+                                       "real_col FLOAT(23), " +    // A 
precision of 23 or less makes FLOAT equivalent to REAL.
+                                       "double_col FLOAT(24)," +   // A 
precision of 24 or greater makes FLOAT equivalent to DOUBLE PRECISION.
+                                       "decimal_col DECIMAL(10, 4))");
+                       statement.executeUpdate("INSERT INTO " + INPUT_TABLE + 
" VALUES (" +
+                                       "1, TIMESTAMP('2020-01-01 
15:35:00.123456'), TIMESTAMP('2020-01-01 15:35:00.123456789'), " +
+                                       "TIME('15:35:00'), 1.175E-37, 
1.79769E+308, 100.1234)");
+                       statement.executeUpdate("INSERT INTO " + INPUT_TABLE + 
" VALUES (" +
+                                       "2, TIMESTAMP('2020-01-01 
15:36:01.123456'), TIMESTAMP('2020-01-01 15:36:01.123456789'), " +
+                                       "TIME('15:36:01'), -1.175E-37, 
-1.79769E+308, 101.1234)");
+               }
+       }
+
+       @After
+       public void clearOutputTable() throws Exception {
+               Class.forName(DRIVER_CLASS);
+               try (
+                       Connection conn = DriverManager.getConnection(DB_URL);
+                       Statement stat = conn.createStatement()) {
+                       stat.execute("DROP TABLE " + INPUT_TABLE);
+               }
        }
 
        @Test
-       public void testFieldsProjection() throws Exception {
-               StreamITCase.clear();
+       public void testJDBCSource() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               EnvironmentSettings envSettings = 
EnvironmentSettings.newInstance()
+                       .useBlinkPlanner()
+                       .inStreamingMode()
+                       .build();
+               StreamTableEnvironment tEnv = 
StreamTableEnvironment.create(env, envSettings);
+
+               tEnv.sqlUpdate(
+                       "CREATE TABLE " + INPUT_TABLE + "(" +
+                               "id BIGINT," +
+                               "timestamp6_col TIMESTAMP(6)," +
+                               "timestamp9_col TIMESTAMP(9)," +
+                               "time_col TIME," +
+                               "real_col FLOAT," +
+                               "double_col DOUBLE," +
+                               "decimal_col DECIMAL(10, 4)" +
+                               ") WITH (" +
+                               "  'connector.type'='jdbc'," +
+                               "  'connector.url'='" + DB_URL + "'," +
+                               "  'connector.table'='" + INPUT_TABLE + "'" +
+                               ")"
+               );
 
-               Table result = tEnv.sqlQuery(SELECT_ID_BOOKS);
-               DataStream<Row> resultSet = tEnv.toAppendStream(result, 
Row.class);
-               resultSet.addSink(new StreamITCase.StringSink<>());
+               StreamITCase.clear();
+               tEnv.toAppendStream(tEnv.sqlQuery("SELECT * FROM " + 
INPUT_TABLE), Row.class)
+                       .addSink(new StreamITCase.StringSink<>());
                env.execute();
 
-               List<String> expected = new ArrayList<>();
-               expected.add("1001");
-               expected.add("1002");
-               expected.add("1003");
-               expected.add("1004");
-               expected.add("1005");
-               expected.add("1006");
-               expected.add("1007");
-               expected.add("1008");
-               expected.add("1009");
-               expected.add("1010");
-
+               List<String> expected =
+                       Arrays.asList(
+                               
"1,2020-01-01T15:35:00.123456,2020-01-01T15:35:00.123456789,15:35,1.175E-37,1.79769E308,100.1234",
+                               
"2,2020-01-01T15:36:01.123456,2020-01-01T15:36:01.123456789,15:36:01,-1.175E-37,-1.79769E308,101.1234");
                StreamITCase.compareWithList(expected);
        }
 
        @Test
-       public void testAllFieldsSelection() throws Exception {
-               StreamITCase.clear();
+       public void testProjectableJDBCSource() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               EnvironmentSettings envSettings = 
EnvironmentSettings.newInstance()
+                               .useBlinkPlanner()
+                               .inStreamingMode()
+                               .build();
+               StreamTableEnvironment tEnv = 
StreamTableEnvironment.create(env, envSettings);
+
+               tEnv.sqlUpdate(
+                       "CREATE TABLE " + INPUT_TABLE + "(" +
+                               "id BIGINT," +
+                               "timestamp6_col TIMESTAMP(6)," +
+                               "timestamp9_col TIMESTAMP(9)," +
+                               "time_col TIME," +
+                               "real_col FLOAT," +
+                               "decimal_col DECIMAL(10, 4)" +
+                               ") WITH (" +
+                               "  'connector.type'='jdbc'," +
+                               "  'connector.url'='" + DB_URL + "'," +
+                               "  'connector.table'='" + INPUT_TABLE + "'" +
+                               ")"
+               );
 
-               Table result = tEnv.sqlQuery(SELECT_ALL_BOOKS);
-               DataStream<Row> resultSet = tEnv.toAppendStream(result, 
Row.class);
-               resultSet.addSink(new StreamITCase.StringSink<>());
+               StreamITCase.clear();
+               tEnv.toAppendStream(tEnv.sqlQuery("SELECT timestamp6_col, 
decimal_col FROM " + INPUT_TABLE), Row.class)
+                               .addSink(new StreamITCase.StringSink<>());
                env.execute();
 
-               List<String> expected = new ArrayList<>();
-               expected.add("1001,Java public for dummies,Tan Ah 
Teck,11.11,11");
-               expected.add("1002,More Java for dummies,Tan Ah Teck,22.22,22");
-               expected.add("1003,More Java for more dummies,Mohammad 
Ali,33.33,33");
-               expected.add("1004,A Cup of Java,Kumar,44.44,44");
-               expected.add("1005,A Teaspoon of Java,Kevin Jones,55.55,55");
-               expected.add("1006,A Teaspoon of Java 1.4,Kevin 
Jones,66.66,66");
-               expected.add("1007,A Teaspoon of Java 1.5,Kevin 
Jones,77.77,77");
-               expected.add("1008,A Teaspoon of Java 1.6,Kevin 
Jones,88.88,88");
-               expected.add("1009,A Teaspoon of Java 1.7,Kevin 
Jones,99.99,99");
-               expected.add("1010,A Teaspoon of Java 1.8,Kevin 
Jones,null,1010");
-
+               List<String> expected =
+                       Arrays.asList(
+                               "2020-01-01T15:35:00.123456,100.1234",
+                               "2020-01-01T15:36:01.123456,101.1234");
                StreamITCase.compareWithList(expected);
        }
 
+       @Test(expected = TableException.class)
+       public void testInvalidPrecisionOfJDBCSource() throws Exception {
 
 Review comment:
   You can create a unit test `JDBCDataTypeTest` to verify all the types with 
different precisions across the different dialects. It doesn't involve a job 
submission, so it is a lightweight unit test. You can take `FlinkDDLDataTypeTest` 
as an example.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to