[
https://issues.apache.org/jira/browse/FLINK-20273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17236949#comment-17236949
]
Jark Wu commented on FLINK-20273:
---------------------------------
Thanks [~fsk119] for the investigation!
> Fix Table api Kafka connector Sink Partitioner Document Error
> -------------------------------------------------------------
>
> Key: FLINK-20273
> URL: https://issues.apache.org/jira/browse/FLINK-20273
> Project: Flink
> Issue Type: Bug
> Components: Documentation, Table SQL / API
> Affects Versions: 1.12.0
> Reporter: Shengkai Fang
> Assignee: Shengkai Fang
> Priority: Major
> Fix For: 1.12.0
>
>
> The
> [doc|https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/kafka.html#sink-partitioning]
> tells us that the Kafka sink uses the fixed partitioner by default. However, in
> my local test, it uses the sticky partitioner to choose the record's partition
> id if the key is not set.
> You can add the following test to {{KafkaTableITCase}}:
> {code:java}
> public void testKafkaSourceSinkWithDefaultPartitioner() throws Exception {
>     if (isLegacyConnector) {
>         return;
>     }
>     // we always use a different topic name for each parameterized topic,
>     // in order to make sure the topic can be created.
>     final String topic = "key_full_value_topic_" + format;
>     createTestTopic(topic, 3, 1);
>     // ---------- Produce an event time stream into Kafka -------------------
>     String groupId = standardProps.getProperty("group.id");
>     String bootstraps = standardProps.getProperty("bootstrap.servers");
>     // compared to the partial value test we cannot support both k_user_id and user_id in a full
>     // value due to duplicate names after key prefix stripping,
>     // fields are reordered on purpose,
>     // fields for keys and values are overlapping
>     final String createSourceTable = String.format(
>         "CREATE TABLE kafkaSource (\n"
>             + " `user_id` BIGINT,\n"
>             + " `name` STRING,\n"
>             + " `partition` INT METADATA"
>             + ") WITH (\n"
>             + " 'connector' = 'kafka',\n"
>             + " 'topic' = '%s',\n"
>             + " 'properties.bootstrap.servers' = '%s',\n"
>             + " 'properties.group.id' = '%s',\n"
>             + " 'scan.startup.mode' = 'earliest-offset',\n"
>             + " 'format' = '%s'\n"
>             + ")",
>         topic,
>         bootstraps,
>         groupId,
>         format);
>     final String createSinkTable = String.format(
>         "CREATE TABLE kafkaSink (\n"
>             + " `user_id` BIGINT,\n"
>             + " `name` STRING\n"
>             + ") WITH (\n"
>             + " 'connector' = 'kafka',\n"
>             + " 'topic' = '%s',\n"
>             + " 'properties.bootstrap.servers' = '%s',\n"
>             + " 'properties.group.id' = '%s',\n"
>             + " 'scan.startup.mode' = 'earliest-offset',\n"
>             + " 'format' = '%s'\n"
>             + ")",
>         topic,
>         bootstraps,
>         groupId,
>         format);
>     tEnv.executeSql(createSourceTable);
>     tEnv.executeSql(createSinkTable);
>     String initialValues = "INSERT INTO kafkaSink\n"
>         + "VALUES\n"
>         + " (1, 'name 1'),\n"
>         + " (2, 'name 2'),\n"
>         + " (3, 'name 3')";
>     tEnv.executeSql(initialValues).await();
>     initialValues = "INSERT INTO kafkaSink\n"
>         + "VALUES\n"
>         + " (4, 'name 4'),\n"
>         + " (5, 'name 5'),\n"
>         + " (6, 'name 6')";
>     tEnv.executeSql(initialValues).await();
>     initialValues = "INSERT INTO kafkaSink\n"
>         + "VALUES\n"
>         + " (7, 'name 7'),\n"
>         + " (8, 'name 8'),\n"
>         + " (9, 'name 9')";
>     tEnv.executeSql(initialValues).await();
>     // ---------- Consume stream from Kafka -------------------
>     final List<Row> result = collectRows(tEnv.sqlQuery("SELECT * FROM kafkaSource"), 9);
>     // ------------- cleanup -------------------
>     deleteTestTopic(topic);
> }
> {code}
> The test uses the Kafka default partitioner to send records to the Kafka
> topic. After the inserts, we can read the records back together with their
> partition ids. If the sink used the fixed partitioner, all records would have
> the same partition id. I repeated the test 3 times and the results are
> {code:java}
> // the first result
> <1,name 1,1>
> <2,name 2,1>
> <3,name 3,1>
> <7,name 7,1>
> <4,name 4,0>
> <5,name 5,0>
> <6,name 6,0>
> <8,name 8,0>
> <9,name 9,0>
> // the second result
> <1,name 1,1>
> <2,name 2,1>
> <3,name 3,1>
> <4,name 4,0>
> <5,name 5,0>
> <6,name 6,0>
> <7,name 7,0>
> <8,name 8,0>
> <9,name 9,0>
> // the third result
> <9,name 9,2>
> <1,name 1,0>
> <2,name 2,0>
> <3,name 3,0>
> <4,name 4,0>
> <5,name 5,0>
> <6,name 6,0>
> <7,name 7,1>
> <8,name 8,1>
> {code}
> The last column is the partition id, and the test topic has 3 partitions. The
> results show that the default partitioner is the sticky partitioner rather
> than the fixed partitioner.
> By the way, the sink partitioning section in the doc only applies when the key
> is null. If we set the key fields, the {{round-robin}} strategy will not work.
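
For readers comparing the two behaviours discussed in the description, here is a minimal, self-contained sketch. It is not code from Flink or Kafka: the class name {{PartitionerSketch}} and all of its methods are hypothetical. {{fixedPartition}} mirrors the idea of a fixed strategy (one sink subtask always writes to one partition), while {{stickyPartitions}} is a deliberately simplified model of a sticky strategy, assuming that keyless records stay on one partition for a whole batch and that a partition is then picked at random for the next batch. The real producer decides batch boundaries from batch size and linger time, so the batch size used here is only illustrative.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;

public class PartitionerSketch {

    // Fixed strategy: a given sink subtask always writes to the same partition.
    static int fixedPartition(int subtaskIndex, int numPartitions) {
        return subtaskIndex % numPartitions;
    }

    // Simplified sticky strategy (assumption, not the real producer logic):
    // keep one partition for a whole batch of keyless records, then choose
    // a partition at random for the next batch.
    static List<Integer> stickyPartitions(
            int numRecords, int batchSize, int numPartitions, Random random) {
        List<Integer> assigned = new ArrayList<>();
        int current = random.nextInt(numPartitions);
        for (int i = 0; i < numRecords; i++) {
            if (i > 0 && i % batchSize == 0) {
                current = random.nextInt(numPartitions); // new batch, new choice
            }
            assigned.add(current);
        }
        return assigned;
    }

    // Number of different partition ids that appear in the list.
    static int distinctCount(List<Integer> partitions) {
        return new HashSet<>(partitions).size();
    }

    public static void main(String[] args) {
        int numPartitions = 3;

        // Nine records written by subtask 0 with the fixed strategy.
        List<Integer> fixed = new ArrayList<>();
        for (int i = 0; i < 9; i++) {
            fixed.add(fixedPartition(0, numPartitions));
        }
        System.out.println("fixed:  " + fixed + " distinct=" + distinctCount(fixed));

        // Nine keyless records in batches of three with the sticky model.
        List<Integer> sticky = stickyPartitions(9, 3, numPartitions, new Random());
        System.out.println("sticky: " + sticky + " distinct=" + distinctCount(sticky));
    }
}
```

With the fixed strategy the distinct count is always 1 for a single subtask, which is the check the description applies to the test output. The sticky model can spread the nine records over several partitions, with runs of records sharing an id, which resembles the three results listed above.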