wuchong commented on a change in pull request #14246:
URL: https://github.com/apache/flink/pull/14246#discussion_r537312778



##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -314,12 +317,12 @@ private static void validateScanStartupMode(ReadableConfig tableOptions) {
 	private static void validateSinkPartitioner(ReadableConfig tableOptions) {
 		tableOptions.getOptional(SINK_PARTITIONER)
 				.ifPresent(partitioner -> {
-					if (!SINK_PARTITIONER_ENUMS.contains(partitioner.toLowerCase())) {
-						if (partitioner.isEmpty()) {
-							throw new ValidationException(
-									String.format("Option '%s' should be a non-empty string.",
-											SINK_PARTITIONER.key()));
-						}
+					if (partitioner.equals(SINK_PARTITIONER_VALUE_ROUND_ROBIN) && tableOptions.getOptional(KEY_FIELDS).isPresent()) {

Review comment:
       We still need to check that the partitioner is one of the allowed enum values.
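
       For illustration, a minimal sketch of keeping both checks (reusing the constants visible in the diff; the old empty-string check is preserved because non-enum values may name a custom partitioner class, and the second error message is just a placeholder):

       ```java
       private static void validateSinkPartitioner(ReadableConfig tableOptions) {
       	tableOptions.getOptional(SINK_PARTITIONER)
       			.ifPresent(partitioner -> {
       				// Keep the original enum check: values outside
       				// SINK_PARTITIONER_ENUMS may be custom class names,
       				// so only the empty string is rejected here.
       				if (!SINK_PARTITIONER_ENUMS.contains(partitioner.toLowerCase())
       						&& partitioner.isEmpty()) {
       					throw new ValidationException(
       							String.format("Option '%s' should be a non-empty string.",
       									SINK_PARTITIONER.key()));
       				}
       				// New check from this PR: round-robin cannot be
       				// combined with key fields.
       				if (partitioner.equals(SINK_PARTITIONER_VALUE_ROUND_ROBIN)
       						&& tableOptions.getOptional(KEY_FIELDS).isPresent()) {
       					throw new ValidationException(
       							String.format("'%s' = '%s' cannot be used together with '%s'.",
       									SINK_PARTITIONER.key(), partitioner, KEY_FIELDS.key()));
       				}
       			});
       }
       ```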

##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -525,9 +527,9 @@ See more about how to use the CDC formats in [debezium-json]({% link dev/table/c
 ### Sink Partitioning
 
 The config option `sink.partitioner` specifies output partitioning from Flink's partitions into Kafka's partitions.
-By default, a Kafka sink writes to at most as many partitions as its own parallelism (each parallel instance of the sink writes to exactly one partition).
-In order to distribute the writes to more partitions or control the routing of rows into partitions, a custom sink partitioner can be provided. The `round-robin` partitioner is useful to avoid an unbalanced partitioning.
-However, it will cause a lot of network connections between all the Flink instances and all the Kafka brokers.
+By default, Flink uses the [Kafka default partitioner](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java) to partition records.
+It uses the [sticky partition strategy](https://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/) for records with null keys and uses a murmur2 hash to compute the partition for a record with a defined key.
+In order to control the routing of rows into partitions, a custom sink partitioner can be provided. The 'fixed' partitioner will write the records in the same Flink partition into the same partition, which could reduce the cost of the network connections.

Review comment:
       ```suggestion
   By default, Flink uses the [Kafka default partitioner](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java) to partition records. It uses the [sticky partition strategy](https://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/) for records with null keys and uses a murmur2 hash to compute the partition for a record with a defined key.
   
   In order to control the routing of rows into partitions, a custom sink partitioner can be provided. The 'fixed' partitioner will write the records in the same Flink partition into the same Kafka partition, which could reduce the cost of the network connections.
   ```
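
       To make the 'fixed' behavior concrete, here is a small self-contained sketch (a hypothetical class, modeled on the modulo mapping used by Flink's `FlinkFixedPartitioner`): each parallel sink subtask is pinned to a single Kafka partition, so each subtask talks to far fewer brokers than a round-robin scheme would.

       ```java
       // Hypothetical sketch (not the actual Flink class) of a "fixed" sink
       // partitioner: subtask i always writes to partitions[i % partitions.length],
       // so records from one Flink partition land in one Kafka partition.
       public class FixedPartitionerSketch {

       	// Index of this parallel sink instance; the runtime would supply it.
       	private final int parallelInstanceId;

       	public FixedPartitionerSketch(int parallelInstanceId) {
       		this.parallelInstanceId = parallelInstanceId;
       	}

       	// Picks the single Kafka partition this subtask writes to.
       	public int partition(int[] partitions) {
       		return partitions[parallelInstanceId % partitions.length];
       	}

       	public static void main(String[] args) {
       		int[] partitions = {0, 1, 2, 3};
       		FixedPartitionerSketch subtask = new FixedPartitionerSketch(1);
       		System.out.println(subtask.partition(partitions)); // always prints 1
       	}
       }
       ```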

##########
File path: docs/dev/table/connectors/kafka.zh.md
##########
@@ -526,9 +528,9 @@ See more about how to use the CDC formats in [debezium-json]({% link dev/table/c
 ### Sink Partitioning
 
 The config option `sink.partitioner` specifies output partitioning from Flink's partitions into Kafka's partitions.
-By default, a Kafka sink writes to at most as many partitions as its own parallelism (each parallel instance of the sink writes to exactly one partition).
-In order to distribute the writes to more partitions or control the routing of rows into partitions, a custom sink partitioner can be provided. The `round-robin` partitioner is useful to avoid an unbalanced partitioning.
-However, it will cause a lot of network connections between all the Flink instances and all the Kafka brokers.
+By default, Flink uses the [Kafka default partitioner](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java) to partition records.
+It uses the [sticky partition strategy](https://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/) for records with null keys and uses a murmur2 hash to compute the partition for a record with a defined key.
+In order to control the routing of rows into partitions, a custom sink partitioner can be provided. The 'fixed' partitioner will write the records in the same Flink partition into the same partition, which could reduce the cost of the network connections.

Review comment:
       ```suggestion
   By default, Flink uses the [Kafka default partitioner](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java) to partition records. It uses the [sticky partition strategy](https://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/) for records with null keys and uses a murmur2 hash to compute the partition for a record with a defined key.
   
   In order to control the routing of rows into partitions, a custom sink partitioner can be provided. The 'fixed' partitioner will write the records in the same Flink partition into the same Kafka partition, which could reduce the cost of the network connections.
   ```



