wuchong commented on a change in pull request #14246:
URL: https://github.com/apache/flink/pull/14246#discussion_r537312778
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -314,12 +317,12 @@ private static void validateScanStartupMode(ReadableConfig tableOptions) {
 	private static void validateSinkPartitioner(ReadableConfig tableOptions) {
 		tableOptions.getOptional(SINK_PARTITIONER)
 			.ifPresent(partitioner -> {
-				if (!SINK_PARTITIONER_ENUMS.contains(partitioner.toLowerCase())) {
-					if (partitioner.isEmpty()) {
-						throw new ValidationException(
-							String.format("Option '%s' should be a non-empty string.",
-								SINK_PARTITIONER.key()));
-					}
+				if (partitioner.equals(SINK_PARTITIONER_VALUE_ROUND_ROBIN) && tableOptions.getOptional(KEY_FIELDS).isPresent()) {
Review comment:
We still need to check that the partitioner is in the allowed enums.
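For illustration, here is a rough sketch (not a drop-in patch) of how both checks could sit together inside `validateSinkPartitioner`, assuming the existing `SINK_PARTITIONER`, `SINK_PARTITIONER_ENUMS`, `SINK_PARTITIONER_VALUE_ROUND_ROBIN` and `KEY_FIELDS` constants from `KafkaOptions`; the error message for the round-robin case is a placeholder, not the wording proposed in the PR:

```java
// Sketch only, based on the hunk above; not verified against the full class.
private static void validateSinkPartitioner(ReadableConfig tableOptions) {
	tableOptions.getOptional(SINK_PARTITIONER)
		.ifPresent(partitioner -> {
			// Keep the original enum check: values outside the allowed enums
			// (presumably custom partitioner class names) must not be empty.
			if (!SINK_PARTITIONER_ENUMS.contains(partitioner.toLowerCase())) {
				if (partitioner.isEmpty()) {
					throw new ValidationException(
						String.format("Option '%s' should be a non-empty string.",
							SINK_PARTITIONER.key()));
				}
			}
			// Then apply the new constraint introduced by this change.
			if (partitioner.equals(SINK_PARTITIONER_VALUE_ROUND_ROBIN)
					&& tableOptions.getOptional(KEY_FIELDS).isPresent()) {
				// Placeholder message; the actual wording should come from the PR.
				throw new ValidationException(
					"The 'round-robin' partitioner cannot be used together with key fields.");
			}
		});
}
```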
##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -525,9 +527,9 @@ See more about how to use the CDC formats in [debezium-json]({% link dev/table/c
 ### Sink Partitioning
 The config option `sink.partitioner` specifies output partitioning from Flink's partitions into Kafka's partitions.
-By default, a Kafka sink writes to at most as many partitions as its own parallelism (each parallel instance of the sink writes to exactly one partition).
-In order to distribute the writes to more partitions or control the routing of rows into partitions, a custom sink partitioner can be provided. The `round-robin` partitioner is useful to avoid an unbalanced partitioning.
-However, it will cause a lot of network connections between all the Flink instances and all the Kafka brokers.
+By default, Flink uses the [Kafka default partitioner](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java) to parititon records.
+It uses the [sticky partition strategy](https://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/) for records with null keys and uses a murmur2 hash to compute the partition for a record with the key defined.
+In order to control the routing of rows into partitions, a custom sink partitioner can be provided. The 'fixed' partitioner will write the records in the same Flink partition into the same partition, which could reduce the cost of the network connections.
Review comment:
```suggestion
By default, Flink uses the [Kafka default partitioner](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java) to partition records. It uses the [sticky partition strategy](https://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/) for records with null keys and uses a murmur2 hash to compute the partition for a record with the key defined.
In order to control the routing of rows into partitions, a custom sink partitioner can be provided. The 'fixed' partitioner will write the records in the same Flink partition into the same Kafka partition, which could reduce the cost of the network connections.
```
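As a side note, a minimal, hypothetical Table API example of where `sink.partitioner` comes into play; the class, table, topic, broker address and schema below are made up for illustration, and only the `'sink.partitioner' = 'fixed'` option is the point:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KafkaSinkPartitionerExample {
	public static void main(String[] args) {
		TableEnvironment tEnv = TableEnvironment.create(
			EnvironmentSettings.newInstance().inStreamingMode().build());

		// Hypothetical sink table; names and addresses are placeholders.
		tEnv.executeSql(
			"CREATE TABLE kafka_sink (\n"
			+ "  user_id BIGINT,\n"
			+ "  item_id BIGINT\n"
			+ ") WITH (\n"
			+ "  'connector' = 'kafka',\n"
			+ "  'topic' = 'user_behavior',\n"
			+ "  'properties.bootstrap.servers' = 'localhost:9092',\n"
			+ "  'format' = 'json',\n"
			// 'fixed': each parallel Flink sink instance writes to one Kafka partition
			+ "  'sink.partitioner' = 'fixed'\n"
			+ ")");
	}
}
```

With 'fixed', records from the same Flink partition end up in the same Kafka partition, which is where the reduced number of network connections mentioned in the docs comes from.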
##########
File path: docs/dev/table/connectors/kafka.zh.md
##########
@@ -526,9 +528,9 @@ See more about how to use the CDC formats in [debezium-json]({% link dev/table/c
 ### Sink Partitioning
 The config option `sink.partitioner` specifies output partitioning from Flink's partitions into Kafka's partitions.
-By default, a Kafka sink writes to at most as many partitions as its own parallelism (each parallel instance of the sink writes to exactly one partition).
-In order to distribute the writes to more partitions or control the routing of rows into partitions, a custom sink partitioner can be provided. The `round-robin` partitioner is useful to avoid an unbalanced partitioning.
-However, it will cause a lot of network connections between all the Flink instances and all the Kafka brokers.
+By default, Flink uses the [Kafka default partitioner](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java) to parititon records.
+It uses the [sticky partition strategy](https://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/) for records with null keys and uses a murmur2 hash to compute the partition for a record with the key defined.
+In order to control the routing of rows into partitions, a custom sink partitioner can be provided. The 'fixed' partitioner will write the records in the same Flink partition into the same partition, which could reduce the cost of the network connections.
Review comment:
```suggestion
By default, Flink uses the [Kafka default partitioner](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java) to partition records. It uses the [sticky partition strategy](https://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/) for records with null keys and uses a murmur2 hash to compute the partition for a record with the key defined.
In order to control the routing of rows into partitions, a custom sink partitioner can be provided. The 'fixed' partitioner will write the records in the same Flink partition into the same Kafka partition, which could reduce the cost of the network connections.
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]