This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit ecc464a51e87814201b650765e3971ef9ff0a2a1 Author: Aljoscha Krettek <[email protected]> AuthorDate: Tue May 19 17:40:26 2020 +0200 [FLINK-17361] Refactor JdbcTableSourceITCase to use TableResult instead of StreamITCase Using the static sink approach of StreamITCase is potentially problematic with concurrency, plus the code is just plain nicer like this. --- .../jdbc/table/JdbcTableSourceITCase.java | 85 ++++++++++++---------- 1 file changed, 48 insertions(+), 37 deletions(-) diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java index fa8d98a..277191c 100644 --- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java +++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java @@ -21,8 +21,8 @@ package org.apache.flink.connector.jdbc.table; import org.apache.flink.connector.jdbc.JdbcTestBase; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.java.StreamTableEnvironment; -import org.apache.flink.table.runtime.utils.StreamITCase; import org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.types.Row; @@ -34,8 +34,15 @@ import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; -import java.util.Arrays; +import java.util.Iterator; import java.util.List; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; /** @@ -107,20 +114,19 @@ public class JdbcTableSourceITCase extends AbstractTestBase { ")" ); - StreamITCase.clear(); - tEnv.toAppendStream(tEnv.sqlQuery("SELECT * FROM " + INPUT_TABLE), Row.class) - .addSink(new StreamITCase.StringSink<>()); - env.execute(); + TableResult tableResult = tEnv.executeSql("SELECT * FROM " + INPUT_TABLE); + + List<String> results = manifestResults(tableResult); - List<String> expected = - Arrays.asList( - "1,2020-01-01T15:35:00.123456,2020-01-01T15:35:00.123456789,15:35,1.175E-37,1.79769E308,100.1234", - "2,2020-01-01T15:36:01.123456,2020-01-01T15:36:01.123456789,15:36:01,-1.175E-37,-1.79769E308,101.1234"); - StreamITCase.compareWithList(expected); + assertThat( + results, + containsInAnyOrder( + "1,2020-01-01T15:35:00.123456,2020-01-01T15:35:00.123456789,15:35,1.175E-37,1.79769E308,100.1234", + "2,2020-01-01T15:36:01.123456,2020-01-01T15:36:01.123456789,15:36:01,-1.175E-37,-1.79769E308,101.1234")); } @Test - public void testProjectableJdbcSource() throws Exception { + public void testProjectableJdbcSource() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings envSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() @@ -143,20 +149,19 @@ public class JdbcTableSourceITCase extends AbstractTestBase { ")" ); - StreamITCase.clear(); - tEnv.toAppendStream(tEnv.sqlQuery("SELECT timestamp6_col, decimal_col FROM " + INPUT_TABLE), Row.class) - .addSink(new StreamITCase.StringSink<>()); - env.execute(); + TableResult tableResult = tEnv.executeSql("SELECT timestamp6_col, decimal_col FROM " + INPUT_TABLE); + + List<String> results = manifestResults(tableResult); - List<String> expected = - Arrays.asList( - "2020-01-01T15:35:00.123456,100.1234", - "2020-01-01T15:36:01.123456,101.1234"); - StreamITCase.compareWithList(expected); + assertThat( + results, + containsInAnyOrder( + "2020-01-01T15:35:00.123456,100.1234", + "2020-01-01T15:36:01.123456,101.1234")); } @Test - public void testScanQueryJDBCSource() throws Exception { + public void testScanQueryJDBCSource() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings envSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() @@ -165,23 +170,29 @@ public class JdbcTableSourceITCase extends AbstractTestBase { StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, envSettings); final String testQuery = "SELECT id FROM " + INPUT_TABLE; - tEnv.sqlUpdate( - "CREATE TABLE test(" + - "id BIGINT" + - ") WITH (" + - " 'connector.type'='jdbc'," + - " 'connector.url'='" + DB_URL + "'," + - " 'connector.table'='whatever'," + - " 'connector.read.query'='" + testQuery + "'" + - ")" + tEnv.executeSql( + "CREATE TABLE test(" + + "id BIGINT" + + ") WITH (" + + " 'connector.type'='jdbc'," + + " 'connector.url'='" + DB_URL + "'," + + " 'connector.table'='whatever'," + + " 'connector.read.query'='" + testQuery + "'" + + ")" ); - StreamITCase.clear(); - tEnv.toAppendStream(tEnv.sqlQuery("SELECT id FROM test"), Row.class) - .addSink(new StreamITCase.StringSink<>()); - env.execute(); + TableResult tableResult = tEnv.executeSql("SELECT id FROM test"); + + List<String> results = manifestResults(tableResult); + + assertThat(results, containsInAnyOrder("1", "2")); + } - List<String> expected = Arrays.asList("1", "2"); - StreamITCase.compareWithList(expected); + private static List<String> manifestResults(TableResult result) { + Iterator<Row> resultIterator = result.collect(); + return StreamSupport + .stream(Spliterators.spliteratorUnknownSize(resultIterator, Spliterator.ORDERED), false) + .map(Row::toString) + .collect(Collectors.toList()); } }
