This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 88871b2ab53bf4d212e10a04f076c27d0d4b3327
Author: Jark Wu <j...@apache.org>
AuthorDate: Thu Jun 4 20:02:21 2020 +0800

    [FLINK-18029][kafka] Add more ITCases for Kafka with new formats (avro, csv, json)

    This closes #12471
---
 .../flink-connector-kafka-0.10/pom.xml              | 13 ++++
 .../flink-connector-kafka-0.11/pom.xml              | 13 ++++
 .../connectors/kafka/table/KafkaTableTestBase.java  | 81 ++++++++++++++++------
 flink-connectors/flink-connector-kafka/pom.xml      | 13 ++++
 4 files changed, 100 insertions(+), 20 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-0.10/pom.xml b/flink-connectors/flink-connector-kafka-0.10/pom.xml
index 5a4e613..f9f34c8 100644
--- a/flink-connectors/flink-connector-kafka-0.10/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.10/pom.xml
@@ -204,12 +204,25 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<!-- Kafka SQL IT test with formats -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-json</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-csv</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
 
 		<dependency>
 			<!-- Required for org.apache.flink.streaming.connectors.kafka.Kafka010SecuredRunITCase -->
diff --git a/flink-connectors/flink-connector-kafka-0.11/pom.xml b/flink-connectors/flink-connector-kafka-0.11/pom.xml
index 394c707..c7c3e06 100644
--- a/flink-connectors/flink-connector-kafka-0.11/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.11/pom.xml
@@ -204,12 +204,25 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<!-- Kafka SQL IT test with formats -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-json</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-csv</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
 
 	</dependencies>
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
index c1dc7ce..49d0269 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
@@ -46,17 +46,26 @@ import static org.junit.Assert.assertEquals;
 @RunWith(Parameterized.class)
 public abstract class KafkaTableTestBase extends KafkaTestBaseWithFlink {
 
+	private static final String JSON_FORMAT = "json";
+	private static final String AVRO_FORMAT = "avro";
+	private static final String CSV_FORMAT = "csv";
+
 	@Parameterized.Parameter
 	public boolean isLegacyConnector;
 
 	@Parameterized.Parameter(1)
-	public int topicID;
+	public String format;
 
-	@Parameterized.Parameters(name = "legacy = {0}, topicId = {1}")
+	@Parameterized.Parameters(name = "legacy = {0}, format = {1}")
 	public static Object[] parameters() {
 		return new Object[][]{
-			new Object[]{true, 0},
-			new Object[]{false, 1}
+			// cover all 3 formats for new and old connector
+			new Object[]{false, JSON_FORMAT},
+			new Object[]{false, AVRO_FORMAT},
+			new Object[]{false, CSV_FORMAT},
+			new Object[]{true, JSON_FORMAT},
+			new Object[]{true, AVRO_FORMAT},
+			new Object[]{true, CSV_FORMAT}
 		};
 	}
 
@@ -87,7 +96,9 @@ public abstract class KafkaTableTestBase extends KafkaTestBaseWithFlink {
 
 	@Test
 	public void testKafkaSourceSink() throws Exception {
-		final String topic = "tstopic" + topicID;
+		// we always use a different topic name for each parameterized topic,
+		// in order to make sure the topic can be created.
+		final String topic = "tstopic_" + format + "_" + isLegacyConnector;
 		createTestTopic(topic, 1, 1);
 
 		// ---------- Produce an event time stream into Kafka -------------------
@@ -101,6 +112,8 @@ public abstract class KafkaTableTestBase extends KafkaTestBaseWithFlink {
 				"  `computed-price` as price + 1.0,\n" +
 				"  price decimal(38, 18),\n" +
 				"  currency string,\n" +
+				"  log_date date,\n" +
+				"  log_time time(3),\n" +
 				"  log_ts timestamp(3),\n" +
 				"  ts as log_ts + INTERVAL '1' SECOND,\n" +
 				"  watermark for ts as ts\n" +
@@ -110,18 +123,21 @@ public abstract class KafkaTableTestBase extends KafkaTestBaseWithFlink {
 				"  'properties.bootstrap.servers' = '%s',\n" +
 				"  'properties.group.id' = '%s',\n" +
 				"  'scan.startup.mode' = 'earliest-offset',\n" +
-				"  'format' = 'json'\n" +
+				"  %s\n" +
 				")",
 				factoryIdentifier(),
 				topic,
 				bootstraps,
-				groupId);
+				groupId,
+				formatOptions());
 		} else {
 			createTable = String.format(
 				"create table kafka (\n" +
 				"  `computed-price` as price + 1.0,\n" +
 				"  price decimal(38, 18),\n" +
 				"  currency string,\n" +
+				"  log_date date,\n" +
+				"  log_time time(3),\n" +
 				"  log_ts timestamp(3),\n" +
 				"  ts as log_ts + INTERVAL '1' SECOND,\n" +
 				"  watermark for ts as ts\n" +
@@ -132,32 +148,36 @@ public abstract class KafkaTableTestBase extends KafkaTestBaseWithFlink {
 				"  'connector.properties.bootstrap.servers' = '%s',\n" +
 				"  'connector.properties.group.id' = '%s',\n" +
 				"  'connector.startup-mode' = 'earliest-offset',\n" +
-				"  'format.type' = 'json',\n" +
-				"  'update-mode' = 'append'\n" +
+				"  'update-mode' = 'append',\n" +
+				"  %s\n" +
 				")",
 				kafkaVersion(),
 				topic,
 				bootstraps,
-				groupId);
+				groupId,
+				formatOptions());
 		}
 
 		tEnv.executeSql(createTable);
 
 		String initialValues = "INSERT INTO kafka\n" +
-			"SELECT CAST(price AS DECIMAL(10, 2)), currency, CAST(ts AS TIMESTAMP(3))\n" +
-			"FROM (VALUES (2.02,'Euro','2019-12-12 00:00:00.001001'), \n" +
-			"  (1.11,'US Dollar','2019-12-12 00:00:01.002001'), \n" +
-			"  (50,'Yen','2019-12-12 00:00:03.004001'), \n" +
-			"  (3.1,'Euro','2019-12-12 00:00:04.005001'), \n" +
-			"  (5.33,'US Dollar','2019-12-12 00:00:05.006001'), \n" +
-			"  (0,'DUMMY','2019-12-12 00:00:10'))\n" +
-			"  AS orders (price, currency, ts)";
+			"SELECT CAST(price AS DECIMAL(10, 2)), currency, " +
+			"  CAST(d AS DATE), CAST(t AS TIME(0)), CAST(ts AS TIMESTAMP(3))\n" +
+			"FROM (VALUES (2.02,'Euro','2019-12-12', '00:00:01', '2019-12-12 00:00:01.001001'), \n" +
+			"  (1.11,'US Dollar','2019-12-12', '00:00:02', '2019-12-12 00:00:02.002001'), \n" +
+			"  (50,'Yen','2019-12-12', '00:00:03', '2019-12-12 00:00:03.004001'), \n" +
+			"  (3.1,'Euro','2019-12-12', '00:00:04', '2019-12-12 00:00:04.005001'), \n" +
+			"  (5.33,'US Dollar','2019-12-12', '00:00:05', '2019-12-12 00:00:05.006001'), \n" +
+			"  (0,'DUMMY','2019-12-12', '00:00:10', '2019-12-12 00:00:10'))\n" +
+			"  AS orders (price, currency, d, t, ts)";
 		TableEnvUtil.execInsertSqlAndWaitResult(tEnv, initialValues);
 
 		// ---------- Consume stream from Kafka -------------------
 
 		String query = "SELECT\n" +
 			"  CAST(TUMBLE_END(ts, INTERVAL '5' SECOND) AS VARCHAR),\n" +
+			"  CAST(MAX(log_date) AS VARCHAR),\n" +
+			"  CAST(MAX(log_time) AS VARCHAR),\n" +
 			"  CAST(MAX(ts) AS VARCHAR),\n" +
 			"  COUNT(*),\n" +
 			"  CAST(MAX(price) AS DECIMAL(10, 2))\n" +
@@ -180,8 +200,8 @@ public abstract class KafkaTableTestBase extends KafkaTestBaseWithFlink {
 		}
 
 		List<String> expected = Arrays.asList(
-			"+I(2019-12-12 00:00:05.000,2019-12-12 00:00:04.004,3,50.00)",
-			"+I(2019-12-12 00:00:10.000,2019-12-12 00:00:06.006,2,5.33)");
+			"+I(2019-12-12 00:00:05.000,2019-12-12,00:00:03,2019-12-12 00:00:04.004,3,50.00)",
+			"+I(2019-12-12 00:00:10.000,2019-12-12,00:00:05,2019-12-12 00:00:06.006,2,5.33)");
 
 		assertEquals(expected, TestingSinkFunction.rows);
 
@@ -190,6 +210,27 @@ public abstract class KafkaTableTestBase extends KafkaTestBaseWithFlink {
 		deleteTestTopic(topic);
 	}
 
+	private String formatOptions() {
+		if (!isLegacyConnector) {
+			return String.format("'format' = '%s'", format);
+		} else {
+			String formatType = String.format("'format.type' = '%s'", format);
+			if (format.equals(AVRO_FORMAT)) {
+				// legacy connector requires to specify avro-schema
+				String avroSchema = "{\"type\":\"record\",\"name\":\"row_0\",\"fields\":" +
+					"[{\"name\":\"price\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\"," +
+					"\"precision\":38,\"scale\":18}},{\"name\":\"currency\",\"type\":[\"string\"," +
+					"\"null\"]},{\"name\":\"log_date\",\"type\":{\"type\":\"int\",\"logicalType\":" +
+					"\"date\"}},{\"name\":\"log_time\",\"type\":{\"type\":\"int\",\"logicalType\":" +
+					"\"time-millis\"}},{\"name\":\"log_ts\",\"type\":{\"type\":\"long\"," +
+					"\"logicalType\":\"timestamp-millis\"}}]}";
+				return formatType + String.format(", 'format.avro-schema' = '%s'", avroSchema);
+			} else {
+				return formatType;
+			}
+		}
+	}
+
 	private static final class TestingSinkFunction implements SinkFunction<RowData> {
 
 		private static final long serialVersionUID = 455430015321124493L;
diff --git a/flink-connectors/flink-connector-kafka/pom.xml b/flink-connectors/flink-connector-kafka/pom.xml
index 58acb57..4430c0f 100644
--- a/flink-connectors/flink-connector-kafka/pom.xml
+++ b/flink-connectors/flink-connector-kafka/pom.xml
@@ -196,12 +196,25 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<!-- Kafka SQL IT test with formats -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-json</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-csv</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
 
 	</dependencies>