[ 
https://issues.apache.org/jira/browse/FLINK-20273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17236949#comment-17236949
 ] 

Jark Wu commented on FLINK-20273:
---------------------------------

Thanks [~fsk119] for the investigation!

> Fix Table api Kafka connector Sink Partitioner Document Error
> -------------------------------------------------------------
>
>                 Key: FLINK-20273
>                 URL: https://issues.apache.org/jira/browse/FLINK-20273
>             Project: Flink
>          Issue Type: Bug
>          Components: Documentation, Table SQL / API
>    Affects Versions: 1.12.0
>            Reporter: Shengkai Fang
>            Assignee: Shengkai Fang
>            Priority: Major
>             Fix For: 1.12.0
>
>
> The 
> [doc|https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/kafka.html#sink-partitioning]
>  says that the Kafka sink uses the fixed partitioner by default. However, in 
> my local test, it uses the sticky partitioner to choose the record's partition id 
> when the key is not set.
> To reproduce, add the following test to {{KafkaTableITCase}}:
> {code:java}
> public void testKafkaSourceSinkWithDefaultPartitioner() throws Exception {
>     if (isLegacyConnector) {
>         return;
>     }
>     // we always use a different topic name for each parameterized topic,
>     // in order to make sure the topic can be created.
>     final String topic = "key_full_value_topic_" + format;
>     createTestTopic(topic, 3, 1);
>     // ---------- Produce an event time stream into Kafka -------------------
>     String groupId = standardProps.getProperty("group.id");
>     String bootstraps = standardProps.getProperty("bootstrap.servers");
>     // compared to the partial value test we cannot support both k_user_id and user_id in a full
>     // value due to duplicate names after key prefix stripping,
>     // fields are reordered on purpose,
>     // fields for keys and values are overlapping
>     final String createSourceTable = String.format(
>             "CREATE TABLE kafkaSource (\n"
>                     + "  `user_id` BIGINT,\n"
>                     + "  `name` STRING,\n"
>                     + "  `partition` INT METADATA"
>                     + ") WITH (\n"
>                     + "  'connector' = 'kafka',\n"
>                     + "  'topic' = '%s',\n"
>                     + "  'properties.bootstrap.servers' = '%s',\n"
>                     + "  'properties.group.id' = '%s',\n"
>                     + "  'scan.startup.mode' = 'earliest-offset',\n"
>                     + "  'format' = '%s'\n"
>                     + ")",
>             topic,
>             bootstraps,
>             groupId,
>             format);
>     final String createSinkTable = String.format(
>             "CREATE TABLE kafkaSink (\n"
>                     + "  `user_id` BIGINT,\n"
>                     + "  `name` STRING\n"
>                     + ") WITH (\n"
>                     + "  'connector' = 'kafka',\n"
>                     + "  'topic' = '%s',\n"
>                     + "  'properties.bootstrap.servers' = '%s',\n"
>                     + "  'properties.group.id' = '%s',\n"
>                     + "  'scan.startup.mode' = 'earliest-offset',\n"
>                     + "  'format' = '%s'\n"
>                     + ")",
>             topic,
>             bootstraps,
>             groupId,
>             format);
>     tEnv.executeSql(createSourceTable);
>     tEnv.executeSql(createSinkTable);
>     String initialValues = "INSERT INTO kafkaSink\n"
>             + "VALUES\n"
>             + " (1, 'name 1'),\n"
>             + " (2, 'name 2'),\n"
>             + " (3, 'name 3')";
>     tEnv.executeSql(initialValues).await();
>     initialValues = "INSERT INTO kafkaSink\n"
>             + "VALUES\n"
>             + " (4, 'name 4'),\n"
>             + " (5, 'name 5'),\n"
>             + " (6, 'name 6')";
>     tEnv.executeSql(initialValues).await();
>     initialValues = "INSERT INTO kafkaSink\n"
>             + "VALUES\n"
>             + " (7, 'name 7'),\n"
>             + " (8, 'name 8'),\n"
>             + " (9, 'name 9')";
>     tEnv.executeSql(initialValues).await();
>     // ---------- Consume stream from Kafka -------------------
>     final List<Row> result = collectRows(tEnv.sqlQuery("SELECT * FROM kafkaSource"), 9);
>     // ------------- cleanup -------------------
>     deleteTestTopic(topic);
> }
> {code}
> The test uses the Kafka default partitioner to send records to the Kafka 
> topic. After the inserts, we can read each record back together with its 
> partition id. If the sink used the fixed partitioner, all records would have 
> the same partition id. I repeated the test 3 times, and the results are:
> {code:java}
> // the first result
> <1,name 1,1>
> <2,name 2,1>
> <3,name 3,1>
> <7,name 7,1>
> <4,name 4,0>
> <5,name 5,0>
> <6,name 6,0>
> <8,name 8,0>
> <9,name 9,0>
> // the second result
> <1,name 1,1>
> <2,name 2,1>
> <3,name 3,1>
> <4,name 4,0>
> <5,name 5,0>
> <6,name 6,0>
> <7,name 7,0>
> <8,name 8,0>
> <9,name 9,0>
> // the third result
> <9,name 9,2>
> <1,name 1,0>
> <2,name 2,0>
> <3,name 3,0>
> <4,name 4,0>
> <5,name 5,0>
> <6,name 6,0>
> <7,name 7,1>
> <8,name 8,1>
> {code}
> The last column is the partition id, and the test topic has 3 partitions. The 
> results show that the default partitioner is the sticky partitioner rather 
> than the fixed partitioner.
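The check described above can be sketched in a few lines. This is only an illustration that follows the report's reasoning (a fixed partitioner would put every record into one partition); the class and method names are hypothetical, and the input is the partition column of the first run, copied in the order listed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of KafkaTableITCase.
public class PartitionSpread {

    // Returns the distinct partition ids seen across all rows, sorted ascending.
    static Set<Integer> distinctPartitions(List<Integer> partitionIds) {
        return new TreeSet<>(partitionIds);
    }

    public static void main(String[] args) {
        // Last column of the first result in the report, in the listed order.
        List<Integer> firstRun = Arrays.asList(1, 1, 1, 1, 0, 0, 0, 0, 0);

        Set<Integer> seen = distinctPartitions(firstRun);
        System.out.println("distinct partitions: " + seen);

        // Per the report's reasoning, a fixed partitioner would yield exactly one id.
        System.out.println("single partition only: " + (seen.size() == 1));
    }
}
```

For the first run this reports two distinct partitions (0 and 1), so the records did not all land in one partition.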
> By the way, the sink partitioning section in the doc only applies when the key 
> is null. If the key fields are set, the {{round-robin}} strategy will not work.
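For anyone who wants a specific strategy instead of relying on the default, here is a rough sketch of a sink DDL that sets the partitioner explicitly. It assumes the connector's 'sink.partitioner' option with the values 'fixed' and 'round-robin' as described in the sink partitioning section of the doc; the class name, method name, topic and bootstrap address are made up for illustration, and the snippet only builds the DDL string without touching Flink.

```java
// Hypothetical helper that only formats a DDL string; it does not depend on Flink.
public class SinkPartitionerDdl {

    // Builds a sink table DDL with an explicit 'sink.partitioner' value.
    // Assumption: 'sink.partitioner' accepts values such as 'fixed' or 'round-robin'.
    static String sinkDdl(String topic, String bootstraps, String format, String partitioner) {
        return String.format(
                "CREATE TABLE kafkaSink (\n"
                        + "  `user_id` BIGINT,\n"
                        + "  `name` STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = '%s',\n"
                        + "  'properties.bootstrap.servers' = '%s',\n"
                        + "  'format' = '%s',\n"
                        + "  'sink.partitioner' = '%s'\n"
                        + ")",
                topic,
                bootstraps,
                format,
                partitioner);
    }

    public static void main(String[] args) {
        // Placeholder topic and broker address, chosen only for this example.
        System.out.println(sinkDdl("example_topic", "localhost:9092", "json", "fixed"));
    }
}
```

In the test from the report, the resulting string could be passed to tEnv.executeSql(...) in place of createSinkTable, and the partition column read back in the same way to compare strategies.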



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
