[ https://issues.apache.org/jira/browse/FLINK-22969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17379643#comment-17379643 ]

longwang0616 commented on FLINK-22969:
--------------------------------------

[~fsk119], [~luoyuxia]

I think the SQL API can be validated like that, but the DataStream API has no
validation at all. No exception is thrown when a KafkaSinkFunction or
KafkaSourceFunction is created; the failure only surfaces after the JobManager
assigns the tasks to a TaskManager and the open() method runs. Isn't that a
bit late? I think KafkaSourceFunction and KafkaSinkFunction could be validated
for the DataStream API as well. What do you think?
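
For example, the check could fail fast at construction time. A minimal sketch of
the kind of eager check I mean, as a standalone helper (the TopicValidator class
and checkTopic method are hypothetical, not existing Flink code; only
org.apache.flink.util.Preconditions is a real Flink utility):

{code:java}
// Hypothetical sketch -- not actual Flink source. The point is to fail while
// the job graph is built on the client, not in open() on the TaskManager.
import org.apache.flink.util.Preconditions;

public final class TopicValidator {

    private TopicValidator() {}

    /** Returns the topic, or throws IllegalArgumentException if it is null or blank. */
    public static String checkTopic(String topic) {
        Preconditions.checkArgument(
                topic != null && !topic.trim().isEmpty(),
                "The Kafka topic must not be null or an empty string.");
        return topic;
    }
}
{code}

A source/sink constructor that accepts a topic (or a list of topics) could call
such a check on its arguments, so an empty topic is rejected when the user builds
the job rather than after deployment.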

> Validate the topic is not null or empty string when creating the Kafka source/sink function
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-22969
>                 URL: https://issues.apache.org/jira/browse/FLINK-22969
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka, Table SQL / Ecosystem
>    Affects Versions: 1.14.0
>            Reporter: Shengkai Fang
>            Priority: Major
>              Labels: pull-request-available, starter
>         Attachments: image-2021-07-06-18-55-22-235.png, 
> image-2021-07-06-18-55-54-109.png, image-2021-07-06-19-01-22-483.png, 
> image-2021-07-06-19-03-22-899.png, image-2021-07-06-19-03-32-050.png, 
> image-2021-07-06-19-04-16-530.png, image-2021-07-06-19-04-53-651.png, 
> image-2021-07-06-19-05-48-964.png, image-2021-07-06-19-07-01-607.png, 
> image-2021-07-06-19-07-27-936.png, image-2021-07-06-22-41-52-089.png, 
> image-2021-07-13-11-02-37-451.png, image-2021-07-13-11-03-30-740.png, 
> image-2021-07-13-11-15-06-977.png, image-2021-07-13-11-15-47-392.png, 
> image-2021-07-13-11-20-22-134.png, image-2021-07-13-11-37-27-870.png, 
> image-2021-07-13-11-39-20-010.png
>
>
> Add a test in UpsertKafkaTableITCase:
> {code:java}
>     @Test
>     public void testSourceSinkWithKeyAndPartialValue() throws Exception {
>         // we always use a different topic name for each parameterized topic,
>         // in order to make sure the topic can be created.
>         final String topic = "key_partial_value_topic_" + format;
>         createTestTopic(topic, 1, 1); // use single partition to guarantee orders in tests
>
>         // ---------- Produce an event time stream into Kafka -------------------
>         String bootstraps = standardProps.getProperty("bootstrap.servers");
>
>         // k_user_id and user_id have different data types to verify the correct mapping,
>         // fields are reordered on purpose
>         final String createTable =
>                 String.format(
>                         "CREATE TABLE upsert_kafka (\n"
>                                 + "  `k_user_id` BIGINT,\n"
>                                 + "  `name` STRING,\n"
>                                 + "  `timestamp` TIMESTAMP(3) METADATA,\n"
>                                 + "  `k_event_id` BIGINT,\n"
>                                 + "  `user_id` INT,\n"
>                                 + "  `payload` STRING,\n"
>                                 + "  PRIMARY KEY (k_event_id, k_user_id) NOT ENFORCED"
>                                 + ") WITH (\n"
>                                 + "  'connector' = 'upsert-kafka',\n"
>                                 + "  'topic' = '%s',\n"
>                                 + "  'properties.bootstrap.servers' = '%s',\n"
>                                 + "  'key.format' = '%s',\n"
>                                 + "  'key.fields-prefix' = 'k_',\n"
>                                 + "  'value.format' = '%s',\n"
>                                 + "  'value.fields-include' = 'EXCEPT_KEY'\n"
>                                 + ")",
>                         // the topic is deliberately passed as the empty string ""
>                         // (instead of `topic`) to show that no validation happens
>                         "", bootstraps, format, format);
>         tEnv.executeSql(createTable);
>
>         String initialValues =
>                 "INSERT INTO upsert_kafka\n"
>                         + "VALUES\n"
>                         + " (1, 'name 1', TIMESTAMP '2020-03-08 13:12:11.123', 100, 41, 'payload 1'),\n"
>                         + " (2, 'name 2', TIMESTAMP '2020-03-09 13:12:11.123', 101, 42, 'payload 2'),\n"
>                         + " (3, 'name 3', TIMESTAMP '2020-03-10 13:12:11.123', 102, 43, 'payload 3'),\n"
>                         + " (2, 'name 2', TIMESTAMP '2020-03-11 13:12:11.123', 101, 42, 'payload')";
>         tEnv.executeSql(initialValues).await();
>
>         // ---------- Consume stream from Kafka -------------------
>         final List<Row> result = collectRows(tEnv.sqlQuery("SELECT * FROM upsert_kafka"), 5);
>         final List<Row> expected =
>                 Arrays.asList(
>                         changelogRow(
>                                 "+I",
>                                 1L,
>                                 "name 1",
>                                 LocalDateTime.parse("2020-03-08T13:12:11.123"),
>                                 100L,
>                                 41,
>                                 "payload 1"),
>                         changelogRow(
>                                 "+I",
>                                 2L,
>                                 "name 2",
>                                 LocalDateTime.parse("2020-03-09T13:12:11.123"),
>                                 101L,
>                                 42,
>                                 "payload 2"),
>                         changelogRow(
>                                 "+I",
>                                 3L,
>                                 "name 3",
>                                 LocalDateTime.parse("2020-03-10T13:12:11.123"),
>                                 102L,
>                                 43,
>                                 "payload 3"),
>                         changelogRow(
>                                 "-U",
>                                 2L,
>                                 "name 2",
>                                 LocalDateTime.parse("2020-03-09T13:12:11.123"),
>                                 101L,
>                                 42,
>                                 "payload 2"),
>                         changelogRow(
>                                 "+U",
>                                 2L,
>                                 "name 2",
>                                 LocalDateTime.parse("2020-03-11T13:12:11.123"),
>                                 101L,
>                                 42,
>                                 "payload"));
>         assertThat(result, deepEqualTo(expected, true));
>
>         // ------------- cleanup -------------------
>         deleteTestTopic(topic);
>     }
> {code}


