libenchao commented on a change in pull request #10745:
[FLINK-15445][connectors/jdbc] JDBC Table Source didn't work for Type…
URL: https://github.com/apache/flink/pull/10745#discussion_r376680959
##########
File path:
flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCTableSourceITCase.java
##########
@@ -18,93 +18,167 @@
package org.apache.flink.api.java.io.jdbc;
-import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.runtime.utils.StreamITCase;
+import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
-import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
-import java.util.ArrayList;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
import java.util.List;
+
/**
- * IT case for {@link JDBCTableSource}.
+ * ITCase for {@link JDBCTableSource}.
*/
-public class JDBCTableSourceITCase extends JDBCTestBase {
-
- private static final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- private static final EnvironmentSettings bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
- private static final StreamTableEnvironment tEnv =
StreamTableEnvironment.create(env, bsSettings);
-
- static final String TABLE_SOURCE_SQL = "CREATE TABLE books (" +
- " id int, " +
- " title varchar, " +
- " author varchar, " +
- " price double, " +
- " qty int " +
- ") with (" +
- " 'connector.type' = 'jdbc', " +
- " 'connector.url' = 'jdbc:derby:memory:ebookshop', " +
- " 'connector.table' = 'books', " +
- " 'connector.driver' = 'org.apache.derby.jdbc.EmbeddedDriver' "
+
- ")";
-
- @BeforeClass
- public static void createTable() {
- tEnv.sqlUpdate(TABLE_SOURCE_SQL);
+public class JDBCTableSourceITCase extends AbstractTestBase {
+
+ public static final String DRIVER_CLASS =
"org.apache.derby.jdbc.EmbeddedDriver";
+ public static final String DB_URL = "jdbc:derby:memory:test";
+ public static final String INPUT_TABLE = "jdbcSource";
+
+ @Before
+ public void before() throws ClassNotFoundException, SQLException {
+ System.setProperty("derby.stream.error.field",
JDBCTestBase.class.getCanonicalName() + ".DEV_NULL");
+ Class.forName(DRIVER_CLASS);
+
+ try (
+ Connection conn = DriverManager.getConnection(DB_URL +
";create=true");
+ Statement statement = conn.createStatement()) {
+ statement.executeUpdate("CREATE TABLE " + INPUT_TABLE +
" (" +
+ "id BIGINT NOT NULL," +
+ "timestamp6_col TIMESTAMP, " +
+ "timestamp9_col TIMESTAMP, " +
+ "time_col TIME, " +
+ "real_col FLOAT(23), " + // A
precision of 23 or less makes FLOAT equivalent to REAL.
+ "double_col FLOAT(24)," + // A
precision of 24 or greater makes FLOAT equivalent to DOUBLE PRECISION.
+ "decimal_col DECIMAL(10, 4))");
+ statement.executeUpdate("INSERT INTO " + INPUT_TABLE +
" VALUES (" +
+ "1, TIMESTAMP('2020-01-01
15:35:00.123456'), TIMESTAMP('2020-01-01 15:35:00.123456789'), " +
+ "TIME('15:35:00'), 1.175E-37,
1.79769E+308, 100.1234)");
+ statement.executeUpdate("INSERT INTO " + INPUT_TABLE +
" VALUES (" +
+ "2, TIMESTAMP('2020-01-01
15:36:01.123456'), TIMESTAMP('2020-01-01 15:36:01.123456789'), " +
+ "TIME('15:36:01'), -1.175E-37,
-1.79769E+308, 101.1234)");
+ }
+ }
+
+ @After
+ public void clearOutputTable() throws Exception {
+ Class.forName(DRIVER_CLASS);
+ try (
+ Connection conn = DriverManager.getConnection(DB_URL);
+ Statement stat = conn.createStatement()) {
+ stat.execute("DROP TABLE " + INPUT_TABLE);
+ }
}
@Test
- public void testFieldsProjection() throws Exception {
- StreamITCase.clear();
+ public void testJDBCSource() throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ EnvironmentSettings envSettings =
EnvironmentSettings.newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build();
+ StreamTableEnvironment tEnv =
StreamTableEnvironment.create(env, envSettings);
+
+ tEnv.sqlUpdate(
+ "CREATE TABLE " + INPUT_TABLE + "(" +
+ "id BIGINT," +
+ "timestamp6_col TIMESTAMP(6)," +
+ "timestamp9_col TIMESTAMP(9)," +
+ "time_col TIME," +
+ "real_col FLOAT," +
+ "double_col DOUBLE," +
+ "decimal_col DECIMAL(10, 4)" +
+ ") WITH (" +
+ " 'connector.type'='jdbc'," +
+ " 'connector.url'='" + DB_URL + "'," +
+ " 'connector.table'='" + INPUT_TABLE + "'" +
+ ")"
+ );
- Table result = tEnv.sqlQuery(SELECT_ID_BOOKS);
- DataStream<Row> resultSet = tEnv.toAppendStream(result,
Row.class);
- resultSet.addSink(new StreamITCase.StringSink<>());
+ StreamITCase.clear();
+ tEnv.toAppendStream(tEnv.sqlQuery("SELECT * FROM " +
INPUT_TABLE), Row.class)
+ .addSink(new StreamITCase.StringSink<>());
env.execute();
- List<String> expected = new ArrayList<>();
- expected.add("1001");
- expected.add("1002");
- expected.add("1003");
- expected.add("1004");
- expected.add("1005");
- expected.add("1006");
- expected.add("1007");
- expected.add("1008");
- expected.add("1009");
- expected.add("1010");
-
+ List<String> expected =
+ Arrays.asList(
+
"1,2020-01-01T15:35:00.123456,2020-01-01T15:35:00.123456789,15:35,1.175E-37,1.79769E308,100.1234",
+
"2,2020-01-01T15:36:01.123456,2020-01-01T15:36:01.123456789,15:36:01,-1.175E-37,-1.79769E308,101.1234");
StreamITCase.compareWithList(expected);
}
@Test
- public void testAllFieldsSelection() throws Exception {
- StreamITCase.clear();
+ public void testProjectableJDBCSource() throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ EnvironmentSettings envSettings =
EnvironmentSettings.newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build();
+ StreamTableEnvironment tEnv =
StreamTableEnvironment.create(env, envSettings);
+
+ tEnv.sqlUpdate(
+ "CREATE TABLE " + INPUT_TABLE + "(" +
+ "id BIGINT," +
+ "timestamp6_col TIMESTAMP(6)," +
+ "timestamp9_col TIMESTAMP(9)," +
+ "time_col TIME," +
+ "real_col FLOAT," +
+ "decimal_col DECIMAL(10, 4)" +
+ ") WITH (" +
+ " 'connector.type'='jdbc'," +
+ " 'connector.url'='" + DB_URL + "'," +
+ " 'connector.table'='" + INPUT_TABLE + "'" +
+ ")"
+ );
- Table result = tEnv.sqlQuery(SELECT_ALL_BOOKS);
- DataStream<Row> resultSet = tEnv.toAppendStream(result,
Row.class);
- resultSet.addSink(new StreamITCase.StringSink<>());
+ StreamITCase.clear();
+ tEnv.toAppendStream(tEnv.sqlQuery("SELECT timestamp6_col,
decimal_col FROM " + INPUT_TABLE), Row.class)
+ .addSink(new StreamITCase.StringSink<>());
env.execute();
- List<String> expected = new ArrayList<>();
- expected.add("1001,Java public for dummies,Tan Ah
Teck,11.11,11");
- expected.add("1002,More Java for dummies,Tan Ah Teck,22.22,22");
- expected.add("1003,More Java for more dummies,Mohammad
Ali,33.33,33");
- expected.add("1004,A Cup of Java,Kumar,44.44,44");
- expected.add("1005,A Teaspoon of Java,Kevin Jones,55.55,55");
- expected.add("1006,A Teaspoon of Java 1.4,Kevin
Jones,66.66,66");
- expected.add("1007,A Teaspoon of Java 1.5,Kevin
Jones,77.77,77");
- expected.add("1008,A Teaspoon of Java 1.6,Kevin
Jones,88.88,88");
- expected.add("1009,A Teaspoon of Java 1.7,Kevin
Jones,99.99,99");
- expected.add("1010,A Teaspoon of Java 1.8,Kevin
Jones,null,1010");
-
+ List<String> expected =
+ Arrays.asList(
+ "2020-01-01T15:35:00.123456,100.1234",
+ "2020-01-01T15:36:01.123456,101.1234");
StreamITCase.compareWithList(expected);
}
+ @Test(expected = TableException.class)
+ public void testInvalidPrecisionOfJDBCSource() throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ EnvironmentSettings envSettings =
EnvironmentSettings.newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build();
+ StreamTableEnvironment tEnv =
StreamTableEnvironment.create(env, envSettings);
+
+ tEnv.sqlUpdate(
+ "CREATE TABLE " + INPUT_TABLE + "(" +
Review comment:
The indentation here does not seem to be correct.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services