[ https://issues.apache.org/jira/browse/FLINK-9979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16565402#comment-16565402 ]

ASF GitHub Bot commented on FLINK-9979:
---------------------------------------

asfgit closed pull request #6440: [FLINK-9979] [table] Support a FlinkKafkaPartitioner for Kafka table sink factory
URL: https://github.com/apache/flink/pull/6440
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
index 2ad31420789..8471908a9cf 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
@@ -24,6 +24,7 @@
 import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.types.Row;
 
+import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -73,16 +74,23 @@ public Kafka010JsonTableSink(String topic, Properties properties, FlinkKafkaPart
        }
 
        @Override
-       protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
+       protected FlinkKafkaProducerBase<Row> createKafkaProducer(
+                       String topic,
+                       Properties properties,
+                       SerializationSchema<Row> serializationSchema,
+                       Optional<FlinkKafkaPartitioner<Row>> partitioner) {
                return new FlinkKafkaProducer010<>(
                        topic,
                        serializationSchema,
                        properties,
-                       partitioner);
+                       partitioner.orElse(new FlinkFixedPartitioner<>()));
        }
 
        @Override
        protected Kafka010JsonTableSink createCopy() {
-               return new Kafka010JsonTableSink(topic, properties, partitioner);
+               return new Kafka010JsonTableSink(
+                       topic,
+                       properties,
+                       partitioner.orElse(new FlinkFixedPartitioner<>()));
        }
 }
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java
index a8c65539824..1d408b8ab52 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java
@@ -24,6 +24,7 @@
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.types.Row;
 
+import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -36,7 +37,7 @@ public Kafka010TableSink(
                        TableSchema schema,
                        String topic,
                        Properties properties,
-                       FlinkKafkaPartitioner<Row> partitioner,
+                       Optional<FlinkKafkaPartitioner<Row>> partitioner,
                        SerializationSchema<Row> serializationSchema) {
                super(
                        schema,
@@ -51,11 +52,11 @@ public Kafka010TableSink(
                        String topic,
                        Properties properties,
                        SerializationSchema<Row> serializationSchema,
-                       FlinkKafkaPartitioner<Row> partitioner) {
+                       Optional<FlinkKafkaPartitioner<Row>> partitioner) {
                return new FlinkKafkaProducer010<>(
                        topic,
                        serializationSchema,
                        properties,
-                       partitioner);
+                       partitioner.orElse(null));
        }
 }
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java
index 0cf94995bdd..ecf12b27a08 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java
@@ -77,7 +77,7 @@ protected KafkaTableSink createKafkaTableSink(
                        TableSchema schema,
                        String topic,
                        Properties properties,
-                       FlinkKafkaPartitioner<Row> partitioner,
+                       Optional<FlinkKafkaPartitioner<Row>> partitioner,
                        SerializationSchema<Row> serializationSchema) {
 
                return new Kafka010TableSink(
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
index 339420cede3..9208f6583b8 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
@@ -40,7 +40,10 @@ protected KafkaTableSink createTableSink(
                        Properties properties,
                        FlinkKafkaPartitioner<Row> partitioner) {
 
-               return new Kafka010JsonTableSink(topic, properties, partitioner);
+               return new Kafka010JsonTableSink(
+                       topic,
+                       properties,
+                       partitioner);
        }
 
        @Override
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java
index cc198c91595..dac8a4dacdd 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java
@@ -85,7 +85,7 @@ protected KafkaTableSink getExpectedKafkaTableSink(
                        TableSchema schema,
                        String topic,
                        Properties properties,
-                       FlinkKafkaPartitioner<Row> partitioner,
+                       Optional<FlinkKafkaPartitioner<Row>> partitioner,
                        SerializationSchema<Row> serializationSchema) {
 
                return new Kafka010TableSink(
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
index 22c6da13b05..8d81a5b59a1 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java
@@ -39,7 +39,7 @@ public Kafka011TableSink(
                        TableSchema schema,
                        String topic,
                        Properties properties,
-                       FlinkKafkaPartitioner<Row> partitioner,
+                       Optional<FlinkKafkaPartitioner<Row>> partitioner,
                        SerializationSchema<Row> serializationSchema) {
                super(
                        schema,
@@ -54,11 +54,11 @@ public Kafka011TableSink(
                        String topic,
                        Properties properties,
                        SerializationSchema<Row> serializationSchema,
-                       FlinkKafkaPartitioner<Row> partitioner) {
+                       Optional<FlinkKafkaPartitioner<Row>> partitioner) {
                return new FlinkKafkaProducer011<>(
                        topic,
                        new KeyedSerializationSchemaWrapper<>(serializationSchema),
                        properties,
-                       Optional.of(partitioner));
+                       partitioner);
        }
 }
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java
index c26df42ed4a..e6f677fb568 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java
@@ -77,7 +77,7 @@ protected KafkaTableSink createKafkaTableSink(
                        TableSchema schema,
                        String topic,
                        Properties properties,
-                       FlinkKafkaPartitioner<Row> partitioner,
+                       Optional<FlinkKafkaPartitioner<Row>> partitioner,
                        SerializationSchema<Row> serializationSchema) {
 
                return new Kafka011TableSink(
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java
index 996c5083860..f4614761d21 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java
@@ -85,7 +85,7 @@ protected KafkaTableSink getExpectedKafkaTableSink(
                        TableSchema schema,
                        String topic,
                        Properties properties,
-                       FlinkKafkaPartitioner<Row> partitioner,
+                       Optional<FlinkKafkaPartitioner<Row>> partitioner,
                        SerializationSchema<Row> serializationSchema) {
 
                return new Kafka011TableSink(
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
index 45588cdb141..189a9fdf46b 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
@@ -26,6 +26,7 @@
 import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.types.Row;
 
+import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -92,17 +93,24 @@ public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitione
        }
 
        @Override
-       protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
+       protected FlinkKafkaProducerBase<Row> createKafkaProducer(
+                       String topic,
+                       Properties properties,
+                       SerializationSchema<Row> serializationSchema,
+                       Optional<FlinkKafkaPartitioner<Row>> partitioner) {
                return new FlinkKafkaProducer08<>(
                        topic,
                        serializationSchema,
                        properties,
-                       partitioner);
+                       partitioner.orElse(new FlinkFixedPartitioner<>()));
        }
 
        @Override
        protected Kafka08JsonTableSink createCopy() {
-               return new Kafka08JsonTableSink(topic, properties, partitioner);
+               return new Kafka08JsonTableSink(
+                       topic,
+                       properties,
+                       partitioner.orElse(new FlinkFixedPartitioner<>()));
        }
 }
 
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java
index c34de13efde..146cfc90739 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java
@@ -24,6 +24,7 @@
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.types.Row;
 
+import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -36,7 +37,7 @@ public Kafka08TableSink(
                        TableSchema schema,
                        String topic,
                        Properties properties,
-                       FlinkKafkaPartitioner<Row> partitioner,
+                       Optional<FlinkKafkaPartitioner<Row>> partitioner,
                        SerializationSchema<Row> serializationSchema) {
                super(
                        schema,
@@ -51,11 +52,11 @@ public Kafka08TableSink(
                        String topic,
                        Properties properties,
                        SerializationSchema<Row> serializationSchema,
-                       FlinkKafkaPartitioner<Row> partitioner) {
+                       Optional<FlinkKafkaPartitioner<Row>> partitioner) {
                return new FlinkKafkaProducer08<>(
                        topic,
                        serializationSchema,
                        properties,
-                       partitioner);
+                       partitioner.orElse(null));
        }
 }
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java
index 3e93b6fdeac..aeccd4f1ac3 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java
@@ -77,7 +77,7 @@ protected KafkaTableSink createKafkaTableSink(
                        TableSchema schema,
                        String topic,
                        Properties properties,
-                       FlinkKafkaPartitioner<Row> partitioner,
+                       Optional<FlinkKafkaPartitioner<Row>> partitioner,
                        SerializationSchema<Row> serializationSchema) {
 
                return new Kafka08TableSink(
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
index 32bd3b69c06..fc46ad4c6ee 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -40,7 +40,10 @@ protected KafkaTableSink createTableSink(
                        Properties properties,
                        FlinkKafkaPartitioner<Row> partitioner) {
 
-               return new Kafka08JsonTableSink(topic, properties, partitioner);
+               return new Kafka08JsonTableSink(
+                       topic,
+                       properties,
+                       partitioner);
        }
 
        @Override
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java
index b67501e449e..ff633ec0246 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java
@@ -85,7 +85,7 @@ protected KafkaTableSink getExpectedKafkaTableSink(
                        TableSchema schema,
                        String topic,
                        Properties properties,
-                       FlinkKafkaPartitioner<Row> partitioner,
+                       Optional<FlinkKafkaPartitioner<Row>> partitioner,
                        SerializationSchema<Row> serializationSchema) {
 
                return new Kafka08TableSink(
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
index b3cc0aa77ca..33634590061 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
@@ -26,6 +26,7 @@
 import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.types.Row;
 
+import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -92,16 +93,23 @@ public Kafka09JsonTableSink(String topic, Properties properties, KafkaPartitione
        }
 
        @Override
-       protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
+       protected FlinkKafkaProducerBase<Row> createKafkaProducer(
+                       String topic,
+                       Properties properties,
+                       SerializationSchema<Row> serializationSchema,
+                       Optional<FlinkKafkaPartitioner<Row>> partitioner) {
                return new FlinkKafkaProducer09<>(
                        topic,
                        serializationSchema,
                        properties,
-                       partitioner);
+                       partitioner.orElse(new FlinkFixedPartitioner<>()));
        }
 
        @Override
        protected Kafka09JsonTableSink createCopy() {
-               return new Kafka09JsonTableSink(topic, properties, partitioner);
+               return new Kafka09JsonTableSink(
+                       topic,
+                       properties,
+                       partitioner.orElse(new FlinkFixedPartitioner<>()));
        }
 }
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java
index 8c349d7a0b2..6e38aad1a39 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java
@@ -24,6 +24,7 @@
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.types.Row;
 
+import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -36,7 +37,7 @@ public Kafka09TableSink(
                        TableSchema schema,
                        String topic,
                        Properties properties,
-                       FlinkKafkaPartitioner<Row> partitioner,
+                       Optional<FlinkKafkaPartitioner<Row>> partitioner,
                        SerializationSchema<Row> serializationSchema) {
                super(
                        schema,
@@ -51,11 +52,11 @@ public Kafka09TableSink(
                        String topic,
                        Properties properties,
                        SerializationSchema<Row> serializationSchema,
-                       FlinkKafkaPartitioner<Row> partitioner) {
+                       Optional<FlinkKafkaPartitioner<Row>> partitioner) {
                return new FlinkKafkaProducer09<>(
                        topic,
                        serializationSchema,
                        properties,
-                       partitioner);
+                       partitioner.orElse(null));
        }
 }
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java
index 9958b4ef316..19f51508b93 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java
@@ -77,7 +77,7 @@ protected KafkaTableSink createKafkaTableSink(
                        TableSchema schema,
                        String topic,
                        Properties properties,
-                       FlinkKafkaPartitioner<Row> partitioner,
+                       Optional<FlinkKafkaPartitioner<Row>> partitioner,
                        SerializationSchema<Row> serializationSchema) {
 
                return new Kafka09TableSink(
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
index 79f251b8302..97b5c7d88a2 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -40,7 +40,10 @@ protected KafkaTableSink createTableSink(
                        Properties properties,
                        FlinkKafkaPartitioner<Row> partitioner) {
 
-               return new Kafka09JsonTableSink(topic, properties, partitioner);
+               return new Kafka09JsonTableSink(
+                       topic,
+                       properties,
+                       partitioner);
        }
 
        @Override
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java
index a6c8bd4b279..d54c3945949 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java
@@ -85,7 +85,7 @@ protected KafkaTableSink getExpectedKafkaTableSink(
                        TableSchema schema,
                        String topic,
                        Properties properties,
-                       FlinkKafkaPartitioner<Row> partitioner,
+                       Optional<FlinkKafkaPartitioner<Row>> partitioner,
                        SerializationSchema<Row> serializationSchema) {
 
                return new Kafka09TableSink(
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
index 7853bb702a5..a85d536eac9 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -40,7 +40,7 @@
  * A version-agnostic Kafka {@link AppendStreamTableSink}.
  *
  * <p>The version-specific Kafka consumers need to extend this class and
- * override {@link #createKafkaProducer(String, Properties, SerializationSchema, FlinkKafkaPartitioner)}}.
+ * override {@link #createKafkaProducer(String, Properties, SerializationSchema, Optional)}}.
  */
 @Internal
 public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
@@ -60,7 +60,7 @@
        protected Optional<SerializationSchema<Row>> serializationSchema;
 
        /** Partitioner to select Kafka partition for each item. */
-       protected final FlinkKafkaPartitioner<Row> partitioner;
+       protected final Optional<FlinkKafkaPartitioner<Row>> partitioner;
 
        // legacy variables
        protected String[] fieldNames;
@@ -70,7 +70,7 @@ protected KafkaTableSink(
                        TableSchema schema,
                        String topic,
                        Properties properties,
-                       FlinkKafkaPartitioner<Row> partitioner,
+                       Optional<FlinkKafkaPartitioner<Row>> partitioner,
                        SerializationSchema<Row> serializationSchema) {
                this.schema = Optional.of(Preconditions.checkNotNull(schema, "Schema must not be null."));
                this.topic = Preconditions.checkNotNull(topic, "Topic must not be null.");
@@ -96,7 +96,7 @@ public KafkaTableSink(
                this.schema = Optional.empty();
                this.topic = Preconditions.checkNotNull(topic, "topic");
                this.properties = Preconditions.checkNotNull(properties, "properties");
-               this.partitioner = Preconditions.checkNotNull(partitioner, "partitioner");
+               this.partitioner = Optional.of(Preconditions.checkNotNull(partitioner, "partitioner"));
                this.serializationSchema = Optional.empty();
        }
 
@@ -113,7 +113,7 @@ public KafkaTableSink(
                String topic,
                Properties properties,
                SerializationSchema<Row> serializationSchema,
-               FlinkKafkaPartitioner<Row> partitioner);
+               Optional<FlinkKafkaPartitioner<Row>> partitioner);
 
        /**
         * Create serialization schema for converting table rows into bytes.
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
index 27b2e67ce0b..5634331adbb 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
@@ -38,6 +38,7 @@
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.InstantiationUtil;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -54,6 +55,11 @@
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES;
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_KEY;
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_VALUE;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_CLASS;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_VALUE_CUSTOM;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_VALUE_FIXED;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_VALUE_ROUND_ROBIN;
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS;
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_OFFSET;
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_PARTITION;
@@ -105,6 +111,8 @@
                properties.add(CONNECTOR_STARTUP_MODE);
                properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_PARTITION);
                properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_OFFSET);
+               properties.add(CONNECTOR_SINK_PARTITIONER);
+               properties.add(CONNECTOR_SINK_PARTITIONER_CLASS);
 
                // schema
                properties.add(SCHEMA() + ".#." + SCHEMA_TYPE());
@@ -170,7 +178,7 @@
                        schema,
                        topic,
                        getKafkaProperties(descriptorProperties),
-                       getFlinkKafkaPartitioner(),
+                       getFlinkKafkaPartitioner(descriptorProperties),
                        getSerializationSchema(properties));
        }
 
@@ -228,7 +236,7 @@ protected abstract KafkaTableSink createKafkaTableSink(
                TableSchema schema,
                String topic,
                Properties properties,
-               FlinkKafkaPartitioner<Row> partitioner,
+               Optional<FlinkKafkaPartitioner<Row>> partitioner,
                SerializationSchema<Row> serializationSchema);
 
        // --------------------------------------------------------------------------------------------
@@ -314,9 +322,24 @@ private StartupOptions getStartupOptions(
                return options;
        }
 
-       private FlinkKafkaPartitioner<Row> getFlinkKafkaPartitioner() {
-               // we don't support custom partitioner so far
-               return new FlinkFixedPartitioner<>();
+       @SuppressWarnings("unchecked")
+       private Optional<FlinkKafkaPartitioner<Row>> getFlinkKafkaPartitioner(DescriptorProperties descriptorProperties) {
+               return descriptorProperties
+                       .getOptionalString(CONNECTOR_SINK_PARTITIONER)
+                       .flatMap((String partitionerString) -> {
+                               switch (partitionerString) {
+                                       case CONNECTOR_SINK_PARTITIONER_VALUE_FIXED:
+                                               return Optional.of(new FlinkFixedPartitioner<>());
+                                       case CONNECTOR_SINK_PARTITIONER_VALUE_ROUND_ROBIN:
+                                               return Optional.empty();
+                                       case CONNECTOR_SINK_PARTITIONER_VALUE_CUSTOM:
+                                               final Class<? extends FlinkKafkaPartitioner> partitionerClass =
+                                                       descriptorProperties.getClass(CONNECTOR_SINK_PARTITIONER_CLASS, FlinkKafkaPartitioner.class);
+                                               return Optional.of(InstantiationUtil.instantiate(partitionerClass));
+                                       default:
+                                               throw new TableException("Unsupported sink partitioner. Validator should have checked that.");
+                               }
+                       });
        }
 
        private boolean checkForCustomFieldMapping(DescriptorProperties descriptorProperties, TableSchema schema) {
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java
index 45359587c1c..e44341a991e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.util.Preconditions;
 
 import java.util.ArrayList;
@@ -34,6 +35,11 @@
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES;
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_KEY;
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_VALUE;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_CLASS;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_VALUE_CUSTOM;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_VALUE_FIXED;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_VALUE_ROUND_ROBIN;
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS;
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_OFFSET;
 import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_PARTITION;
@@ -51,6 +57,8 @@
        private StartupMode startupMode;
        private Map<Integer, Long> specificOffsets;
        private Map<String, String> kafkaProperties;
+       private String sinkPartitionerType;
+       private Class<? extends FlinkKafkaPartitioner> sinkPartitionerClass;
 
        /**
         * Connector descriptor for the Apache Kafka message queue.
@@ -175,6 +183,69 @@ public Kafka startFromSpecificOffset(int partition, long specificOffset) {
                return this;
        }
 
+       /**
+        * Configures how to partition records from Flink's partitions into Kafka's partitions.
+        *
+        * <p>This strategy ensures that each Flink partition ends up in one Kafka partition.
+        *
+        * <p>Note: One Kafka partition can contain multiple Flink partitions. Examples:
+        *
+        * <p>More Flink partitions than Kafka partitions. Some (or all) Kafka partitions contain
+        * the output of more than one flink partition:
+        * <pre>
+        *     Flink Sinks            Kafka Partitions
+        *         1    ----------------&gt;    1
+        *         2    --------------/
+        *         3    -------------/
+        *         4    ------------/
+        * </pre>
+        *
+        *
+        * <p>Fewer Flink partitions than Kafka partitions:
+        * <pre>
+        *     Flink Sinks            Kafka Partitions
+        *         1    ----------------&gt;    1
+        *         2    ----------------&gt;    2
+        *                                      3
+        *                                      4
+        *                                      5
+        * </pre>
+        *
+        * @see org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner
+        */
+       public Kafka sinkPartitionerFixed() {
+               sinkPartitionerType = CONNECTOR_SINK_PARTITIONER_VALUE_FIXED;
+               sinkPartitionerClass = null;
+               return this;
+       }
+
+       /**
+        * Configures how to partition records from Flink's partitions into Kafka's partitions.
+        *
+        * <p>This strategy ensures that records will be distributed to Kafka partitions in a
+        * round-robin fashion.
+        *
+        * <p>Note: This strategy is useful to avoid an unbalanced partitioning. However, it will
+        * cause a lot of network connections between all the Flink instances and all the Kafka brokers.
+        */
+       public Kafka sinkPartitionerRoundRobin() {
+               sinkPartitionerType = CONNECTOR_SINK_PARTITIONER_VALUE_ROUND_ROBIN;
+               sinkPartitionerClass = null;
+               return this;
+       }
+
+       /**
+        * Configures how to partition records from Flink's partitions into Kafka's partitions.
+        *
+        * <p>This strategy allows for a custom partitioner by providing an implementation
+        * of {@link FlinkKafkaPartitioner}.
+        */
+       public Kafka sinkPartitionerCustom(Class<? extends FlinkKafkaPartitioner> partitionerClass) {
+               sinkPartitionerType = CONNECTOR_SINK_PARTITIONER_VALUE_CUSTOM;
+               sinkPartitionerClass = Preconditions.checkNotNull(partitionerClass);
+               return this;
+       }
+
        /**
         * Internal method for connector properties conversion.
         */
@@ -212,5 +283,12 @@ public void addConnectorProperties(DescriptorProperties properties) {
                                        .collect(Collectors.toList())
                                );
                }
+
+               if (sinkPartitionerType != null) {
+                       properties.putString(CONNECTOR_SINK_PARTITIONER, sinkPartitionerType);
+                       if (sinkPartitionerClass != null) {
+                               properties.putClass(CONNECTOR_SINK_PARTITIONER_CLASS, sinkPartitionerClass);
+                       }
+               }
        }
 }
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
index 3adc7c518a4..cad37f8f8cd 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
@@ -48,12 +48,27 @@
        public static final String CONNECTOR_PROPERTIES = "connector.properties";
        public static final String CONNECTOR_PROPERTIES_KEY = "key";
        public static final String CONNECTOR_PROPERTIES_VALUE = "value";
+       public static final String CONNECTOR_SINK_PARTITIONER = "connector.sink-partitioner";
+       public static final String CONNECTOR_SINK_PARTITIONER_VALUE_FIXED = "fixed";
+       public static final String CONNECTOR_SINK_PARTITIONER_VALUE_ROUND_ROBIN = "round-robin";
+       public static final String CONNECTOR_SINK_PARTITIONER_VALUE_CUSTOM = "custom";
+       public static final String CONNECTOR_SINK_PARTITIONER_CLASS = "connector.sink-partitioner-class";
 
        @Override
        public void validate(DescriptorProperties properties) {
                super.validate(properties);
                properties.validateValue(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE_KAFKA, false);
 
+               validateVersion(properties);
+
+               validateStartupMode(properties);
+
+               validateKafkaProperties(properties);
+
+               validateSinkPartitioner(properties);
+       }
+
+       private void validateVersion(DescriptorProperties properties) {
                final List<String> versions = Arrays.asList(
                        CONNECTOR_VERSION_VALUE_08,
                        CONNECTOR_VERSION_VALUE_09,
@@ -61,7 +76,9 @@ public void validate(DescriptorProperties properties) {
                        CONNECTOR_VERSION_VALUE_011);
                properties.validateEnumValues(CONNECTOR_VERSION(), false, versions);
                properties.validateString(CONNECTOR_TOPIC, false, 1, Integer.MAX_VALUE);
+       }
 
+       private void validateStartupMode(DescriptorProperties properties) {
                final Map<String, Consumer<String>> specificOffsetValidators = new HashMap<>();
                specificOffsetValidators.put(
                        CONNECTOR_SPECIFIC_OFFSETS_PARTITION,
@@ -86,17 +103,29 @@ public void validate(DescriptorProperties properties) {
                        CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS,
                        prefix -> properties.validateFixedIndexedProperties(CONNECTOR_SPECIFIC_OFFSETS, false, specificOffsetValidators));
                properties.validateEnum(CONNECTOR_STARTUP_MODE, true, startupModeValidation);
+       }
 
+       private void validateKafkaProperties(DescriptorProperties properties) {
                final Map<String, Consumer<String>> propertyValidators = new HashMap<>();
                propertyValidators.put(
                        CONNECTOR_PROPERTIES_KEY,
-                       prefix -> properties.validateString(prefix + CONNECTOR_PROPERTIES_KEY, false, 1, Integer.MAX_VALUE));
+                       prefix -> properties.validateString(prefix + CONNECTOR_PROPERTIES_KEY, false, 1));
                propertyValidators.put(
                        CONNECTOR_PROPERTIES_VALUE,
-                       prefix -> properties.validateString(prefix + CONNECTOR_PROPERTIES_VALUE, false, 0, Integer.MAX_VALUE));
+                       prefix -> properties.validateString(prefix + CONNECTOR_PROPERTIES_VALUE, false, 0));
                properties.validateFixedIndexedProperties(CONNECTOR_PROPERTIES, true, propertyValidators);
        }
 
+       private void validateSinkPartitioner(DescriptorProperties properties) {
+               final Map<String, Consumer<String>> sinkPartitionerValidators = new HashMap<>();
+               sinkPartitionerValidators.put(CONNECTOR_SINK_PARTITIONER_VALUE_FIXED, properties.noValidation());
+               sinkPartitionerValidators.put(CONNECTOR_SINK_PARTITIONER_VALUE_ROUND_ROBIN, properties.noValidation());
+               sinkPartitionerValidators.put(
+                       CONNECTOR_SINK_PARTITIONER_VALUE_CUSTOM,
+                       prefix -> properties.validateString(CONNECTOR_SINK_PARTITIONER_CLASS, false, 1));
+               properties.validateEnum(CONNECTOR_SINK_PARTITIONER, true, sinkPartitionerValidators);
+       }
+
        // utilities
 
        public static String normalizeStartupMode(StartupMode startupMode) {
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
index 946b6eb5895..b4bb89dc048 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -30,6 +30,7 @@
 
 import org.junit.Test;
 
+import java.util.Optional;
 import java.util.Properties;
 
 import static org.junit.Assert.assertArrayEquals;
@@ -59,7 +60,7 @@
 
        @SuppressWarnings("unchecked")
        @Test
-       public void testKafkaTableSink() throws Exception {
+       public void testKafkaTableSink() {
                DataStream dataStream = mock(DataStream.class);
                when(dataStream.addSink(any(SinkFunction.class))).thenReturn(mock(DataStreamSink.class));
 
@@ -74,7 +75,7 @@ public void testKafkaTableSink() throws Exception {
                        eq(TOPIC),
                        eq(PROPERTIES),
                        any(getSerializationSchemaClass()),
-                       eq(PARTITIONER));
+                       eq(Optional.of(PARTITIONER)));
        }
 
        @Test
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
index 504bed16a74..5e9144c1e57 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
@@ -153,6 +153,7 @@ public void testTableSource() {
                                        .version(getKafkaVersion())
                                        .topic(TOPIC)
                                        .properties(KAFKA_PROPERTIES)
+                                       .sinkPartitionerRoundRobin() // test if accepted although not needed
                                        .startFromSpecificOffsets(OFFSETS))
                        .withFormat(new TestTableFormat())
                        .withSchema(
@@ -194,7 +195,7 @@ public void testTableSink() {
                        schema,
                        TOPIC,
                        KAFKA_PROPERTIES,
-                       new FlinkFixedPartitioner<>(), // a custom partitioner is not support yet
+                       Optional.of(new FlinkFixedPartitioner<>()),
                        new TestSerializationSchema(schema.toRowType()));
 
                // construct table sink using descriptors and table sink factory
@@ -204,6 +205,7 @@ public void testTableSink() {
                                        .version(getKafkaVersion())
                                        .topic(TOPIC)
                                        .properties(KAFKA_PROPERTIES)
+                                       .sinkPartitionerFixed()
+                                       .startFromSpecificOffsets(OFFSETS)) // test if they accepted although not needed
                        .withFormat(new TestTableFormat())
                        .withSchema(
@@ -299,6 +301,6 @@ protected abstract KafkaTableSink getExpectedKafkaTableSink(
                TableSchema schema,
                String topic,
                Properties properties,
-               FlinkKafkaPartitioner<Row> partitioner,
+               Optional<FlinkKafkaPartitioner<Row>> partitioner,
                SerializationSchema<Row> serializationSchema);
 }
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/table/descriptors/KafkaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/table/descriptors/KafkaTest.java
index f3d96f1c443..c67bc4dcf20 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/table/descriptors/KafkaTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/table/descriptors/KafkaTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.table.descriptors;
 
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -59,7 +61,8 @@
                                .version("0.11")
                                .topic("MyTable")
                                .startFromSpecificOffsets(offsets)
-                               .properties(properties);
+                               .properties(properties)
+                               .sinkPartitionerCustom(FlinkFixedPartitioner.class);
 
                return Arrays.asList(earliestDesc, specificOffsetsDesc, specificOffsetsMapDesc);
        }
@@ -102,6 +105,8 @@
                props3.put("connector.properties.0.value", "12");
                props3.put("connector.properties.1.key", "kafka.stuff");
                props3.put("connector.properties.1.value", "42");
+               props3.put("connector.sink-partitioner", "custom");
+               props3.put("connector.sink-partitioner-class", 
FlinkFixedPartitioner.class.getName());
 
                return Arrays.asList(props1, props2, props3);
        }
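
For orientation, the end-to-end effect of the patch can be summarized with a short
usage sketch. This is illustrative only and not part of the diff: the Kafka
descriptor, its new sinkPartitioner*() methods, and FlinkFixedPartitioner are taken
from the changes above, while the surrounding example class is made up.

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.table.descriptors.Kafka;

import java.util.Properties;

public class SinkPartitionerDescriptorExample {

	public static void main(String[] args) {
		Properties kafkaProps = new Properties();
		kafkaProps.setProperty("bootstrap.servers", "localhost:9092");

		// "fixed": each Flink sink task writes to a single Kafka partition
		// (the behavior that was hard-coded before this patch).
		Kafka fixed = new Kafka()
			.version("0.11")
			.topic("MyTopic")
			.properties(kafkaProps)
			.sinkPartitionerFixed();

		// "round-robin": no Flink-side partitioner is set (Optional.empty()
		// internally), so records are distributed across Kafka partitions.
		Kafka roundRobin = new Kafka()
			.version("0.11")
			.topic("MyTopic")
			.properties(kafkaProps)
			.sinkPartitionerRoundRobin();

		// "custom": the factory instantiates the given class reflectively via
		// InstantiationUtil; any FlinkKafkaPartitioner with a public no-arg
		// constructor works (FlinkFixedPartitioner is used here as in KafkaTest).
		Kafka custom = new Kafka()
			.version("0.11")
			.topic("MyTopic")
			.properties(kafkaProps)
			.sinkPartitionerCustom(FlinkFixedPartitioner.class);
	}
}

The same three choices can be expressed as raw connector properties, using the keys
added to KafkaValidator: "connector.sink-partitioner" set to "fixed", "round-robin",
or "custom", plus "connector.sink-partitioner-class" for the custom case.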


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Support a custom FlinkKafkaPartitioner for a Kafka table sink factory
> ---------------------------------------------------------------------
>
>                 Key: FLINK-9979
>                 URL: https://issues.apache.org/jira/browse/FLINK-9979
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Timo Walther
>            Assignee: Timo Walther
>            Priority: Major
>              Labels: pull-request-available
>
> Currently, the Kafka table sink factory does not support a custom 
> FlinkKafkaPartitioner. However, this is needed for many use cases.
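
Since the description notes that a custom FlinkKafkaPartitioner is needed for many
use cases, a minimal sketch of such a partitioner follows. The class name and keying
logic are invented for illustration; only the FlinkKafkaPartitioner contract is
Flink's. A public no-arg constructor is required because the factory instantiates
the class reflectively.

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.types.Row;

// Illustrative sketch: route rows by hashing their first field so that rows
// with the same key always land in the same Kafka partition.
public class FirstFieldHashPartitioner extends FlinkKafkaPartitioner<Row> {

	@Override
	public int partition(Row record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
		Object firstField = record.getField(0);
		int hash = firstField == null ? 0 : firstField.hashCode();
		// |hash % length| < length, so the index below is always in range
		return partitions[Math.abs(hash % partitions.length)];
	}
}

With the patch applied, this class could be plugged in either via
Kafka#sinkPartitionerCustom(FirstFieldHashPartitioner.class) or via the
"connector.sink-partitioner-class" property.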



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
