This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 88871b2ab53bf4d212e10a04f076c27d0d4b3327
Author: Jark Wu <j...@apache.org>
AuthorDate: Thu Jun 4 20:02:21 2020 +0800

    [FLINK-18029][kafka] Add more ITCases for Kafka with new formats (avro, csv, json)
    
    This closes #12471
---
 .../flink-connector-kafka-0.10/pom.xml             | 13 ++++
 .../flink-connector-kafka-0.11/pom.xml             | 13 ++++
 .../connectors/kafka/table/KafkaTableTestBase.java | 81 ++++++++++++++++------
 flink-connectors/flink-connector-kafka/pom.xml     | 13 ++++
 4 files changed, 100 insertions(+), 20 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-0.10/pom.xml b/flink-connectors/flink-connector-kafka-0.10/pom.xml
index 5a4e613..f9f34c8 100644
--- a/flink-connectors/flink-connector-kafka-0.10/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.10/pom.xml
@@ -204,12 +204,25 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <!-- Kafka SQL IT test with formats -->
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-json</artifactId>
                        <version>${project.version}</version>
                        <scope>test</scope>
                </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-avro</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-csv</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
 
                <dependency>
                        <!-- Required for org.apache.flink.streaming.connectors.kafka.Kafka010SecuredRunITCase -->
diff --git a/flink-connectors/flink-connector-kafka-0.11/pom.xml b/flink-connectors/flink-connector-kafka-0.11/pom.xml
index 394c707..c7c3e06 100644
--- a/flink-connectors/flink-connector-kafka-0.11/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.11/pom.xml
@@ -204,12 +204,25 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <!-- Kafka SQL IT test with formats -->
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-json</artifactId>
                        <version>${project.version}</version>
                        <scope>test</scope>
                </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-avro</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-csv</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
 
        </dependencies>
 
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
index c1dc7ce..49d0269 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
@@ -46,17 +46,26 @@ import static org.junit.Assert.assertEquals;
 @RunWith(Parameterized.class)
 public abstract class KafkaTableTestBase extends KafkaTestBaseWithFlink {
 
+       private static final String JSON_FORMAT = "json";
+       private static final String AVRO_FORMAT = "avro";
+       private static final String CSV_FORMAT = "csv";
+
        @Parameterized.Parameter
        public boolean isLegacyConnector;
 
        @Parameterized.Parameter(1)
-       public int topicID;
+       public String format;
 
-       @Parameterized.Parameters(name = "legacy = {0}, topicId = {1}")
+       @Parameterized.Parameters(name = "legacy = {0}, format = {1}")
        public static Object[] parameters() {
                return new Object[][]{
-                       new Object[]{true, 0},
-                       new Object[]{false, 1}
+                       // cover all 3 formats for new and old connector
+                       new Object[]{false, JSON_FORMAT},
+                       new Object[]{false, AVRO_FORMAT},
+                       new Object[]{false, CSV_FORMAT},
+                       new Object[]{true, JSON_FORMAT},
+                       new Object[]{true, AVRO_FORMAT},
+                       new Object[]{true, CSV_FORMAT}
                };
        }
 
@@ -87,7 +96,9 @@ public abstract class KafkaTableTestBase extends KafkaTestBaseWithFlink {
 
        @Test
        public void testKafkaSourceSink() throws Exception {
-               final String topic = "tstopic" + topicID;
+               // we always use a different topic name for each parameterized test,
+               // in order to make sure the topic can be created.
+               final String topic = "tstopic_" + format + "_" + isLegacyConnector;
                createTestTopic(topic, 1, 1);
 
                // ---------- Produce an event time stream into Kafka -------------------
@@ -101,6 +112,8 @@ public abstract class KafkaTableTestBase extends KafkaTestBaseWithFlink {
                                        "  `computed-price` as price + 1.0,\n" +
                                        "  price decimal(38, 18),\n" +
                                        "  currency string,\n" +
+                                       "  log_date date,\n" +
+                                       "  log_time time(3),\n" +
                                        "  log_ts timestamp(3),\n" +
                                        "  ts as log_ts + INTERVAL '1' 
SECOND,\n" +
                                        "  watermark for ts as ts\n" +
@@ -110,18 +123,21 @@ public abstract class KafkaTableTestBase extends KafkaTestBaseWithFlink {
                                        "  'properties.bootstrap.servers' = 
'%s',\n" +
                                        "  'properties.group.id' = '%s',\n" +
                                        "  'scan.startup.mode' = 
'earliest-offset',\n" +
-                                       "  'format' = 'json'\n" +
+                                       "  %s\n" +
                                        ")",
                                factoryIdentifier(),
                                topic,
                                bootstraps,
-                               groupId);
+                               groupId,
+                               formatOptions());
                } else {
                        createTable = String.format(
                                "create table kafka (\n" +
                                        "  `computed-price` as price + 1.0,\n" +
                                        "  price decimal(38, 18),\n" +
                                        "  currency string,\n" +
+                                       "  log_date date,\n" +
+                                       "  log_time time(3),\n" +
                                        "  log_ts timestamp(3),\n" +
                                        "  ts as log_ts + INTERVAL '1' 
SECOND,\n" +
                                        "  watermark for ts as ts\n" +
@@ -132,32 +148,36 @@ public abstract class KafkaTableTestBase extends KafkaTestBaseWithFlink {
                                        "  'connector.properties.bootstrap.servers' = '%s',\n" +
                                        "  'connector.properties.group.id' = '%s',\n" +
                                        "  'connector.startup-mode' = 'earliest-offset',\n" +
-                                       "  'format.type' = 'json',\n" +
-                                       "  'update-mode' = 'append'\n" +
+                                       "  'update-mode' = 'append',\n" +
+                                       "  %s\n" +
                                        ")",
                                kafkaVersion(),
                                topic,
                                bootstraps,
-                               groupId);
+                               groupId,
+                               formatOptions());
                }
 
                tEnv.executeSql(createTable);
 
                String initialValues = "INSERT INTO kafka\n" +
-                       "SELECT CAST(price AS DECIMAL(10, 2)), currency, CAST(ts AS TIMESTAMP(3))\n" +
-                       "FROM (VALUES (2.02,'Euro','2019-12-12 00:00:00.001001'), \n" +
-                       "  (1.11,'US Dollar','2019-12-12 00:00:01.002001'), \n" +
-                       "  (50,'Yen','2019-12-12 00:00:03.004001'), \n" +
-                       "  (3.1,'Euro','2019-12-12 00:00:04.005001'), \n" +
-                       "  (5.33,'US Dollar','2019-12-12 00:00:05.006001'), \n" +
-                       "  (0,'DUMMY','2019-12-12 00:00:10'))\n" +
-                       "  AS orders (price, currency, ts)";
+                       "SELECT CAST(price AS DECIMAL(10, 2)), currency, " +
+                       " CAST(d AS DATE), CAST(t AS TIME(0)), CAST(ts AS TIMESTAMP(3))\n" +
+                       "FROM (VALUES (2.02,'Euro','2019-12-12', '00:00:01', '2019-12-12 00:00:01.001001'), \n" +
+                       "  (1.11,'US Dollar','2019-12-12', '00:00:02', '2019-12-12 00:00:02.002001'), \n" +
+                       "  (50,'Yen','2019-12-12', '00:00:03', '2019-12-12 00:00:03.004001'), \n" +
+                       "  (3.1,'Euro','2019-12-12', '00:00:04', '2019-12-12 00:00:04.005001'), \n" +
+                       "  (5.33,'US Dollar','2019-12-12', '00:00:05', '2019-12-12 00:00:05.006001'), \n" +
+                       "  (0,'DUMMY','2019-12-12', '00:00:10', '2019-12-12 00:00:10'))\n" +
+                       "  AS orders (price, currency, d, t, ts)";
                TableEnvUtil.execInsertSqlAndWaitResult(tEnv, initialValues);
 
                // ---------- Consume stream from Kafka -------------------
 
                String query = "SELECT\n" +
                        "  CAST(TUMBLE_END(ts, INTERVAL '5' SECOND) AS 
VARCHAR),\n" +
+                       "  CAST(MAX(log_date) AS VARCHAR),\n" +
+                       "  CAST(MAX(log_time) AS VARCHAR),\n" +
                        "  CAST(MAX(ts) AS VARCHAR),\n" +
                        "  COUNT(*),\n" +
                        "  CAST(MAX(price) AS DECIMAL(10, 2))\n" +
@@ -180,8 +200,8 @@ public abstract class KafkaTableTestBase extends KafkaTestBaseWithFlink {
                }
 
                List<String> expected = Arrays.asList(
-                       "+I(2019-12-12 00:00:05.000,2019-12-12 00:00:04.004,3,50.00)",
-                       "+I(2019-12-12 00:00:10.000,2019-12-12 00:00:06.006,2,5.33)");
+                       "+I(2019-12-12 00:00:05.000,2019-12-12,00:00:03,2019-12-12 00:00:04.004,3,50.00)",
+                       "+I(2019-12-12 00:00:10.000,2019-12-12,00:00:05,2019-12-12 00:00:06.006,2,5.33)");
 
                assertEquals(expected, TestingSinkFunction.rows);
 
