This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ecc464a51e87814201b650765e3971ef9ff0a2a1
Author: Aljoscha Krettek <[email protected]>
AuthorDate: Tue May 19 17:40:26 2020 +0200

    [FLINK-17361] Refactor JdbcTableSourceITCase to use TableResult instead of 
StreamITCase
    
    Using the static sink approach of StreamITCase is potentially
    problematic with concurrency, plus the code is just plain nicer like
    this.
---
 .../jdbc/table/JdbcTableSourceITCase.java          | 85 ++++++++++++----------
 1 file changed, 48 insertions(+), 37 deletions(-)

diff --git 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java
 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java
index fa8d98a..277191c 100644
--- 
a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java
+++ 
b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcTableSourceITCase.java
@@ -21,8 +21,8 @@ package org.apache.flink.connector.jdbc.table;
 import org.apache.flink.connector.jdbc.JdbcTestBase;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
-import org.apache.flink.table.runtime.utils.StreamITCase;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
 
@@ -34,8 +34,15 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
 
 
 /**
@@ -107,20 +114,19 @@ public class JdbcTableSourceITCase extends 
AbstractTestBase {
                                ")"
                );
 
-               StreamITCase.clear();
-               tEnv.toAppendStream(tEnv.sqlQuery("SELECT * FROM " + 
INPUT_TABLE), Row.class)
-                       .addSink(new StreamITCase.StringSink<>());
-               env.execute();
+               TableResult tableResult = tEnv.executeSql("SELECT * FROM " + 
INPUT_TABLE);
+
+               List<String> results = manifestResults(tableResult);
 
-               List<String> expected =
-                       Arrays.asList(
-                               
"1,2020-01-01T15:35:00.123456,2020-01-01T15:35:00.123456789,15:35,1.175E-37,1.79769E308,100.1234",
-                               
"2,2020-01-01T15:36:01.123456,2020-01-01T15:36:01.123456789,15:36:01,-1.175E-37,-1.79769E308,101.1234");
-               StreamITCase.compareWithList(expected);
+               assertThat(
+                               results,
+                               containsInAnyOrder(
+                                               
"1,2020-01-01T15:35:00.123456,2020-01-01T15:35:00.123456789,15:35,1.175E-37,1.79769E308,100.1234",
+                                               
"2,2020-01-01T15:36:01.123456,2020-01-01T15:36:01.123456789,15:36:01,-1.175E-37,-1.79769E308,101.1234"));
        }
 
        @Test
-       public void testProjectableJdbcSource() throws Exception {
+       public void testProjectableJdbcSource() {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                EnvironmentSettings envSettings = 
EnvironmentSettings.newInstance()
                                .useBlinkPlanner()
@@ -143,20 +149,19 @@ public class JdbcTableSourceITCase extends 
AbstractTestBase {
                                ")"
                );
 
-               StreamITCase.clear();
-               tEnv.toAppendStream(tEnv.sqlQuery("SELECT timestamp6_col, 
decimal_col FROM " + INPUT_TABLE), Row.class)
-                               .addSink(new StreamITCase.StringSink<>());
-               env.execute();
+               TableResult tableResult = tEnv.executeSql("SELECT 
timestamp6_col, decimal_col FROM " + INPUT_TABLE);
+
+               List<String> results = manifestResults(tableResult);
 
-               List<String> expected =
-                       Arrays.asList(
-                               "2020-01-01T15:35:00.123456,100.1234",
-                               "2020-01-01T15:36:01.123456,101.1234");
-               StreamITCase.compareWithList(expected);
+               assertThat(
+                               results,
+                               containsInAnyOrder(
+                                               
"2020-01-01T15:35:00.123456,100.1234",
+                                               
"2020-01-01T15:36:01.123456,101.1234"));
        }
 
        @Test
-       public void testScanQueryJDBCSource() throws Exception {
+       public void testScanQueryJDBCSource() {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                EnvironmentSettings envSettings = 
EnvironmentSettings.newInstance()
                        .useBlinkPlanner()
@@ -165,23 +170,29 @@ public class JdbcTableSourceITCase extends 
AbstractTestBase {
                StreamTableEnvironment tEnv = 
StreamTableEnvironment.create(env, envSettings);
 
                final String testQuery = "SELECT id FROM " + INPUT_TABLE;
-               tEnv.sqlUpdate(
-                       "CREATE TABLE test(" +
-                               "id BIGINT" +
-                               ") WITH (" +
-                               "  'connector.type'='jdbc'," +
-                               "  'connector.url'='" + DB_URL + "'," +
-                               "  'connector.table'='whatever'," +
-                               "  'connector.read.query'='" + testQuery + "'" +
-                               ")"
+               tEnv.executeSql(
+                               "CREATE TABLE test(" +
+                                               "id BIGINT" +
+                                               ") WITH (" +
+                                               "  'connector.type'='jdbc'," +
+                                               "  'connector.url'='" + DB_URL 
+ "'," +
+                                               "  
'connector.table'='whatever'," +
+                                               "  'connector.read.query'='" + 
testQuery + "'" +
+                                               ")"
                );
 
-               StreamITCase.clear();
-               tEnv.toAppendStream(tEnv.sqlQuery("SELECT id FROM test"), 
Row.class)
-                       .addSink(new StreamITCase.StringSink<>());
-               env.execute();
+               TableResult tableResult = tEnv.executeSql("SELECT id FROM 
test");
+
+               List<String> results = manifestResults(tableResult);
+
+               assertThat(results, containsInAnyOrder("1", "2"));
+       }
 
-               List<String> expected = Arrays.asList("1", "2");
-               StreamITCase.compareWithList(expected);
+       private static List<String> manifestResults(TableResult result) {
+               Iterator<Row> resultIterator = result.collect();
+               return StreamSupport
+                               
.stream(Spliterators.spliteratorUnknownSize(resultIterator, 
Spliterator.ORDERED), false)
+                               .map(Row::toString)
+                               .collect(Collectors.toList());
        }
 }

Reply via email to