This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.12 by this push:
new 033b230 [FLINK-20273][table/kafka] Fix the Kafka 'round-robin' partitioner behaviour when message keys are specified
033b230 is described below
commit 033b230a155ed188ca8051c5defcda016427591a
Author: Shengkai <[email protected]>
AuthorDate: Mon Dec 7 23:26:26 2020 +0800
[FLINK-20273][table/kafka] Fix the Kafka 'round-robin' partitioner
behaviour when message keys are specified
This closes #14246
---
docs/dev/table/connectors/kafka.md | 12 ++++++-----
docs/dev/table/connectors/kafka.zh.md | 12 ++++++-----
.../connectors/kafka/table/KafkaOptions.java | 23 +++++++++++-----------
.../kafka/table/KafkaDynamicTableFactoryTest.java | 15 +++++++++++++-
4 files changed, 39 insertions(+), 23 deletions(-)
diff --git a/docs/dev/table/connectors/kafka.md
b/docs/dev/table/connectors/kafka.md
index 85e1f5b..7e3891f 100644
--- a/docs/dev/table/connectors/kafka.md
+++ b/docs/dev/table/connectors/kafka.md
@@ -333,14 +333,16 @@ Connector Options
<tr>
<td><h5>sink.partitioner</h5></td>
<td>optional</td>
- <td style="word-wrap: break-word;">(none)</td>
+ <td style="word-wrap: break-word;">'default'</td>
<td>String</td>
<td>Output partitioning from Flink's partitions into Kafka's partitions.
Valid values are
<ul>
+ <li><code>default</code>: use the Kafka default partitioner to partition records.</li>
<li><code>fixed</code>: each Flink partition ends up in at most one
Kafka partition.</li>
- <li><code>round-robin</code>: a Flink partition is distributed to
Kafka partitions round-robin.</li>
+ <li><code>round-robin</code>: a Flink partition is distributed to Kafka partitions in a sticky round-robin fashion. It only works when record keys are not specified.</li>
<li>Custom <code>FlinkKafkaPartitioner</code> subclass: e.g.
<code>'org.mycompany.MyPartitioner'</code>.</li>
</ul>
+ See the <a href="#sink-partitioning">Sink Partitioning</a> section below for more details.
</td>
</tr>
<tr>
@@ -525,9 +527,9 @@ See more about how to use the CDC formats in
[debezium-json]({% link dev/table/c
### Sink Partitioning
The config option `sink.partitioner` specifies output partitioning from
Flink's partitions into Kafka's partitions.
-By default, a Kafka sink writes to at most as many partitions as its own
parallelism (each parallel instance of the sink writes to exactly one
partition).
-In order to distribute the writes to more partitions or control the routing of
rows into partitions, a custom sink partitioner can be provided. The
`round-robin` partitioner is useful to avoid an unbalanced partitioning.
-However, it will cause a lot of network connections between all the Flink
instances and all the Kafka brokers.
+By default, Flink uses the [Kafka default partitioner](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java) to partition records. It uses the [sticky partition strategy](https://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/) for records with null keys and uses a murmur2 hash of the key to compute the partition for records with a key defined.
+
+In order to control the routing of rows into partitions, a custom sink partitioner can be provided. The 'fixed' partitioner writes all records from the same Flink partition into the same Kafka partition, which can reduce the cost of network connections.
### Consistency guarantees
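The keyed path of the default-partitioner behaviour documented above (murmur2 hash of the serialized key, modulo the partition count) can be sketched as follows. This is an illustrative Python port, not Flink or Kafka code, and the function names are ours:

```python
# Illustrative Python port of the murmur2-based keyed path of Kafka's
# DefaultPartitioner, matching the behaviour described in the docs above.
# This is a sketch for explanation, not Flink/Kafka code.

def murmur2(data: bytes) -> int:
    """32-bit murmur2 hash as used by the Kafka producer (seed 0x9747b28c)."""
    mask = 0xFFFFFFFF
    m = 0x5BD1E995
    length = len(data)
    h = (0x9747B28C ^ length) & mask
    # Mix the input into the hash four bytes at a time.
    for i in range(0, length - length % 4, 4):
        k = int.from_bytes(data[i:i + 4], "little")
        k = (k * m) & mask
        k ^= k >> 24
        k = (k * m) & mask
        h = (h * m) & mask
        h ^= k
    # Fold in the remaining 1-3 tail bytes.
    tail, base = length % 4, length - length % 4
    if tail >= 3:
        h ^= data[base + 2] << 16
    if tail >= 2:
        h ^= data[base + 1] << 8
    if tail >= 1:
        h ^= data[base]
        h = (h * m) & mask
    # Final avalanche.
    h ^= h >> 13
    h = (h * m) & mask
    h ^= h >> 15
    return h

def partition_for_key(key: bytes, num_partitions: int) -> int:
    """Partition chosen for a record whose serialized key is non-null."""
    return (murmur2(key) & 0x7FFFFFFF) % num_partitions
```

Because the partition is a pure function of the key, keyed records cannot be spread round-robin by the default partitioner, which is why this commit rejects 'round-robin' when 'key.fields' is specified.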
diff --git a/docs/dev/table/connectors/kafka.zh.md
b/docs/dev/table/connectors/kafka.zh.md
index df6192c..0e6c108 100644
--- a/docs/dev/table/connectors/kafka.zh.md
+++ b/docs/dev/table/connectors/kafka.zh.md
@@ -333,14 +333,16 @@ Connector Options
<tr>
<td><h5>sink.partitioner</h5></td>
<td>optional</td>
- <td style="word-wrap: break-word;">(none)</td>
+ <td style="word-wrap: break-word;">'default'</td>
<td>String</td>
<td>Output partitioning from Flink's partitions into Kafka's partitions.
Valid values are
<ul>
+ <li><code>default</code>: use the Kafka default partitioner to partition records.</li>
<li><code>fixed</code>: each Flink partition ends up in at most one
Kafka partition.</li>
- <li><code>round-robin</code>: a Flink partition is distributed to
Kafka partitions round-robin.</li>
+ <li><code>round-robin</code>: a Flink partition is distributed to Kafka partitions in a sticky round-robin fashion. It only works when record keys are not specified.</li>
<li>Custom <code>FlinkKafkaPartitioner</code> subclass: e.g.
<code>'org.mycompany.MyPartitioner'</code>.</li>
</ul>
+ See the <a href="#sink-partitioning">Sink Partitioning</a> section below for more details.
</td>
</tr>
<tr>
@@ -526,9 +528,9 @@ See more about how to use the CDC formats in
[debezium-json]({% link dev/table/c
### Sink Partitioning
The config option `sink.partitioner` specifies output partitioning from
Flink's partitions into Kafka's partitions.
-By default, a Kafka sink writes to at most as many partitions as its own
parallelism (each parallel instance of the sink writes to exactly one
partition).
-In order to distribute the writes to more partitions or control the routing of
rows into partitions, a custom sink partitioner can be provided. The
`round-robin` partitioner is useful to avoid an unbalanced partitioning.
-However, it will cause a lot of network connections between all the Flink
instances and all the Kafka brokers.
+By default, Flink uses the [Kafka default partitioner](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java) to partition records. It uses the [sticky partition strategy](https://www.confluent.io/blog/apache-kafka-producer-improvements-sticky-partitioner/) for records with null keys and uses a murmur2 hash of the key to compute the partition for records with a key defined.
+
+In order to control the routing of rows into partitions, a custom sink partitioner can be provided. The 'fixed' partitioner writes all records from the same Flink partition into the same Kafka partition, which can reduce the cost of network connections.
### Consistency guarantees
diff --git
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
index 5e5baca..c2bd143 100644
---
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
+++
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
@@ -176,11 +176,12 @@ public class KafkaOptions {
public static final ConfigOption<String> SINK_PARTITIONER =
ConfigOptions
.key("sink.partitioner")
.stringType()
- .noDefaultValue()
+ .defaultValue("default")
.withDescription("Optional output partitioning from
Flink's partitions\n"
+ "into Kafka's partitions valid
enumerations are\n"
+ + "\"default\": (use the Kafka default partitioner to partition records),\n"
+ "\"fixed\": (each Flink partition
ends up in at most one Kafka partition),\n"
- + "\"round-robin\": (a Flink partition
is distributed to Kafka partitions round-robin)\n"
+ + "\"round-robin\": (a Flink partition is distributed to Kafka partitions round-robin when 'key.fields' is not specified),\n"
+ "\"custom class name\": (use a custom
FlinkKafkaPartitioner subclass)");
public static final ConfigOption<String> SINK_SEMANTIC =
ConfigOptions.key("sink.semantic")
@@ -207,13 +208,10 @@ public class KafkaOptions {
SCAN_STARTUP_MODE_VALUE_TIMESTAMP));
// Sink partitioner.
+ public static final String SINK_PARTITIONER_VALUE_DEFAULT = "default";
public static final String SINK_PARTITIONER_VALUE_FIXED = "fixed";
public static final String SINK_PARTITIONER_VALUE_ROUND_ROBIN =
"round-robin";
- private static final Set<String> SINK_PARTITIONER_ENUMS = new
HashSet<>(Arrays.asList(
- SINK_PARTITIONER_VALUE_FIXED,
- SINK_PARTITIONER_VALUE_ROUND_ROBIN));
-
// Sink semantic
public static final String SINK_SEMANTIC_VALUE_EXACTLY_ONCE =
"exactly-once";
public static final String SINK_SEMANTIC_VALUE_AT_LEAST_ONCE =
"at-least-once";
@@ -314,12 +312,12 @@ public class KafkaOptions {
private static void validateSinkPartitioner(ReadableConfig
tableOptions) {
tableOptions.getOptional(SINK_PARTITIONER)
.ifPresent(partitioner -> {
- if
(!SINK_PARTITIONER_ENUMS.contains(partitioner.toLowerCase())) {
- if (partitioner.isEmpty()) {
- throw new
ValidationException(
-
String.format("Option '%s' should be a non-empty string.",
-
SINK_PARTITIONER.key()));
- }
+ if
(partitioner.equals(SINK_PARTITIONER_VALUE_ROUND_ROBIN) &&
tableOptions.getOptional(KEY_FIELDS).isPresent()) {
+ throw new
ValidationException("Currently 'round-robin' partitioner only works when option
'key.fields' is not specified.");
+ } else if (partitioner.isEmpty()) {
+ throw new ValidationException(
+
String.format("Option '%s' should be a non-empty string.",
+
SINK_PARTITIONER.key()));
}
});
}
@@ -440,6 +438,7 @@ public class KafkaOptions {
switch (partitioner) {
case SINK_PARTITIONER_VALUE_FIXED:
return Optional.of(new
FlinkFixedPartitioner<>());
+ case SINK_PARTITIONER_VALUE_DEFAULT:
case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
return Optional.empty();
// Default fallback to full class name
of the partitioner.
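Taken together, the `KafkaOptions` hunks above make 'default' and 'round-robin' both map to `Optional.empty()` (i.e. defer to the Kafka producer's own partitioner), while 'round-robin' is additionally rejected when 'key.fields' is set. A minimal Python sketch of that validation and dispatch, with illustrative names rather than the Flink API:

```python
# Illustrative sketch of the option handling introduced by this commit;
# these names are ours, not Flink's.
FIXED, ROUND_ROBIN, DEFAULT = "fixed", "round-robin", "default"

def validate_sink_partitioner(partitioner: str, key_fields_set: bool) -> None:
    # 'round-robin' delegates to Kafka's default partitioner, which only
    # round-robins records with null keys, so combining it with key.fields
    # is rejected up front.
    if partitioner == ROUND_ROBIN and key_fields_set:
        raise ValueError("Currently 'round-robin' partitioner only works "
                         "when option 'key.fields' is not specified.")
    if partitioner == "":
        raise ValueError("Option 'sink.partitioner' should be a "
                         "non-empty string.")

def flink_partitioner(partitioner: str):
    # Returning None mirrors Optional.empty(): the Kafka producer's own
    # partitioner decides the target partition.
    if partitioner == FIXED:
        return "FlinkFixedPartitioner"  # placeholder for the real class
    if partitioner in (DEFAULT, ROUND_ROBIN):
        return None
    return partitioner  # custom FlinkKafkaPartitioner class name
```

This also explains the new test in `KafkaDynamicTableFactoryTest` below: building a sink with key/value options plus 'round-robin' must fail validation.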
diff --git
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
index 6777272..e788cc8 100644
---
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
+++
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
@@ -502,13 +502,26 @@ public class KafkaDynamicTableFactoryTest extends
TestLogger {
+ "class 'abc'")));
final Map<String, String> modifiedOptions = getModifiedOptions(
- getBasicSourceOptions(),
+ getBasicSinkOptions(),
options -> options.put("sink.partitioner",
"abc"));
createTableSink(SCHEMA, modifiedOptions);
}
@Test
+ public void testInvalidRoundRobinPartitionerWithKeyFields() {
+ thrown.expect(ValidationException.class);
+ thrown.expect(containsCause(new ValidationException("Currently
'round-robin' partitioner only works " +
+ "when option 'key.fields' is not specified.")));
+
+ final Map<String, String> modifiedOptions = getModifiedOptions(
+ getKeyValueOptions(),
+ options -> options.put("sink.partitioner",
"round-robin"));
+
+ createTableSink(SCHEMA, modifiedOptions);
+ }
+
+ @Test
public void testInvalidSinkSemantic() {
thrown.expect(ValidationException.class);
thrown.expect(containsCause(new
ValidationException("Unsupported value 'xyz' for 'sink.semantic'. "