[ https://issues.apache.org/jira/browse/FLINK-22969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17379844#comment-17379844 ]

luoyuxia commented on FLINK-22969:
----------------------------------

[~fsk119] Yeah, I think we can close the Jira.

[~longwang0616] Thanks for your contribution and passion for Flink. There are 
many open issues in the Flink community, so you can pick any one you are 
interested in and fix it. We'll be glad to review your pull request and merge 
it into Flink. Of course, in the Flink community you can also report a bug, 
propose a valuable feature that is not implemented yet, ask questions, or 
answer others' questions; that is an important contribution to Flink as well.

> Validate the topic is not null or empty string when creating kafka 
> source/sink function 
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-22969
>                 URL: https://issues.apache.org/jira/browse/FLINK-22969
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka, Table SQL / Ecosystem
>    Affects Versions: 1.14.0
>            Reporter: Shengkai Fang
>            Priority: Major
>              Labels: pull-request-available, starter
>         Attachments: image-2021-07-06-18-55-22-235.png, 
> image-2021-07-06-18-55-54-109.png, image-2021-07-06-19-01-22-483.png, 
> image-2021-07-06-19-03-22-899.png, image-2021-07-06-19-03-32-050.png, 
> image-2021-07-06-19-04-16-530.png, image-2021-07-06-19-04-53-651.png, 
> image-2021-07-06-19-05-48-964.png, image-2021-07-06-19-07-01-607.png, 
> image-2021-07-06-19-07-27-936.png, image-2021-07-06-22-41-52-089.png, 
> image-2021-07-13-11-02-37-451.png, image-2021-07-13-11-03-30-740.png, 
> image-2021-07-13-11-15-06-977.png, image-2021-07-13-11-15-47-392.png, 
> image-2021-07-13-11-20-22-134.png, image-2021-07-13-11-37-27-870.png, 
> image-2021-07-13-11-39-20-010.png
>
>
> Add test in UpsertKafkaTableITCase
> {code:java}
>     @Test
>     public void testSourceSinkWithKeyAndPartialValue() throws Exception {
>         // we always use a different topic name for each parameterized topic,
>         // in order to make sure the topic can be created.
>         final String topic = "key_partial_value_topic_" + format;
>         // use a single partition to guarantee order in tests
>         createTestTopic(topic, 1, 1);
>
>         // ---------- Produce an event time stream into Kafka -------------------
>         String bootstraps = standardProps.getProperty("bootstrap.servers");
>         // k_user_id and user_id have different data types to verify the correct mapping,
>         // fields are reordered on purpose
>         final String createTable =
>                 String.format(
>                         "CREATE TABLE upsert_kafka (\n"
>                                 + "  `k_user_id` BIGINT,\n"
>                                 + "  `name` STRING,\n"
>                                 + "  `timestamp` TIMESTAMP(3) METADATA,\n"
>                                 + "  `k_event_id` BIGINT,\n"
>                                 + "  `user_id` INT,\n"
>                                 + "  `payload` STRING,\n"
>                                 + "  PRIMARY KEY (k_event_id, k_user_id) NOT ENFORCED"
>                                 + ") WITH (\n"
>                                 + "  'connector' = 'upsert-kafka',\n"
>                                 + "  'topic' = '%s',\n"
>                                 + "  'properties.bootstrap.servers' = '%s',\n"
>                                 + "  'key.format' = '%s',\n"
>                                 + "  'key.fields-prefix' = 'k_',\n"
>                                 + "  'value.format' = '%s',\n"
>                                 + "  'value.fields-include' = 'EXCEPT_KEY'\n"
>                                 + ")",
>                         // NOTE: the topic is deliberately "" (not the 'topic'
>                         // variable above) to show that an empty topic name is
>                         // currently not validated when the source/sink is created.
>                         "", bootstraps, format, format);
>         tEnv.executeSql(createTable);
>
>         String initialValues =
>                 "INSERT INTO upsert_kafka\n"
>                         + "VALUES\n"
>                         + " (1, 'name 1', TIMESTAMP '2020-03-08 13:12:11.123', 100, 41, 'payload 1'),\n"
>                         + " (2, 'name 2', TIMESTAMP '2020-03-09 13:12:11.123', 101, 42, 'payload 2'),\n"
>                         + " (3, 'name 3', TIMESTAMP '2020-03-10 13:12:11.123', 102, 43, 'payload 3'),\n"
>                         + " (2, 'name 2', TIMESTAMP '2020-03-11 13:12:11.123', 101, 42, 'payload')";
>         tEnv.executeSql(initialValues).await();
>
>         // ---------- Consume stream from Kafka -------------------
>         final List<Row> result = collectRows(tEnv.sqlQuery("SELECT * FROM upsert_kafka"), 5);
>         final List<Row> expected =
>                 Arrays.asList(
>                         changelogRow(
>                                 "+I",
>                                 1L,
>                                 "name 1",
>                                 LocalDateTime.parse("2020-03-08T13:12:11.123"),
>                                 100L,
>                                 41,
>                                 "payload 1"),
>                         changelogRow(
>                                 "+I",
>                                 2L,
>                                 "name 2",
>                                 LocalDateTime.parse("2020-03-09T13:12:11.123"),
>                                 101L,
>                                 42,
>                                 "payload 2"),
>                         changelogRow(
>                                 "+I",
>                                 3L,
>                                 "name 3",
>                                 LocalDateTime.parse("2020-03-10T13:12:11.123"),
>                                 102L,
>                                 43,
>                                 "payload 3"),
>                         changelogRow(
>                                 "-U",
>                                 2L,
>                                 "name 2",
>                                 LocalDateTime.parse("2020-03-09T13:12:11.123"),
>                                 101L,
>                                 42,
>                                 "payload 2"),
>                         changelogRow(
>                                 "+U",
>                                 2L,
>                                 "name 2",
>                                 LocalDateTime.parse("2020-03-11T13:12:11.123"),
>                                 101L,
>                                 42,
>                                 "payload"));
>         assertThat(result, deepEqualTo(expected, true));
>
>         // ------------- cleanup -------------------
>         deleteTestTopic(topic);
>     }
> {code}
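
For reference, a minimal standalone sketch of the kind of check the issue title asks for, failing fast when the configured topic is null or an empty/blank string. The class and method names (TopicValidation, validateTopic) are hypothetical, not the actual Flink code; in Flink the check would belong where the 'topic' option is read, e.g. in the Kafka dynamic table factory.

{code:java}
import java.util.List;

// Hypothetical helper, shown here only to illustrate the proposed validation.
public final class TopicValidation {

    /** Rejects a null, empty, or blank topic name with a clear message. */
    public static void validateTopic(String topic) {
        if (topic == null || topic.trim().isEmpty()) {
            throw new IllegalArgumentException(
                    "Option 'topic' must not be null or an empty string.");
        }
    }

    /** Same check for a topic list, e.g. 'topic' = 'topic-1;topic-2'. */
    public static void validateTopics(List<String> topics) {
        if (topics == null || topics.isEmpty()) {
            throw new IllegalArgumentException("At least one topic must be configured.");
        }
        topics.forEach(TopicValidation::validateTopic);
    }

    public static void main(String[] args) {
        validateTopic("key_partial_value_topic_json"); // passes
        validateTopic("");                             // throws IllegalArgumentException
    }
}
{code}

With such a check in place, the empty 'topic' string in the test above would fail at table creation time with a clear error instead of surfacing later as an obscure Kafka client failure.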



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
