[ 
https://issues.apache.org/jira/browse/FLINK-22969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17378820#comment-17378820
 ] 

luoyuxia commented on FLINK-22969:
----------------------------------

[~fsk119]

Do you have time to take a look at it? I'm afraid I may have misunderstood
what the issue you proposed means.

> Validate the topic is not null or empty string when create kafka source/sink 
> function 
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-22969
>                 URL: https://issues.apache.org/jira/browse/FLINK-22969
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka, Table SQL / Ecosystem
>    Affects Versions: 1.14.0
>            Reporter: Shengkai Fang
>            Priority: Major
>              Labels: pull-request-available, starter
>         Attachments: image-2021-07-06-18-55-22-235.png, 
> image-2021-07-06-18-55-54-109.png, image-2021-07-06-19-01-22-483.png, 
> image-2021-07-06-19-03-22-899.png, image-2021-07-06-19-03-32-050.png, 
> image-2021-07-06-19-04-16-530.png, image-2021-07-06-19-04-53-651.png, 
> image-2021-07-06-19-05-48-964.png, image-2021-07-06-19-07-01-607.png, 
> image-2021-07-06-19-07-27-936.png, image-2021-07-06-22-41-52-089.png
>
>
> Add test in UpsertKafkaTableITCase
> {code:java}
>  @Test
>     public void testSourceSinkWithKeyAndPartialValue() throws Exception {
>         // we always use a different topic name for each parameterized topic,
>         // in order to make sure the topic can be created.
>         final String topic = "key_partial_value_topic_" + format;
>         createTestTopic(topic, 1, 1); // use single partition to guarantee 
> orders in tests
>         // ---------- Produce an event time stream into Kafka 
> -------------------
>         String bootstraps = standardProps.getProperty("bootstrap.servers");
>         // k_user_id and user_id have different data types to verify the 
> correct mapping,
>         // fields are reordered on purpose
>         final String createTable =
>                 String.format(
>                         "CREATE TABLE upsert_kafka (\n"
>                                 + "  `k_user_id` BIGINT,\n"
>                                 + "  `name` STRING,\n"
>                                 + "  `timestamp` TIMESTAMP(3) METADATA,\n"
>                                 + "  `k_event_id` BIGINT,\n"
>                                 + "  `user_id` INT,\n"
>                                 + "  `payload` STRING,\n"
>                                 + "  PRIMARY KEY (k_event_id, k_user_id) NOT 
> ENFORCED"
>                                 + ") WITH (\n"
>                                 + "  'connector' = 'upsert-kafka',\n"
>                                 + "  'topic' = '%s',\n"
>                                 + "  'properties.bootstrap.servers' = '%s',\n"
>                                 + "  'key.format' = '%s',\n"
>                                 + "  'key.fields-prefix' = 'k_',\n"
>                                 + "  'value.format' = '%s',\n"
>                                 + "  'value.fields-include' = 'EXCEPT_KEY'\n"
>                                 + ")",
>                         "", bootstraps, format, format);
>         tEnv.executeSql(createTable);
>         String initialValues =
>                 "INSERT INTO upsert_kafka\n"
>                         + "VALUES\n"
>                         + " (1, 'name 1', TIMESTAMP '2020-03-08 
> 13:12:11.123', 100, 41, 'payload 1'),\n"
>                         + " (2, 'name 2', TIMESTAMP '2020-03-09 
> 13:12:11.123', 101, 42, 'payload 2'),\n"
>                         + " (3, 'name 3', TIMESTAMP '2020-03-10 
> 13:12:11.123', 102, 43, 'payload 3'),\n"
>                         + " (2, 'name 2', TIMESTAMP '2020-03-11 
> 13:12:11.123', 101, 42, 'payload')";
>         tEnv.executeSql(initialValues).await();
>         // ---------- Consume stream from Kafka -------------------
>         final List<Row> result = collectRows(tEnv.sqlQuery("SELECT * FROM 
> upsert_kafka"), 5);
>         final List<Row> expected =
>                 Arrays.asList(
>                         changelogRow(
>                                 "+I",
>                                 1L,
>                                 "name 1",
>                                 
> LocalDateTime.parse("2020-03-08T13:12:11.123"),
>                                 100L,
>                                 41,
>                                 "payload 1"),
>                         changelogRow(
>                                 "+I",
>                                 2L,
>                                 "name 2",
>                                 
> LocalDateTime.parse("2020-03-09T13:12:11.123"),
>                                 101L,
>                                 42,
>                                 "payload 2"),
>                         changelogRow(
>                                 "+I",
>                                 3L,
>                                 "name 3",
>                                 
> LocalDateTime.parse("2020-03-10T13:12:11.123"),
>                                 102L,
>                                 43,
>                                 "payload 3"),
>                         changelogRow(
>                                 "-U",
>                                 2L,
>                                 "name 2",
>                                 
> LocalDateTime.parse("2020-03-09T13:12:11.123"),
>                                 101L,
>                                 42,
>                                 "payload 2"),
>                         changelogRow(
>                                 "+U",
>                                 2L,
>                                 "name 2",
>                                 
> LocalDateTime.parse("2020-03-11T13:12:11.123"),
>                                 101L,
>                                 42,
>                                 "payload"));
>         assertThat(result, deepEqualTo(expected, true));
>         // ------------- cleanup -------------------
>         deleteTestTopic(topic);
>     }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to