This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new 033b230  [FLINK-20273][table/kafka] Fix the Kafka 'round-robin' partitioner behaviour when message keys are specified
033b230 is described below

commit 033b230a155ed188ca8051c5defcda016427591a
Author: Shengkai <[email protected]>
AuthorDate: Mon Dec 7 23:26:26 2020 +0800

    [FLINK-20273][table/kafka] Fix the Kafka 'round-robin' partitioner behaviour when message keys are specified
    
    This closes #14246
---
 docs/dev/table/connectors/kafka.md                 | 12 ++++++-----
 docs/dev/table/connectors/kafka.zh.md              | 12 ++++++-----
 .../connectors/kafka/table/KafkaOptions.java       | 23 +++++++++++-----------
 .../kafka/table/KafkaDynamicTableFactoryTest.java  | 15 +++++++++++++-
 4 files changed, 39 insertions(+), 23 deletions(-)

diff --git a/docs/dev/table/connectors/kafka.md b/docs/dev/table/connectors/kafka.md
index 85e1f5b..7e3891f 100644
--- a/docs/dev/table/connectors/kafka.md
+++ b/docs/dev/table/connectors/kafka.md
@@ -333,14 +333,16 @@ Connector Options
     <tr>
       <td><h5>sink.partitioner</h5></td>
       <td>optional</td>
-      <td style="word-wrap: break-word;">(none)</td>
+      <td style="word-wrap: break-word;">'default'</td>
       <td>String</td>
      <td>Output partitioning from Flink's partitions into Kafka's partitions. Valid values are
       <ul>
+        <li><code>default</code>: use the Kafka default partitioner to partition records.</li>
         <li><code>fixed</code>: each Flink partition ends up in at most one Kafka partition.</li>
-        <li><code>round-robin</code>: a Flink partition is distributed to Kafka partitions round-robin.</li>
+        <li><code>round-robin</code>: a Flink partition is distributed to Kafka partitions in a sticky round-robin manner. It works only when record keys are not specified.</li>
         <li>Custom <code>FlinkKafkaPartitioner</code> subclass: e.g. <code>'org.mycompany.MyPartitioner'</code>.</li>
       </ul>
+      See the <a href="#sink-partitioning">Sink Partitioning</a> section below for more details.
       </td>
     </tr>
     <tr>
@@ -525,9 +527,9 @@ See more about how to use the CDC formats in [debezium-json]({% link dev/table/c
 ### Sink Partitioning
 
 The config option `sink.partitioner` specifies output partitioning from Flink's partitions into Kafka's partitions.
-By default, a Kafka sink writes to at most as many partitions as its own parallelism (each parallel instance of the sink writes to exactly one partition).
-In order to distribute the writes to more partitions or control the routing of rows into partitions, a custom sink partitioner can be provided. The `round-robin` partitioner is useful to avoid an unbalanced partitioning.
-However, it will cause a lot of network connections between all the Flink instances and all the Kafka brokers.
+By default, Flink uses the [Kafka default partitioner](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java) to partition records. It uses the [sticky partition strategy](https://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/) for records with null keys and a murmur2 hash of the key to compute the partition for records with a key defined.
+
+In order to control the routing of rows into partitions, a custom sink partitioner can be provided. The 'fixed' partitioner writes records from the same Flink partition into the same Kafka partition, which can reduce the cost of network connections.
 
 ### Consistency guarantees
 
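The keyed-routing behaviour of the Kafka default partitioner described in the hunk above can be sketched in plain Java. This is an illustration only, not Flink's or Kafka's code: Kafka hashes the serialized key with murmur2, while this sketch substitutes `Arrays.hashCode` to stay dependency-free, and the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Sketch of keyed routing in the Kafka default partitioner: a record with a
// key always maps to hash(keyBytes) mod numPartitions. Kafka uses murmur2;
// Arrays.hashCode stands in here so the example is self-contained.
public class KeyedRoutingSketch {

    static int partitionForKey(byte[] keyBytes, int numPartitions) {
        // Mask the sign bit so the result is always in [0, numPartitions).
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        int p1 = partitionForKey(key, 3);
        int p2 = partitionForKey(key, 3);
        // The same key is always routed to the same partition.
        System.out.println(p1 == p2 && p1 >= 0 && p1 < 3);
    }
}
```

Because keyed records are routed by hash, spreading them round-robin would break per-key partition affinity, which is the motivation for rejecting 'round-robin' together with 'key.fields' in this patch.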
diff --git a/docs/dev/table/connectors/kafka.zh.md b/docs/dev/table/connectors/kafka.zh.md
index df6192c..0e6c108 100644
--- a/docs/dev/table/connectors/kafka.zh.md
+++ b/docs/dev/table/connectors/kafka.zh.md
@@ -333,14 +333,16 @@ Connector Options
     <tr>
       <td><h5>sink.partitioner</h5></td>
       <td>optional</td>
-      <td style="word-wrap: break-word;">(none)</td>
+      <td style="word-wrap: break-word;">'default'</td>
       <td>String</td>
      <td>Output partitioning from Flink's partitions into Kafka's partitions. Valid values are
       <ul>
+        <li><code>default</code>: use the Kafka default partitioner to partition records.</li>
         <li><code>fixed</code>: each Flink partition ends up in at most one Kafka partition.</li>
-        <li><code>round-robin</code>: a Flink partition is distributed to Kafka partitions round-robin.</li>
+        <li><code>round-robin</code>: a Flink partition is distributed to Kafka partitions in a sticky round-robin manner. It works only when record keys are not specified.</li>
         <li>Custom <code>FlinkKafkaPartitioner</code> subclass: e.g. <code>'org.mycompany.MyPartitioner'</code>.</li>
       </ul>
+      See the <a href="#sink-partitioning">Sink Partitioning</a> section below for more details.
       </td>
     </tr>
     <tr>
@@ -526,9 +528,9 @@ See more about how to use the CDC formats in [debezium-json]({% link dev/table/c
 ### Sink Partitioning
 
 The config option `sink.partitioner` specifies output partitioning from Flink's partitions into Kafka's partitions.
-By default, a Kafka sink writes to at most as many partitions as its own parallelism (each parallel instance of the sink writes to exactly one partition).
-In order to distribute the writes to more partitions or control the routing of rows into partitions, a custom sink partitioner can be provided. The `round-robin` partitioner is useful to avoid an unbalanced partitioning.
-However, it will cause a lot of network connections between all the Flink instances and all the Kafka brokers.
+By default, Flink uses the [Kafka default partitioner](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java) to partition records. It uses the [sticky partition strategy](https://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/) for records with null keys and a murmur2 hash of the key to compute the partition for records with a key defined.
+
+In order to control the routing of rows into partitions, a custom sink partitioner can be provided. The 'fixed' partitioner writes records from the same Flink partition into the same Kafka partition, which can reduce the cost of network connections.
 
 ### Consistency guarantees
 
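The sticky partition strategy that both doc changes reference can be modelled in a few lines. This is a simplified toy model, not Kafka's actual `StickyPartitionCache`: the fixed `batchSize` and all names are illustrative, and real producers switch partitions on batch-full or linger events rather than a fixed record count.

```java
// Simplified model of the sticky partition strategy for records with null
// keys: the producer sticks to one partition until a batch completes, then
// advances to the next partition round-robin.
public class StickyRoundRobinSketch {
    private final int numPartitions;
    private final int batchSize;      // records per batch in this toy model
    private int stickyPartition = 0;
    private int recordsInBatch = 0;

    public StickyRoundRobinSketch(int numPartitions, int batchSize) {
        this.numPartitions = numPartitions;
        this.batchSize = batchSize;
    }

    // Partition for the next null-key record.
    public int nextPartition() {
        if (recordsInBatch == batchSize) {
            // Batch complete: move to the next partition round-robin.
            stickyPartition = (stickyPartition + 1) % numPartitions;
            recordsInBatch = 0;
        }
        recordsInBatch++;
        return stickyPartition;
    }
}
```

Compared with per-record round-robin, the sticky scheme fills whole batches for a single partition, which improves batching and latency while still balancing load across partitions over time.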
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
index 5e5baca..c2bd143 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
@@ -176,11 +176,12 @@ public class KafkaOptions {
        public static final ConfigOption<String> SINK_PARTITIONER = ConfigOptions
                        .key("sink.partitioner")
                        .stringType()
-                       .noDefaultValue()
+                       .defaultValue("default")
                        .withDescription("Optional output partitioning from Flink's partitions\n"
                                        + "into Kafka's partitions valid enumerations are\n"
+                                       + "\"default\": (use kafka default partitioner to partition records),\n"
                                        + "\"fixed\": (each Flink partition ends up in at most one Kafka partition),\n"
-                                       + "\"round-robin\": (a Flink partition is distributed to Kafka partitions round-robin)\n"
+                                       + "\"round-robin\": (a Flink partition is distributed to Kafka partitions round-robin when 'key.fields' is not specified.)\n"
                                        + "\"custom class name\": (use a custom FlinkKafkaPartitioner subclass)");
 
        public static final ConfigOption<String> SINK_SEMANTIC = ConfigOptions.key("sink.semantic")
@@ -207,13 +208,10 @@ public class KafkaOptions {
                        SCAN_STARTUP_MODE_VALUE_TIMESTAMP));
 
        // Sink partitioner.
+       public static final String SINK_PARTITIONER_VALUE_DEFAULT = "default";
        public static final String SINK_PARTITIONER_VALUE_FIXED = "fixed";
        public static final String SINK_PARTITIONER_VALUE_ROUND_ROBIN = "round-robin";
 
-       private static final Set<String> SINK_PARTITIONER_ENUMS = new HashSet<>(Arrays.asList(
-                       SINK_PARTITIONER_VALUE_FIXED,
-                       SINK_PARTITIONER_VALUE_ROUND_ROBIN));
-
        // Sink semantic
        public static final String SINK_SEMANTIC_VALUE_EXACTLY_ONCE = "exactly-once";
        public static final String SINK_SEMANTIC_VALUE_AT_LEAST_ONCE = "at-least-once";
@@ -314,12 +312,12 @@ public class KafkaOptions {
        private static void validateSinkPartitioner(ReadableConfig tableOptions) {
                tableOptions.getOptional(SINK_PARTITIONER)
                                .ifPresent(partitioner -> {
-                                       if (!SINK_PARTITIONER_ENUMS.contains(partitioner.toLowerCase())) {
-                                               if (partitioner.isEmpty()) {
-                                                       throw new ValidationException(
-                                                                       String.format("Option '%s' should be a non-empty string.",
-                                                                               SINK_PARTITIONER.key()));
-                                               }
+                                       if (partitioner.equals(SINK_PARTITIONER_VALUE_ROUND_ROBIN) && tableOptions.getOptional(KEY_FIELDS).isPresent()) {
+                                               throw new ValidationException("Currently 'round-robin' partitioner only works when option 'key.fields' is not specified.");
+                                       } else if (partitioner.isEmpty()) {
+                                               throw new ValidationException(
+                                                               String.format("Option '%s' should be a non-empty string.",
+                                                                               SINK_PARTITIONER.key()));
                                        }
                                });
        }
@@ -440,6 +438,7 @@ public class KafkaOptions {
                                        switch (partitioner) {
                                        case SINK_PARTITIONER_VALUE_FIXED:
                                                return Optional.of(new FlinkFixedPartitioner<>());
+                                       case SINK_PARTITIONER_VALUE_DEFAULT:
                                        case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
                                                return Optional.empty();
                                        // Default fallback to full class name of the partitioner.
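For comparison, the `SINK_PARTITIONER_VALUE_FIXED` branch above returns a `FlinkFixedPartitioner`. Its effect can be sketched as follows; this is a simplified stand-in that assumes the subtask index is known, not Flink's actual class:

```java
// Sketch of 'fixed' partitioning: every record from one parallel sink
// instance goes to a single Kafka partition derived from the subtask index.
// Modelled loosely on Flink's FlinkFixedPartitioner; names are illustrative.
public class FixedPartitionerSketch {
    private final int parallelInstanceId; // index of this parallel sink subtask

    public FixedPartitionerSketch(int parallelInstanceId) {
        this.parallelInstanceId = parallelInstanceId;
    }

    public int partition(int[] kafkaPartitions) {
        // All records from this subtask map to the same Kafka partition,
        // which keeps the number of producer connections small.
        return kafkaPartitions[parallelInstanceId % kafkaPartitions.length];
    }
}
```

The 'default' and 'round-robin' branches return `Optional.empty()` because routing is then delegated entirely to the Kafka producer's own partitioner.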
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
index 6777272..e788cc8 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
@@ -502,13 +502,26 @@ public class KafkaDynamicTableFactoryTest extends TestLogger {
                                + "class 'abc'")));
 
                final Map<String, String> modifiedOptions = getModifiedOptions(
-                               getBasicSourceOptions(),
+                               getBasicSinkOptions(),
                                options -> options.put("sink.partitioner", "abc"));
 
                createTableSink(SCHEMA, modifiedOptions);
        }
 
        @Test
+       public void testInvalidRoundRobinPartitionerWithKeyFields() {
+               thrown.expect(ValidationException.class);
+               thrown.expect(containsCause(new ValidationException("Currently 'round-robin' partitioner only works " +
+                               "when option 'key.fields' is not specified.")));
+
+               final Map<String, String> modifiedOptions = getModifiedOptions(
+                               getKeyValueOptions(),
+                               options -> options.put("sink.partitioner", "round-robin"));
+
+               createTableSink(SCHEMA, modifiedOptions);
+       }
+
+       @Test
        public void testInvalidSinkSemantic() {
                thrown.expect(ValidationException.class);
                thrown.expect(containsCause(new ValidationException("Unsupported value 'xyz' for 'sink.semantic'. "

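The validation rule the new test exercises can be sketched as a dependency-free stand-in. This is not the connector's actual `validateSinkPartitioner`: the class is hypothetical, options are modelled as a plain map, and `IllegalArgumentException` replaces Flink's `ValidationException`.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the patched validation: 'round-robin' cannot be combined with
// 'key.fields', and an empty partitioner string is rejected. Option keys
// match the connector; everything else here is an illustrative stand-in.
public class PartitionerValidationSketch {

    static void validate(Map<String, String> options) {
        String partitioner = options.get("sink.partitioner");
        if (partitioner == null) {
            return; // option not set: nothing to validate
        }
        if (partitioner.isEmpty()) {
            throw new IllegalArgumentException(
                    "Option 'sink.partitioner' should be a non-empty string.");
        }
        if ("round-robin".equals(partitioner) && options.containsKey("key.fields")) {
            throw new IllegalArgumentException(
                    "Currently 'round-robin' partitioner only works when option 'key.fields' is not specified.");
        }
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("sink.partitioner", "round-robin");
        options.put("key.fields", "user_id"); // hypothetical key field
        try {
            validate(options);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Note that the committed code performs the round-robin/key check before the empty-string check; the ordering here is cosmetic, since at most one rule can fire for a given value.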