[
https://issues.apache.org/jira/browse/FLINK-22969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Johannes Moser updated FLINK-22969:
-----------------------------------
Labels: starter (was: )
> Validate the topic is not null or empty string when create kafka source/sink
> function
> --------------------------------------------------------------------------------------
>
> Key: FLINK-22969
> URL: https://issues.apache.org/jira/browse/FLINK-22969
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka, Table SQL / Ecosystem
> Affects Versions: 1.14.0
> Reporter: Shengkai Fang
> Priority: Major
> Labels: starter
>
> Add test in UpsertKafkaTableITCase
> {code:java}
> @Test
> public void testSourceSinkWithKeyAndPartialValue() throws Exception {
> // we always use a different topic name for each parameterized topic,
> // in order to make sure the topic can be created.
> final String topic = "key_partial_value_topic_" + format;
> createTestTopic(topic, 1, 1); // use single partition to guarantee
> orders in tests
> // ---------- Produce an event time stream into Kafka
> -------------------
> String bootstraps = standardProps.getProperty("bootstrap.servers");
> // k_user_id and user_id have different data types to verify the
> correct mapping,
> // fields are reordered on purpose
> final String createTable =
> String.format(
> "CREATE TABLE upsert_kafka (\n"
> + " `k_user_id` BIGINT,\n"
> + " `name` STRING,\n"
> + " `timestamp` TIMESTAMP(3) METADATA,\n"
> + " `k_event_id` BIGINT,\n"
> + " `user_id` INT,\n"
> + " `payload` STRING,\n"
> + " PRIMARY KEY (k_event_id, k_user_id) NOT
> ENFORCED"
> + ") WITH (\n"
> + " 'connector' = 'upsert-kafka',\n"
> + " 'topic' = '%s',\n"
> + " 'properties.bootstrap.servers' = '%s',\n"
> + " 'key.format' = '%s',\n"
> + " 'key.fields-prefix' = 'k_',\n"
> + " 'value.format' = '%s',\n"
> + " 'value.fields-include' = 'EXCEPT_KEY'\n"
> + ")",
> "", bootstraps, format, format);
> tEnv.executeSql(createTable);
> String initialValues =
> "INSERT INTO upsert_kafka\n"
> + "VALUES\n"
> + " (1, 'name 1', TIMESTAMP '2020-03-08
> 13:12:11.123', 100, 41, 'payload 1'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-09
> 13:12:11.123', 101, 42, 'payload 2'),\n"
> + " (3, 'name 3', TIMESTAMP '2020-03-10
> 13:12:11.123', 102, 43, 'payload 3'),\n"
> + " (2, 'name 2', TIMESTAMP '2020-03-11
> 13:12:11.123', 101, 42, 'payload')";
> tEnv.executeSql(initialValues).await();
> // ---------- Consume stream from Kafka -------------------
> final List<Row> result = collectRows(tEnv.sqlQuery("SELECT * FROM
> upsert_kafka"), 5);
> final List<Row> expected =
> Arrays.asList(
> changelogRow(
> "+I",
> 1L,
> "name 1",
>
> LocalDateTime.parse("2020-03-08T13:12:11.123"),
> 100L,
> 41,
> "payload 1"),
> changelogRow(
> "+I",
> 2L,
> "name 2",
>
> LocalDateTime.parse("2020-03-09T13:12:11.123"),
> 101L,
> 42,
> "payload 2"),
> changelogRow(
> "+I",
> 3L,
> "name 3",
>
> LocalDateTime.parse("2020-03-10T13:12:11.123"),
> 102L,
> 43,
> "payload 3"),
> changelogRow(
> "-U",
> 2L,
> "name 2",
>
> LocalDateTime.parse("2020-03-09T13:12:11.123"),
> 101L,
> 42,
> "payload 2"),
> changelogRow(
> "+U",
> 2L,
> "name 2",
>
> LocalDateTime.parse("2020-03-11T13:12:11.123"),
> 101L,
> 42,
> "payload"));
> assertThat(result, deepEqualTo(expected, true));
> // ------------- cleanup -------------------
> deleteTestTopic(topic);
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)