@@ -190,6 +210,27 @@ public abstract class KafkaTableTestBase extends KafkaTestBaseWithFlink {
                deleteTestTopic(topic);
        }
 
+       private String formatOptions() {
+               if (!isLegacyConnector) {
+                       return String.format("'format' = '%s'", format);
+               } else {
+                       String formatType = String.format("'format.type' = '%s'", format);
+                       if (format.equals(AVRO_FORMAT)) {
+                               // the legacy connector requires specifying avro-schema
+                               String avroSchema = "{\"type\":\"record\",\"name\":\"row_0\",\"fields\":" +
+                                       "[{\"name\":\"price\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\"," +
+                                       "\"precision\":38,\"scale\":18}},{\"name\":\"currency\",\"type\":[\"string\"," +
+                                       "\"null\"]},{\"name\":\"log_date\",\"type\":{\"type\":\"int\",\"logicalType\":" +
+                                       "\"date\"}},{\"name\":\"log_time\",\"type\":{\"type\":\"int\",\"logicalType\":" +
+                                       "\"time-millis\"}},{\"name\":\"log_ts\",\"type\":{\"type\":\"long\"," +
+                                       "\"logicalType\":\"timestamp-millis\"}}]}";
+                               return formatType + String.format(", 'format.avro-schema' = '%s'", avroSchema);
+                       } else {
+                               return formatType;
+                       }
+               }
+       }
+
        private static final class TestingSinkFunction implements SinkFunction<RowData> {
 
                private static final long serialVersionUID = 455430015321124493L;
diff --git a/flink-connectors/flink-connector-kafka/pom.xml b/flink-connectors/flink-connector-kafka/pom.xml
index 58acb57..4430c0f 100644
--- a/flink-connectors/flink-connector-kafka/pom.xml
+++ b/flink-connectors/flink-connector-kafka/pom.xml
@@ -196,12 +196,25 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <!-- Kafka SQL IT test with formats -->
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-json</artifactId>
                        <version>${project.version}</version>
                        <scope>test</scope>
                </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-avro</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-csv</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
 
        </dependencies>
 
