[FLINK-6288] [kafka] Cleanup and improvements to FlinkKafkaPartitioner custom 
partitioning

This commit wraps up some general improvements to the new Kafka sink
custom partitioning API, most notably:
1. remove deprecated constructors from base classes, as they are not
user-facing.
2. modify producer IT test to test custom partitioning for dynamic
topics.
3. improve documentation and Javadocs of the new interfaces.

This closes #3901.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e3fcbb08
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e3fcbb08
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e3fcbb08

Branch: refs/heads/master
Commit: e3fcbb087568a33aa03f58ba0dc359b1a6b02bfd
Parents: 9ed9b68
Author: Tzu-Li (Gordon) Tai <[email protected]>
Authored: Thu May 18 21:52:16 2017 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Fri May 19 14:38:48 2017 +0800

----------------------------------------------------------------------
 docs/dev/connectors/kafka.md                    |   4 +-
 .../connectors/kafka/FlinkKafkaProducer010.java | 113 +++++-----
 .../connectors/kafka/FlinkKafkaProducer08.java  |  50 +++--
 .../connectors/kafka/Kafka08JsonTableSink.java  |  21 +-
 .../kafka/Kafka08JsonTableSinkTest.java         |  25 +--
 .../connectors/kafka/FlinkKafkaProducer09.java  |  52 +++--
 .../connectors/kafka/Kafka09JsonTableSink.java  |  28 +--
 .../kafka/Kafka09JsonTableSinkTest.java         |  26 +--
 .../kafka/FlinkKafkaProducerBase.java           |  89 ++++----
 .../connectors/kafka/KafkaJsonTableSink.java    |  14 --
 .../connectors/kafka/KafkaTableSink.java        |  40 +---
 .../kafka/partitioner/FixedPartitioner.java     |  77 -------
 .../partitioner/FlinkFixedPartitioner.java      |  19 +-
 .../FlinkKafkaDelegatePartitioner.java          |   5 +-
 .../partitioner/FlinkKafkaPartitioner.java      |  24 +-
 .../kafka/partitioner/KafkaPartitioner.java     |  16 +-
 .../kafka/FlinkKafkaProducerBaseTest.java       |  38 ++--
 .../connectors/kafka/KafkaConsumerTestBase.java |  98 ++++----
 .../connectors/kafka/KafkaProducerTestBase.java | 222 ++++++++++++-------
 .../kafka/KafkaTableSinkTestBase.java           |  51 +----
 .../connectors/kafka/TestFixedPartitioner.java  | 104 ---------
 .../TestFlinkKafkaDelegatePartitioner.java      | 111 ----------
 .../kafka/testutils/Tuple2Partitioner.java      |  49 ----
 23 files changed, 458 insertions(+), 818 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/docs/dev/connectors/kafka.md
----------------------------------------------------------------------
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index 60a8039..bc7e7de 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -456,9 +456,9 @@ are other constructor variants that allow providing the 
following:
  Please refer to the [Apache Kafka 
documentation](https://kafka.apache.org/documentation.html) for
  details on how to configure Kafka Producers.
  * *Custom partitioner*: To assign records to specific
- partitions, you can provide an implementation of a `KafkaPartitioner` to the
+ partitions, you can provide an implementation of a `FlinkKafkaPartitioner` to 
the
  constructor. This partitioner will be called for each record in the stream
- to determine which exact partition the record will be sent to.
+ to determine which exact partition of the target topic the record should be 
sent to.
  * *Advanced serialization schema*: Similar to the consumer,
  the producer also allows using an advanced serialization schema called 
`KeyedSerializationSchema`,
  which allows serializing the key and value separately. It also allows to 
override the target topic,

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index 7addafa..711fe07 100644
--- 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -38,6 +39,7 @@ import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWra
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.kafka.clients.producer.ProducerRecord;
 
+import static 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPartitionsByTopic;
 import static 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList;
 
 
@@ -121,32 +123,6 @@ public class FlinkKafkaProducer010<T> extends 
StreamSink<T> implements SinkFunct
         *  @param serializationSchema A serializable serialization schema for 
turning user objects into a kafka-consumable byte[] supporting key/value 
messages
         *  @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
         *  @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions.
-        *  @deprecated Use {@link 
FlinkKafkaProducer010Configuration#writeToKafkaWithTimestamps(DataStream, 
String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
-        */
-       @Deprecated
-       public static <T> FlinkKafkaProducer010Configuration<T> 
writeToKafkaWithTimestamps(DataStream<T> inStream,
-                                                                               
                                                                                
        String topicId,
-                                                                               
                                                                                
        KeyedSerializationSchema<T> serializationSchema,
-                                                                               
                                                                                
        Properties producerConfig,
-                                                                               
                                                                                
        KafkaPartitioner<T> customPartitioner) {
-
-               GenericTypeInfo<Object> objectTypeInfo = new 
GenericTypeInfo<>(Object.class);
-               FlinkKafkaProducer010<T> kafkaProducer = new 
FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, 
customPartitioner);
-               SingleOutputStreamOperator<Object> transformation = 
inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
-               return new FlinkKafkaProducer010Configuration<>(transformation, 
kafkaProducer);
-       }
-
-       /**
-        * Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
-        * the topic.
-        *
-        * This constructor allows writing timestamps to Kafka, it follow 
approach (b) (see above)
-        *
-        *  @param inStream The stream to write to Kafka
-        *  @param topicId The name of the target topic
-        *  @param serializationSchema A serializable serialization schema for 
turning user objects into a kafka-consumable byte[] supporting key/value 
messages
-        *  @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
-        *  @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions.
         */
        public static <T> FlinkKafkaProducer010Configuration<T> 
writeToKafkaWithTimestamps(DataStream<T> inStream,
                                                                                
                                                                                
        String topicId,
@@ -200,21 +176,6 @@ public class FlinkKafkaProducer010<T> extends 
StreamSink<T> implements SinkFunct
         * @param serializationSchema A (keyless) serializable serialization 
schema for turning user objects into a kafka-consumable byte[]
         * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
         * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
-        * @deprecated Use {@link 
FlinkKafkaProducer010#FlinkKafkaProducer010(String, SerializationSchema, 
Properties, FlinkKafkaPartitioner)} instead
-        */
-       @Deprecated
-       public FlinkKafkaProducer010(String topicId, SerializationSchema<T> 
serializationSchema, Properties producerConfig, KafkaPartitioner<T> 
customPartitioner) {
-               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, 
customPartitioner);
-       }
-
-       /**
-        * Creates a FlinkKafkaProducer for a given topic. the sink produces a 
DataStream to
-        * the topic.
-        *
-        * @param topicId The topic to write data to
-        * @param serializationSchema A (keyless) serializable serialization 
schema for turning user objects into a kafka-consumable byte[]
-        * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
-        * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
         */
        public FlinkKafkaProducer010(String topicId, SerializationSchema<T> 
serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> 
customPartitioner) {
                this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, 
customPartitioner);
@@ -251,32 +212,84 @@ public class FlinkKafkaProducer010<T> extends 
StreamSink<T> implements SinkFunct
        public FlinkKafkaProducer010(String topicId, 
KeyedSerializationSchema<T> serializationSchema, Properties producerConfig) {
                this(topicId, serializationSchema, producerConfig, new 
FlinkFixedPartitioner<T>());
        }
-
+       
        /**
         * Create Kafka producer
         *
         * This constructor does not allow writing timestamps to Kafka, it 
follow approach (a) (see above)
-        * @deprecated Use {@link 
FlinkKafkaProducer010#FlinkKafkaProducer010(String, KeyedSerializationSchema, 
Properties, FlinkKafkaPartitioner)} instead
         */
-       @Deprecated
-       public FlinkKafkaProducer010(String topicId, 
KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, 
KafkaPartitioner<T> customPartitioner) {
+       public FlinkKafkaProducer010(String topicId, 
KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, 
FlinkKafkaPartitioner<T> customPartitioner) {
                // We create a Kafka 09 producer instance here and only 
"override" (by intercepting) the
                // invoke call.
                super(new FlinkKafkaProducer09<>(topicId, serializationSchema, 
producerConfig, customPartitioner));
        }
-       
+
+       // ----------------------------- Deprecated constructors / factory 
methods  ---------------------------
+
+       /**
+        * Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
+        * the topic.
+        *
+        * This constructor allows writing timestamps to Kafka, it follow 
approach (b) (see above)
+        *
+        *  @param inStream The stream to write to Kafka
+        *  @param topicId The name of the target topic
+        *  @param serializationSchema A serializable serialization schema for 
turning user objects into a kafka-consumable byte[] supporting key/value 
messages
+        *  @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
+        *  @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions.
+        *
+        *  @deprecated This is a deprecated since it does not correctly handle 
partitioning when
+        *              producing to multiple topics. Use
+        *              {@link 
FlinkKafkaProducer010#FlinkKafkaProducer010(String, SerializationSchema, 
Properties, FlinkKafkaPartitioner)} instead.
+        */
+       @Deprecated
+       public static <T> FlinkKafkaProducer010Configuration<T> 
writeToKafkaWithTimestamps(DataStream<T> inStream,
+                                                                               
                                                                                
        String topicId,
+                                                                               
                                                                                
        KeyedSerializationSchema<T> serializationSchema,
+                                                                               
                                                                                
        Properties producerConfig,
+                                                                               
                                                                                
        KafkaPartitioner<T> customPartitioner) {
+
+               GenericTypeInfo<Object> objectTypeInfo = new 
GenericTypeInfo<>(Object.class);
+               FlinkKafkaProducer010<T> kafkaProducer =
+                               new FlinkKafkaProducer010<>(topicId, 
serializationSchema, producerConfig, new 
FlinkKafkaDelegatePartitioner<>(customPartitioner));
+               SingleOutputStreamOperator<Object> transformation = 
inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
+               return new FlinkKafkaProducer010Configuration<>(transformation, 
kafkaProducer);
+       }
+
+       /**
+        * Creates a FlinkKafkaProducer for a given topic. the sink produces a 
DataStream to
+        * the topic.
+        *
+        * @param topicId The topic to write data to
+        * @param serializationSchema A (keyless) serializable serialization 
schema for turning user objects into a kafka-consumable byte[]
+        * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
+        * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+        *
+        * @deprecated This is a deprecated since it does not correctly handle 
partitioning when
+        *             producing to multiple topics. Use
+        *             {@link 
FlinkKafkaProducer010#FlinkKafkaProducer010(String, SerializationSchema, 
Properties, FlinkKafkaPartitioner)} instead.
+        */
+       @Deprecated
+       public FlinkKafkaProducer010(String topicId, SerializationSchema<T> 
serializationSchema, Properties producerConfig, KafkaPartitioner<T> 
customPartitioner) {
+               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, 
customPartitioner);
+       }
+
        /**
         * Create Kafka producer
         *
         * This constructor does not allow writing timestamps to Kafka, it 
follow approach (a) (see above)
+        *
+        * @deprecated This is a deprecated constructor that does not correctly 
handle partitioning when
+        *             producing to multiple topics. Use
+        *             {@link 
FlinkKafkaProducer010#FlinkKafkaProducer010(String, SerializationSchema, 
Properties, FlinkKafkaPartitioner)} instead.
         */
-       public FlinkKafkaProducer010(String topicId, 
KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, 
FlinkKafkaPartitioner<T> customPartitioner) {
+       @Deprecated
+       public FlinkKafkaProducer010(String topicId, 
KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, 
KafkaPartitioner<T> customPartitioner) {
                // We create a Kafka 09 producer instance here and only 
"override" (by intercepting) the
                // invoke call.
-               super(new FlinkKafkaProducer09<>(topicId, serializationSchema, 
producerConfig, customPartitioner));
+               super(new FlinkKafkaProducer09<>(topicId, serializationSchema, 
producerConfig, new FlinkKafkaDelegatePartitioner<>(customPartitioner)));
        }
 
-
        // ----------------------------- Generic element processing  
---------------------------
 
        private void invokeInternal(T next, long elementTimestamp) throws 
Exception {
@@ -300,7 +313,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> 
implements SinkFunct
                ProducerRecord<byte[], byte[]> record;
                int[] partitions = 
internalProducer.topicPartitionsMap.get(targetTopic);
                if(null == partitions) {
-                       partitions = 
internalProducer.getPartitionsByTopic(targetTopic, internalProducer.producer);
+                       partitions = getPartitionsByTopic(targetTopic, 
internalProducer.producer);
                        internalProducer.topicPartitionsMap.put(targetTopic, 
partitions);
                }
                if (internalProducer.flinkKafkaPartitioner == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
index 64d3716..08dcb2f 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
@@ -76,21 +77,6 @@ public class FlinkKafkaProducer08<IN> extends 
FlinkKafkaProducerBase<IN>  {
         * @param serializationSchema A (keyless) serializable serialization 
schema for turning user objects into a kafka-consumable byte[]
         * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
         * @param customPartitioner A serializable partitioner for assining 
messages to Kafka partitions.
-        * @deprecated Use {@link 
FlinkKafkaProducer08#FlinkKafkaProducer08(String, SerializationSchema, 
Properties, FlinkKafkaPartitioner)} instead
-        */
-       @Deprecated
-       public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> 
serializationSchema, Properties producerConfig, KafkaPartitioner<IN> 
customPartitioner) {
-               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, 
customPartitioner);
-
-       }
-
-       /**
-        * The main constructor for creating a FlinkKafkaProducer.
-        *
-        * @param topicId The topic to write data to
-        * @param serializationSchema A (keyless) serializable serialization 
schema for turning user objects into a kafka-consumable byte[]
-        * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
-        * @param customPartitioner A serializable partitioner for assining 
messages to Kafka partitions.
         */
        public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> 
serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> 
customPartitioner) {
                this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, 
customPartitioner);
@@ -136,13 +122,30 @@ public class FlinkKafkaProducer08<IN> extends 
FlinkKafkaProducerBase<IN>  {
         * @param serializationSchema A serializable serialization schema for 
turning user objects into a kafka-consumable byte[] supporting key/value 
messages
         * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
         * @param customPartitioner A serializable partitioner for assining 
messages to Kafka partitions.
-        * @deprecated Use {@link 
FlinkKafkaProducer08#FlinkKafkaProducer08(String, KeyedSerializationSchema, 
Properties, FlinkKafkaPartitioner)} instead
         */
-       @Deprecated
-       public FlinkKafkaProducer08(String topicId, 
KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, 
KafkaPartitioner<IN> customPartitioner) {
+       public FlinkKafkaProducer08(String topicId, 
KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, 
FlinkKafkaPartitioner<IN> customPartitioner) {
                super(topicId, serializationSchema, producerConfig, 
customPartitioner);
        }
 
+       // ------------------- Deprecated constructors ----------------------
+
+       /**
+        * The main constructor for creating a FlinkKafkaProducer.
+        *
+        * @param topicId The topic to write data to
+        * @param serializationSchema A (keyless) serializable serialization 
schema for turning user objects into a kafka-consumable byte[]
+        * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
+        * @param customPartitioner A serializable partitioner for assining 
messages to Kafka partitions.
+        *
+        * @deprecated This is a deprecated constructor that does not correctly 
handle partitioning when
+        *             producing to multiple topics. Use
+        *             {@link FlinkKafkaProducer08(String, SerializationSchema, 
Properties, FlinkKafkaPartitioner)} instead.
+        */
+       @Deprecated
+       public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> 
serializationSchema, Properties producerConfig, KafkaPartitioner<IN> 
customPartitioner) {
+               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, 
customPartitioner);
+       }
+
        /**
         * The main constructor for creating a FlinkKafkaProducer.
         *
@@ -150,11 +153,18 @@ public class FlinkKafkaProducer08<IN> extends 
FlinkKafkaProducerBase<IN>  {
         * @param serializationSchema A serializable serialization schema for 
turning user objects into a kafka-consumable byte[] supporting key/value 
messages
         * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
         * @param customPartitioner A serializable partitioner for assining 
messages to Kafka partitions.
+        *
+        * @deprecated This is a deprecated constructor that does not correctly 
handle partitioning when
+        *             producing to multiple topics. Use
+        *             {@link FlinkKafkaProducer08(String, 
KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
         */
-       public FlinkKafkaProducer08(String topicId, 
KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, 
FlinkKafkaPartitioner<IN> customPartitioner) {
-               super(topicId, serializationSchema, producerConfig, 
customPartitioner);
+       @Deprecated
+       public FlinkKafkaProducer08(String topicId, 
KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, 
KafkaPartitioner<IN> customPartitioner) {
+               super(topicId, serializationSchema, producerConfig, new 
FlinkKafkaDelegatePartitioner<>(customPartitioner));
        }
 
+       // ---------------------------------------------------------------------
+
        @Override
        protected void flush() {
                // The Kafka 0.8 producer doesn't support flushing, we wait here

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
index 5a066ec0..80bd180 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.types.Row;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
@@ -28,7 +29,7 @@ import java.util.Properties;
  * Kafka 0.8 {@link KafkaTableSink} that serializes data in JSON format.
  */
 public class Kafka08JsonTableSink extends KafkaJsonTableSink {
-
+       
        /**
         * Creates {@link KafkaTableSink} for Kafka 0.8
         *
@@ -36,24 +37,24 @@ public class Kafka08JsonTableSink extends 
KafkaJsonTableSink {
         * @param properties properties to connect to Kafka
         * @param partitioner Kafka partitioner
         */
-       public Kafka08JsonTableSink(String topic, Properties properties, 
KafkaPartitioner<Row> partitioner) {
+       public Kafka08JsonTableSink(String topic, Properties properties, 
FlinkKafkaPartitioner<Row> partitioner) {
                super(topic, properties, partitioner);
        }
-       
+
        /**
         * Creates {@link KafkaTableSink} for Kafka 0.8
         *
         * @param topic topic in Kafka to which table is written
         * @param properties properties to connect to Kafka
         * @param partitioner Kafka partitioner
+        *
+        * @deprecated This is a deprecated constructor that does not correctly 
handle partitioning when
+        *             producing to multiple topics. Use
+        *             {@link #Kafka08JsonTableSink(String, Properties, 
FlinkKafkaPartitioner)} instead.
         */
-       public Kafka08JsonTableSink(String topic, Properties properties, 
FlinkKafkaPartitioner<Row> partitioner) {
-               super(topic, properties, partitioner);
-       }
-
-       @Override
-       protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, 
Properties properties, SerializationSchema<Row> serializationSchema, 
KafkaPartitioner<Row> partitioner) {
-               return new FlinkKafkaProducer08<>(topic, serializationSchema, 
properties, partitioner);
+       @Deprecated
+       public Kafka08JsonTableSink(String topic, Properties properties, 
KafkaPartitioner<Row> partitioner) {
+               super(topic, properties, new 
FlinkKafkaDelegatePartitioner<>(partitioner));
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
index 164c162..2136476 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.types.Row;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import 
org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 
@@ -28,26 +27,20 @@ import java.util.Properties;
 public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase {
 
        @Override
-       protected KafkaTableSink createTableSink(String topic, Properties 
properties, KafkaPartitioner<Row> partitioner,
+       protected KafkaTableSink createTableSink(
+                       String topic,
+                       Properties properties,
+                       FlinkKafkaPartitioner<Row> partitioner,
                        final FlinkKafkaProducerBase<Row> kafkaProducer) {
 
                return new Kafka08JsonTableSink(topic, properties, partitioner) 
{
                        @Override
-                       protected FlinkKafkaProducerBase<Row> 
createKafkaProducer(String topic, Properties properties,
-                                       SerializationSchema<Row> 
serializationSchema, KafkaPartitioner<Row> partitioner) {
-                               return kafkaProducer;
-                       }
-               };
-       }
+                       protected FlinkKafkaProducerBase<Row> 
createKafkaProducer(
+                                       String topic,
+                                       Properties properties,
+                                       SerializationSchema<Row> 
serializationSchema,
+                                       FlinkKafkaPartitioner<Row> partitioner) 
{
 
-       @Override
-       protected KafkaTableSink createTableSinkWithFlinkPartitioner(String 
topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner,
-                       final FlinkKafkaProducerBase<Row> kafkaProducer) {
-               
-               return new Kafka08JsonTableSink(topic, properties, partitioner) 
{
-                       @Override
-                       protected FlinkKafkaProducerBase<Row> 
createKafkaProducer(String topic, Properties properties,
-                                       SerializationSchema<Row> 
serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
                                return kafkaProducer;
                        }
                };

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
index 4f41c43..cbed361 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
@@ -78,22 +79,6 @@ public class FlinkKafkaProducer09<IN> extends 
FlinkKafkaProducerBase<IN> {
         * @param serializationSchema A (keyless) serializable serialization 
schema for turning user objects into a kafka-consumable byte[]
         * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
         * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
-        * @deprecated Use {@link 
FlinkKafkaProducer09#FlinkKafkaProducer09(String, SerializationSchema, 
Properties, FlinkKafkaPartitioner)} instead
-        */
-       @Deprecated
-       public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> 
serializationSchema, Properties producerConfig, KafkaPartitioner<IN> 
customPartitioner) {
-               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, 
customPartitioner);
-
-       }
-
-       /**
-        * Creates a FlinkKafkaProducer for a given topic. the sink produces a 
DataStream to
-        * the topic.
-        *
-        * @param topicId The topic to write data to
-        * @param serializationSchema A (keyless) serializable serialization 
schema for turning user objects into a kafka-consumable byte[]
-        * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
-        * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
         */
        public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> 
serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> 
customPartitioner) {
                this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, 
customPartitioner);
@@ -140,13 +125,31 @@ public class FlinkKafkaProducer09<IN> extends 
FlinkKafkaProducerBase<IN> {
         * @param serializationSchema A serializable serialization schema for 
turning user objects into a kafka-consumable byte[] supporting key/value 
messages
         * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
         * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions.
-        * @deprecated Use {@link 
FlinkKafkaProducer09#FlinkKafkaProducer09(String, KeyedSerializationSchema, 
Properties, FlinkKafkaPartitioner)} instead
         */
-       @Deprecated
-       public FlinkKafkaProducer09(String topicId, 
KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, 
KafkaPartitioner<IN> customPartitioner) {
+       public FlinkKafkaProducer09(String topicId, 
KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, 
FlinkKafkaPartitioner<IN> customPartitioner) {
                super(topicId, serializationSchema, producerConfig, 
customPartitioner);
        }
 
+       // ------------------- Deprecated constructors ----------------------
+
+       /**
+        * Creates a FlinkKafkaProducer for a given topic. the sink produces a 
DataStream to
+        * the topic.
+        *
+        * @param topicId The topic to write data to
+        * @param serializationSchema A (keyless) serializable serialization 
schema for turning user objects into a kafka-consumable byte[]
+        * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
+        * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+        *
+        * @deprecated This is a deprecated constructor that does not correctly 
handle partitioning when
+        *             producing to multiple topics. Use
+        *             {@link FlinkKafkaProducer09(String, SerializationSchema, 
Properties, FlinkKafkaPartitioner)} instead.
+        */
+       @Deprecated
+       public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> 
serializationSchema, Properties producerConfig, KafkaPartitioner<IN> 
customPartitioner) {
+               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, 
customPartitioner);
+       }
+
        /**
         * Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
         * the topic.
@@ -155,11 +158,18 @@ public class FlinkKafkaProducer09<IN> extends 
FlinkKafkaProducerBase<IN> {
         * @param serializationSchema A serializable serialization schema for 
turning user objects into a kafka-consumable byte[] supporting key/value 
messages
         * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
         * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions.
+        *
+        * @deprecated This is a deprecated constructor that does not correctly 
handle partitioning when
+        *             producing to multiple topics. Use
+        *             {@link FlinkKafkaProducer09(String, 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema, 
Properties, FlinkKafkaPartitioner)} instead.
         */
-       public FlinkKafkaProducer09(String topicId, 
KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, 
FlinkKafkaPartitioner<IN> customPartitioner) {
-               super(topicId, serializationSchema, producerConfig, 
customPartitioner);
+       @Deprecated
+       public FlinkKafkaProducer09(String topicId, 
KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, 
KafkaPartitioner<IN> customPartitioner) {
+               super(topicId, serializationSchema, producerConfig, new 
FlinkKafkaDelegatePartitioner<>(customPartitioner));
        }
 
+       // ------------------------------------------------------------------
+
        @Override
        protected void flush() {
                if (this.producer != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
index b82ebc4..a81422e 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.types.Row;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
@@ -28,43 +29,32 @@ import java.util.Properties;
  * Kafka 0.9 {@link KafkaTableSink} that serializes data in JSON format.
  */
 public class Kafka09JsonTableSink extends KafkaJsonTableSink {
+       
        /**
         * Creates {@link KafkaTableSink} for Kafka 0.9
         *
         * @param topic topic in Kafka to which table is written
         * @param properties properties to connect to Kafka
         * @param partitioner Kafka partitioner
-        * @deprecated Use {@link 
Kafka09JsonTableSink#Kafka09JsonTableSink(String, Properties, 
FlinkKafkaPartitioner)} instead
         */
-       @Deprecated
-       public Kafka09JsonTableSink(String topic, Properties properties, 
KafkaPartitioner<Row> partitioner) {
+       public Kafka09JsonTableSink(String topic, Properties properties, 
FlinkKafkaPartitioner<Row> partitioner) {
                super(topic, properties, partitioner);
        }
-       
+
        /**
         * Creates {@link KafkaTableSink} for Kafka 0.9
         *
         * @param topic topic in Kafka to which table is written
         * @param properties properties to connect to Kafka
         * @param partitioner Kafka partitioner
-        */
-       public Kafka09JsonTableSink(String topic, Properties properties, 
FlinkKafkaPartitioner<Row> partitioner) {
-               super(topic, properties, partitioner);
-       }
-
-       /**
         *
-        * @param topic               Kafka topic to produce to.
-        * @param properties          Properties for the Kafka producer.
-        * @param serializationSchema Serialization schema to use to create 
Kafka records.
-        * @param partitioner         Partitioner to select Kafka partition.
-        * @return The version-specific Kafka producer
-        * @deprecated Use {@link 
Kafka09JsonTableSink#createKafkaProducer(String, Properties, 
SerializationSchema, FlinkKafkaPartitioner)} instead
+        * @deprecated This is a deprecated constructor that does not correctly 
handle partitioning when
+        *             producing to multiple topics. Use
+        *             {@link #Kafka09JsonTableSink(String, Properties, 
FlinkKafkaPartitioner)} instead.
         */
        @Deprecated
-       @Override
-       protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, 
Properties properties, SerializationSchema<Row> serializationSchema, 
KafkaPartitioner<Row> partitioner) {
-               return new FlinkKafkaProducer09<>(topic, serializationSchema, 
properties, partitioner);
+       public Kafka09JsonTableSink(String topic, Properties properties, 
KafkaPartitioner<Row> partitioner) {
+               super(topic, properties, new 
FlinkKafkaDelegatePartitioner<>(partitioner));
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
index ad8f623..3afb5e4 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.types.Row;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import 
org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 
@@ -27,28 +26,21 @@ import java.util.Properties;
 
 public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
 
-       @Deprecated
        @Override
-       protected KafkaTableSink createTableSink(String topic, Properties 
properties, KafkaPartitioner<Row> partitioner,
+       protected KafkaTableSink createTableSink(
+                       String topic,
+                       Properties properties,
+                       FlinkKafkaPartitioner<Row> partitioner,
                        final FlinkKafkaProducerBase<Row> kafkaProducer) {
 
                return new Kafka09JsonTableSink(topic, properties, partitioner) 
{
                        @Override
-                       protected FlinkKafkaProducerBase<Row> 
createKafkaProducer(String topic, Properties properties,
-                                       SerializationSchema<Row> 
serializationSchema, KafkaPartitioner<Row> partitioner) {
-                               return kafkaProducer;
-                       }
-               };
-       }
+                       protected FlinkKafkaProducerBase<Row> 
createKafkaProducer(
+                                       String topic,
+                                       Properties properties,
+                                       SerializationSchema<Row> 
serializationSchema,
+                                       FlinkKafkaPartitioner<Row> partitioner) 
{
 
-       @Override
-       protected KafkaTableSink createTableSinkWithFlinkPartitioner(String 
topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner,
-                       final FlinkKafkaProducerBase<Row> kafkaProducer) {
-
-               return new Kafka09JsonTableSink(topic, properties, partitioner) 
{
-                       @Override
-                       protected FlinkKafkaProducerBase<Row> 
createKafkaProducer(String topic, Properties properties,
-                                       SerializationSchema<Row> 
serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
                                return kafkaProducer;
                        }
                };

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index f9a1e41..3a8228c 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -40,7 +40,6 @@ import 
org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import 
org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.NetUtils;
 import org.apache.kafka.clients.producer.Callback;
@@ -74,38 +73,38 @@ public abstract class FlinkKafkaProducerBase<IN> extends 
RichSinkFunction<IN> im
        private static final long serialVersionUID = 1L;
 
        /**
-        * Configuration key for disabling the metrics reporting
+        * Configuration key for disabling the metrics reporting.
         */
        public static final String KEY_DISABLE_METRICS = 
"flink.disable-metrics";
 
        /**
-        * User defined properties for the Producer
+        * User defined properties for the Producer.
         */
        protected final Properties producerConfig;
 
        /**
-        * The name of the default topic this producer is writing data to
+        * The name of the default topic this producer is writing data to.
         */
        protected final String defaultTopicId;
 
        /**
-        * (Serializable) SerializationSchema for turning objects used with 
Flink into
+        * (Serializable) SerializationSchema for turning objects used with 
Flink into.
         * byte[] for Kafka.
         */
        protected final KeyedSerializationSchema<IN> schema;
 
        /**
-        * User-provided partitioner for assigning an object to a Kafka 
partition for each topic
+        * User-provided partitioner for assigning an object to a Kafka 
partition for each topic.
         */
-       protected final FlinkKafkaPartitioner flinkKafkaPartitioner;
+       protected final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner;
 
        /**
-        * Partitions for each topic
+        * Partitions of each topic
         */
        protected final Map<String, int[]> topicPartitionsMap;
 
        /**
-        * Flag indicating whether to accept failures (and log them), or to 
fail on failures
+        * Flag indicating whether to accept failures (and log them), or to 
fail on failures.
         */
        protected boolean logFailuresOnly;
 
@@ -114,11 +113,6 @@ public abstract class FlinkKafkaProducerBase<IN> extends 
RichSinkFunction<IN> im
         */
        protected boolean flushOnCheckpoint;
 
-       /**
-        * Retry times of fetching kafka meta
-        */
-       protected long kafkaMetaRetryTimes;
-
        // -------------------------------- Runtime fields 
------------------------------------------
 
        /** KafkaProducer instance */
@@ -138,21 +132,6 @@ public abstract class FlinkKafkaProducerBase<IN> extends 
RichSinkFunction<IN> im
 
        protected OperatorStateStore stateStore;
 
-
-       /**
-        * The main constructor for creating a FlinkKafkaProducer. For 
customPartitioner parameter, use {@link 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner} 
instead
-        *
-        * @param defaultTopicId The default topic to write data to
-        * @param serializationSchema A serializable serialization schema for 
turning user objects into a kafka-consumable byte[] supporting key/value 
messages
-        * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
-        * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions. Passing null will use Kafka's partitioner.
-        * @deprecated Use {@link 
FlinkKafkaProducerBase#FlinkKafkaProducerBase(String, KeyedSerializationSchema, 
Properties, FlinkKafkaPartitioner)} instead
-        */
-       @Deprecated
-       public FlinkKafkaProducerBase(String defaultTopicId, 
KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, 
KafkaPartitioner<IN> customPartitioner) {
-               this(defaultTopicId, serializationSchema, producerConfig, null 
== customPartitioner ? null : new 
FlinkKafkaDelegatePartitioner<>(customPartitioner));
-       }
-
        /**
         * The main constructor for creating a FlinkKafkaProducer.
         *
@@ -236,9 +215,11 @@ public abstract class FlinkKafkaProducerBase<IN> extends 
RichSinkFunction<IN> im
                producer = getKafkaProducer(this.producerConfig);
 
                RuntimeContext ctx = getRuntimeContext();
+
                if(null != flinkKafkaPartitioner) {
                        if(flinkKafkaPartitioner instanceof 
FlinkKafkaDelegatePartitioner) {
-                               
((FlinkKafkaDelegatePartitioner)flinkKafkaPartitioner).setPartitions(getPartitionsByTopic(this.defaultTopicId,
 this.producer));
+                               ((FlinkKafkaDelegatePartitioner) 
flinkKafkaPartitioner).setPartitions(
+                                               
getPartitionsByTopic(this.defaultTopicId, this.producer));
                        }
                        flinkKafkaPartitioner.open(ctx.getIndexOfThisSubtask(), 
ctx.getNumberOfParallelSubtasks());
                }
@@ -290,26 +271,6 @@ public abstract class FlinkKafkaProducerBase<IN> extends 
RichSinkFunction<IN> im
                }
        }
 
-       protected int[] getPartitionsByTopic(String topic, 
KafkaProducer<byte[], byte[]> producer) {
-               // the fetched list is immutable, so we're creating a mutable 
copy in order to sort it
-               List<PartitionInfo> partitionsList = new 
ArrayList<>(producer.partitionsFor(topic));
-
-               // sort the partitions by partition id to make sure the fetched 
partition list is the same across subtasks
-               Collections.sort(partitionsList, new 
Comparator<PartitionInfo>() {
-                               @Override
-                               public int compare(PartitionInfo o1, 
PartitionInfo o2) {
-                               return Integer.compare(o1.partition(), 
o2.partition());
-                       }
-                       });
-
-               int[] partitions = new int[partitionsList.size()];
-               for (int i = 0; i < partitions.length; i++) {
-                       partitions[i] = partitionsList.get(i).partition();
-               }
-
-               return partitions;
-       }
-
        /**
         * Called when new data arrives to the sink, and forwards it to Kafka.
         *
@@ -330,7 +291,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends 
RichSinkFunction<IN> im
 
                int[] partitions = this.topicPartitionsMap.get(targetTopic);
                if(null == partitions) {
-                       partitions = this.getPartitionsByTopic(targetTopic, 
producer);
+                       partitions = getPartitionsByTopic(targetTopic, 
producer);
                        this.topicPartitionsMap.put(targetTopic, partitions);
                }
 
@@ -338,7 +299,11 @@ public abstract class FlinkKafkaProducerBase<IN> extends 
RichSinkFunction<IN> im
                if (flinkKafkaPartitioner == null) {
                        record = new ProducerRecord<>(targetTopic, 
serializedKey, serializedValue);
                } else {
-                       record = new ProducerRecord<>(targetTopic, 
flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, 
targetTopic, partitions), serializedKey, serializedValue);
+                       record = new ProducerRecord<>(
+                                       targetTopic,
+                                       flinkKafkaPartitioner.partition(next, 
serializedKey, serializedValue, targetTopic, partitions),
+                                       serializedKey,
+                                       serializedValue);
                }
                if (flushOnCheckpoint) {
                        synchronized (pendingRecordsLock) {
@@ -425,6 +390,26 @@ public abstract class FlinkKafkaProducerBase<IN> extends 
RichSinkFunction<IN> im
                return props;
        }
 
+       protected static int[] getPartitionsByTopic(String topic, 
KafkaProducer<byte[], byte[]> producer) {
+               // the fetched list is immutable, so we're creating a mutable 
copy in order to sort it
+               List<PartitionInfo> partitionsList = new 
ArrayList<>(producer.partitionsFor(topic));
+
+               // sort the partitions by partition id to make sure the fetched 
partition list is the same across subtasks
+               Collections.sort(partitionsList, new 
Comparator<PartitionInfo>() {
+                       @Override
+                       public int compare(PartitionInfo o1, PartitionInfo o2) {
+                               return Integer.compare(o1.partition(), 
o2.partition());
+                       }
+               });
+
+               int[] partitions = new int[partitionsList.size()];
+               for (int i = 0; i < partitions.length; i++) {
+                       partitions[i] = partitionsList.get(i).partition();
+               }
+
+               return partitions;
+       }
+
        @VisibleForTesting
        protected long numPendingRecords() {
                synchronized (pendingRecordsLock) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
index a0b5033..41bb329 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import 
org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.types.Row;
@@ -29,19 +28,6 @@ import java.util.Properties;
  * Base class for {@link KafkaTableSink} that serializes data in JSON format
  */
 public abstract class KafkaJsonTableSink extends KafkaTableSink {
-
-       /**
-        * Creates KafkaJsonTableSink
-        *
-        * @param topic topic in Kafka to which table is written
-        * @param properties properties to connect to Kafka
-        * @param partitioner Kafka partitioner
-        * @deprecated Use {@link KafkaJsonTableSink#KafkaJsonTableSink(String, 
Properties, FlinkKafkaPartitioner)} instead
-        */
-       @Deprecated
-       public KafkaJsonTableSink(String topic, Properties properties, 
KafkaPartitioner<Row> partitioner) {
-               super(topic, properties, partitioner);
-       }
        
        /**
         * Creates KafkaJsonTableSink

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
index 0a937d6..1c38816 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -18,13 +18,11 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.types.Row;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.sinks.AppendStreamTableSink;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.util.Preconditions;
 
@@ -34,7 +32,7 @@ import java.util.Properties;
  * A version-agnostic Kafka {@link AppendStreamTableSink}.
  *
  * <p>The version-specific Kafka consumers need to extend this class and
- * override {@link #createKafkaProducer(String, Properties, 
SerializationSchema, KafkaPartitioner)}}.
+ * override {@link #createKafkaProducer(String, Properties, 
SerializationSchema, FlinkKafkaPartitioner)}}.
  */
 public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
 
@@ -47,22 +45,6 @@ public abstract class KafkaTableSink implements 
AppendStreamTableSink<Row> {
 
        /**
         * Creates KafkaTableSink
-        *
-        * @param topic                 Kafka topic to write to.
-        * @param properties            Properties for the Kafka consumer.
-        * @param partitioner           Partitioner to select Kafka partition 
for each item
-        * @deprecated Use {@link KafkaTableSink#KafkaTableSink(String, 
Properties, FlinkKafkaPartitioner)} instead
-        */
-       @Deprecated
-       public KafkaTableSink(
-                       String topic,
-                       Properties properties,
-                       KafkaPartitioner<Row> partitioner) {
-               this(topic, properties, new 
FlinkKafkaDelegatePartitioner<Row>(partitioner));
-       }
-
-       /**
-        * Creates KafkaTableSink
         * 
         * @param topic                 Kafka topic to write to.
         * @param properties            Properties for the Kafka consumer.
@@ -85,22 +67,6 @@ public abstract class KafkaTableSink implements 
AppendStreamTableSink<Row> {
         * @param serializationSchema Serialization schema to use to create 
Kafka records.
         * @param partitioner         Partitioner to select Kafka partition.
         * @return The version-specific Kafka producer
-        * @deprecated Use {@link KafkaTableSink#createKafkaProducer(String, 
Properties, SerializationSchema, FlinkKafkaPartitioner)} instead
-        */
-       @Deprecated
-       protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer(
-               String topic, Properties properties,
-               SerializationSchema<Row> serializationSchema,
-               KafkaPartitioner<Row> partitioner);
-
-       /**
-        * Returns the version-specifid Kafka producer.
-        *
-        * @param topic               Kafka topic to produce to.
-        * @param properties          Properties for the Kafka producer.
-        * @param serializationSchema Serialization schema to use to create 
Kafka records.
-        * @param partitioner         Partitioner to select Kafka partition.
-        * @return The version-specific Kafka producer
         */
        protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer(
                String topic, Properties properties,
@@ -153,8 +119,4 @@ public abstract class KafkaTableSink implements 
AppendStreamTableSink<Row> {
 
                return copy;
        }
-
-
-
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
deleted file mode 100644
index edabfe0..0000000
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.partitioner;
-
-import java.io.Serializable;
-
-/**
- * A partitioner ensuring that each internal Flink partition ends up in one 
Kafka partition.
- *
- * Note, one Kafka partition can contain multiple Flink partitions.
- *
- * Cases:
- *     # More Flink partitions than kafka partitions
- * <pre>
- *             Flink Sinks:            Kafka Partitions
- *                     1       ----------------&gt;    1
- *                     2   --------------/
- *                     3   -------------/
- *                     4       ------------/
- * </pre>
- * Some (or all) kafka partitions contain the output of more than one flink 
partition
- *
- *# Fewer Flink partitions than Kafka
- * <pre>
- *             Flink Sinks:            Kafka Partitions
- *                     1       ----------------&gt;    1
- *                     2       ----------------&gt;    2
- *                                                                             
3
- *                                                                             
4
- *                                                                             
5
- * </pre>
- *
- *  Not all Kafka partitions contain data
- *  To avoid such an unbalanced partitioning, use a round-robin kafka 
partitioner. (note that this will
- *  cause a lot of network connections between all the Flink instances and all 
the Kafka brokers
- *  @deprecated Use {@link FlinkFixedPartitioner} instead.
- *
- */
-@Deprecated
-public class FixedPartitioner<T> extends KafkaPartitioner<T> implements 
Serializable {
-       private static final long serialVersionUID = 1627268846962918126L;
-
-       private int targetPartition = -1;
-
-       @Override
-       public void open(int parallelInstanceId, int parallelInstances, int[] 
partitions) {
-               if (parallelInstanceId < 0 || parallelInstances <= 0 || 
partitions.length == 0) {
-                       throw new IllegalArgumentException();
-               }
-               
-               this.targetPartition = partitions[parallelInstanceId % 
partitions.length];
-       }
-
-       @Override
-       public int partition(T next, byte[] serializedKey, byte[] 
serializedValue, int numPartitions) {
-               if (targetPartition >= 0) {
-                       return targetPartition;
-               } else {
-                       throw new RuntimeException("The partitioner has not 
been initialized properly");
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
index d2eb7af..e47c667 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
@@ -17,6 +17,8 @@
  */
 package org.apache.flink.streaming.connectors.kafka.partitioner;
 
+import org.apache.flink.util.Preconditions;
+
 /**
  * A partitioner ensuring that each internal Flink partition ends up in one 
Kafka partition.
  *
@@ -44,9 +46,8 @@ package 
org.apache.flink.streaming.connectors.kafka.partitioner;
  * </pre>
  *
  *  Not all Kafka partitions contain data
- *  To avoid such an unbalanced partitioning, use a round-robin kafka 
partitioner. (note that this will
- *  cause a lot of network connections between all the Flink instances and all 
the Kafka brokers
- *
+ *  To avoid such an unbalanced partitioning, use a round-robin kafka 
partitioner (note that this will
+ *  cause a lot of network connections between all the Flink instances and all 
the Kafka brokers).
  */
 public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
 
@@ -54,17 +55,17 @@ public class FlinkFixedPartitioner<T> extends 
FlinkKafkaPartitioner<T> {
 
        @Override
        public void open(int parallelInstanceId, int parallelInstances) {
-               if (parallelInstanceId < 0 || parallelInstances <= 0) {
-                       throw new IllegalArgumentException();
-               }
+               Preconditions.checkArgument(parallelInstanceId >= 0, "Id of 
this subtask cannot be negative.");
+               Preconditions.checkArgument(parallelInstances > 0, "Number of 
subtasks must be larger than 0.");
+
                this.parallelInstanceId = parallelInstanceId;
        }
        
        @Override
        public int partition(T record, byte[] key, byte[] value, String 
targetTopic, int[] partitions) {
-               if(null == partitions || partitions.length == 0) {
-                       throw new IllegalArgumentException();
-               }
+               Preconditions.checkArgument(
+                       partitions != null && partitions.length > 0,
+                       "Partitions of the target topic is empty.");
                
                return partitions[parallelInstanceId % partitions.length];
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
index 469fd1b..b7b4143 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
@@ -18,8 +18,9 @@
 package org.apache.flink.streaming.connectors.kafka.partitioner;
 
 /**
- * Delegate for KafkaPartitioner
- * @param <T>
+ * Delegate for the deprecated {@link KafkaPartitioner}.
+ * This should only be used for bridging deprecated partitioning API methods.
+ *
  * @deprecated Delegate for {@link KafkaPartitioner}, use {@link 
FlinkKafkaPartitioner} instead
  */
 @Deprecated

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
index e074b9b..b634af7 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
@@ -20,20 +20,34 @@ package 
org.apache.flink.streaming.connectors.kafka.partitioner;
 import java.io.Serializable;
 
 /**
- * It contains a open() method which is called on each parallel instance.
- * Partitioners must be serializable!
+ * A {@link FlinkKafkaPartitioner} wraps logic on how to partition records
+ * across partitions of multiple Kafka topics.
  */
 public abstract class FlinkKafkaPartitioner<T> implements Serializable {
+
        private static final long serialVersionUID = -9086719227828020494L;
 
        /**
-        * Initializer for the Partitioner.
-        * @param parallelInstanceId 0-indexed id of the parallel instance in 
Flink
+        * Initializer for the partitioner. This is called once on each 
parallel sink instance of
+        * the Flink Kafka producer. This method should be overridden if 
necessary.
+        *
+        * @param parallelInstanceId 0-indexed id of the parallel sink instance 
in Flink
         * @param parallelInstances the total number of parallel instances
         */
        public void open(int parallelInstanceId, int parallelInstances) {
                // overwrite this method if needed.
        }
-       
+
+       /**
+        * Determine the id of the partition that the record should be written 
to.
+        *
+        * @param record the record value
+        * @param key serialized key of the record
+        * @param value serialized value of the record
+        * @param targetTopic target topic for the record
+        * @param partitions found partitions for the target topic
+        *
+        * @return the id of the target partition
+        */
        public abstract int partition(T record, byte[] key, byte[] value, 
String targetTopic, int[] partitions);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
index 7c82bd1..eebc619 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
@@ -22,7 +22,9 @@ import java.io.Serializable;
 /**
  * It contains a open() method which is called on each parallel instance.
  * Partitioners must be serializable!
- * @deprecated Use {@link FlinkKafkaPartitioner} instead.
+ *
+ * @deprecated This partitioner does not handle partitioning properly in the 
case of
+ *             multiple topics, and has been deprecated. Please use {@link 
FlinkKafkaPartitioner} instead.
  */
 @Deprecated
 public abstract class KafkaPartitioner<T> implements Serializable {
@@ -34,22 +36,10 @@ public abstract class KafkaPartitioner<T> implements 
Serializable {
         * @param parallelInstanceId 0-indexed id of the parallel instance in 
Flink
         * @param parallelInstances the total number of parallel instances
         * @param partitions an array describing the partition IDs of the 
available Kafka partitions.
-        * @deprecated Use {@link FlinkKafkaPartitioner#open(int, int)} instead.
         */
-       @Deprecated
        public void open(int parallelInstanceId, int parallelInstances, int[] 
partitions) {
                // overwrite this method if needed.
        }
 
-       /**
-        *
-        * @param next
-        * @param serializedKey
-        * @param serializedValue
-        * @param numPartitions
-        * @return
-        * @deprecated Use {@link FlinkKafkaPartitioner#partition(T, byte[], 
byte[], String, int[])} instead.
-        */
-       @Deprecated
        public abstract int partition(T next, byte[] serializedKey, byte[] 
serializedValue, int numPartitions);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
index 1f16d8e..6b2cc02 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
@@ -22,12 +22,14 @@ import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.operators.StreamSink;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.configuration.Configuration;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import 
org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -61,7 +63,7 @@ public class FlinkKafkaProducerBaseTest {
                // no bootstrap servers set in props
                Properties props = new Properties();
                // should throw IllegalArgumentException
-               new DummyFlinkKafkaProducer<>(props, null);
+               new DummyFlinkKafkaProducer<>(props, new 
KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
        }
 
        /**
@@ -72,7 +74,7 @@ public class FlinkKafkaProducerBaseTest {
                Properties props = new Properties();
                props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:12345");
                // should set missing key value deserializers
-               new DummyFlinkKafkaProducer<>(props, null);
+               new DummyFlinkKafkaProducer<>(props, new 
KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
 
                
assertTrue(props.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
                
assertTrue(props.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
@@ -83,9 +85,10 @@ public class FlinkKafkaProducerBaseTest {
        /**
         * Tests that partitions list is determinate and correctly provided to 
custom partitioner
         */
+       @SuppressWarnings("unchecked")
        @Test
-       public void testPartitionerOpenedWithDeterminatePartitionList() throws 
Exception {
-               KafkaPartitioner mockPartitioner = mock(KafkaPartitioner.class);
+       public void testPartitionerInvokedWithDeterminatePartitionList() throws 
Exception {
+               FlinkKafkaPartitioner<String> mockPartitioner = 
mock(FlinkKafkaPartitioner.class);
 
                RuntimeContext mockRuntimeContext = mock(RuntimeContext.class);
                when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0);
@@ -98,8 +101,8 @@ public class FlinkKafkaProducerBaseTest {
                mockPartitionsList.add(new 
PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 0, null, null, null));
                mockPartitionsList.add(new 
PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 2, null, null, null));
 
-               final DummyFlinkKafkaProducer producer = new 
DummyFlinkKafkaProducer(
-                       FakeStandardProducerConfig.get(), mockPartitioner);
+               final DummyFlinkKafkaProducer<String> producer = new 
DummyFlinkKafkaProducer<>(
+                       FakeStandardProducerConfig.get(), new 
KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), mockPartitioner);
                producer.setRuntimeContext(mockRuntimeContext);
 
                final KafkaProducer mockProducer = 
producer.getMockKafkaProducer();
@@ -107,10 +110,11 @@ public class FlinkKafkaProducerBaseTest {
                when(mockProducer.metrics()).thenReturn(null);
 
                producer.open(new Configuration());
+               verify(mockPartitioner, times(1)).open(0, 1);
 
-               // the out-of-order partitions list should be sorted before 
provided to the custom partitioner's open() method
-               int[] correctPartitionList = {0, 1, 2, 3};
-               verify(mockPartitioner).open(0, 1, correctPartitionList);
+               producer.invoke("foobar");
+               verify(mockPartitioner, times(1)).partition(
+                       "foobar", null, "foobar".getBytes(), 
DummyFlinkKafkaProducer.DUMMY_TOPIC, new int[] {0, 1, 2, 3});
        }
 
        /**
@@ -119,7 +123,7 @@ public class FlinkKafkaProducerBaseTest {
        @Test
        public void testAsyncErrorRethrownOnInvoke() throws Throwable {
                final DummyFlinkKafkaProducer<String> producer = new 
DummyFlinkKafkaProducer<>(
-                       FakeStandardProducerConfig.get(), null);
+                       FakeStandardProducerConfig.get(), new 
KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
 
                OneInputStreamOperatorTestHarness<String, Object> testHarness =
                        new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(producer));
@@ -150,7 +154,7 @@ public class FlinkKafkaProducerBaseTest {
        @Test
        public void testAsyncErrorRethrownOnCheckpoint() throws Throwable {
                final DummyFlinkKafkaProducer<String> producer = new 
DummyFlinkKafkaProducer<>(
-                       FakeStandardProducerConfig.get(), null);
+                       FakeStandardProducerConfig.get(), new 
KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
 
                OneInputStreamOperatorTestHarness<String, Object> testHarness =
                        new OneInputStreamOperatorTestHarness<>(new 
StreamSink<>(producer));
@@ -186,7 +190,7 @@ public class FlinkKafkaProducerBaseTest {
        @Test(timeout=5000)
        public void testAsyncErrorRethrownOnCheckpointAfterFlush() throws 
Throwable {
                final DummyFlinkKafkaProducer<String> producer = new 
DummyFlinkKafkaProducer<>(
-                       FakeStandardProducerConfig.get(), null);
+                       FakeStandardProducerConfig.get(), new 
KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
                producer.setFlushOnCheckpoint(true);
 
                final KafkaProducer<?, ?> mockProducer = 
producer.getMockKafkaProducer();
@@ -239,7 +243,7 @@ public class FlinkKafkaProducerBaseTest {
        @Test(timeout=10000)
        public void testAtLeastOnceProducer() throws Throwable {
                final DummyFlinkKafkaProducer<String> producer = new 
DummyFlinkKafkaProducer<>(
-                       FakeStandardProducerConfig.get(), null);
+                       FakeStandardProducerConfig.get(), new 
KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
                producer.setFlushOnCheckpoint(true);
 
                final KafkaProducer<?, ?> mockProducer = 
producer.getMockKafkaProducer();
@@ -299,7 +303,7 @@ public class FlinkKafkaProducerBaseTest {
        @Test(timeout=5000)
        public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws 
Throwable {
                final DummyFlinkKafkaProducer<String> producer = new 
DummyFlinkKafkaProducer<>(
-                       FakeStandardProducerConfig.get(), null);
+                       FakeStandardProducerConfig.get(), new 
KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
                producer.setFlushOnCheckpoint(false);
 
                final KafkaProducer<?, ?> mockProducer = 
producer.getMockKafkaProducer();
@@ -333,9 +337,9 @@ public class FlinkKafkaProducerBaseTest {
                private boolean isFlushed;
 
                @SuppressWarnings("unchecked")
-               DummyFlinkKafkaProducer(Properties producerConfig, 
KafkaPartitioner partitioner) {
+               DummyFlinkKafkaProducer(Properties producerConfig, 
KeyedSerializationSchema<T> schema, FlinkKafkaPartitioner partitioner) {
 
-                       super(DUMMY_TOPIC, (KeyedSerializationSchema<T>) 
mock(KeyedSerializationSchema.class), producerConfig, partitioner);
+                       super(DUMMY_TOPIC, schema, producerConfig, partitioner);
 
                        this.mockProducer = mock(KafkaProducer.class);
                        when(mockProducer.send(any(ProducerRecord.class), 
any(Callback.class))).thenAnswer(new Answer<Object>() {

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 203d814..ac278fb 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -1291,55 +1291,6 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                }
        }
 
-       private static class Tuple2WithTopicSchema implements 
KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>,
-                       KeyedSerializationSchema<Tuple3<Integer, Integer, 
String>> {
-
-               private final TypeSerializer<Tuple2<Integer, Integer>> ts;
-               
-               public Tuple2WithTopicSchema(ExecutionConfig ec) {
-                       ts = TypeInfoParser.<Tuple2<Integer, 
Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec);
-               }
-
-               @Override
-               public Tuple3<Integer, Integer, String> deserialize(byte[] 
messageKey, byte[] message, String topic, int partition, long offset) throws 
IOException {
-                       DataInputView in = new DataInputViewStreamWrapper(new 
ByteArrayInputStream(message));
-                       Tuple2<Integer, Integer> t2 = ts.deserialize(in);
-                       return new Tuple3<>(t2.f0, t2.f1, topic);
-               }
-
-               @Override
-               public boolean isEndOfStream(Tuple3<Integer, Integer, String> 
nextElement) {
-                       return false;
-               }
-
-               @Override
-               public TypeInformation<Tuple3<Integer, Integer, String>> 
getProducedType() {
-                       return TypeInfoParser.parse("Tuple3<Integer, Integer, 
String>");
-               }
-
-               @Override
-               public byte[] serializeKey(Tuple3<Integer, Integer, String> 
element) {
-                       return null;
-               }
-
-               @Override
-               public byte[] serializeValue(Tuple3<Integer, Integer, String> 
element) {
-                       ByteArrayOutputStream by = new ByteArrayOutputStream();
-                       DataOutputView out = new 
DataOutputViewStreamWrapper(by);
-                       try {
-                               ts.serialize(new Tuple2<>(element.f0, 
element.f1), out);
-                       } catch (IOException e) {
-                               throw new RuntimeException("Error" ,e);
-                       }
-                       return by.toByteArray();
-               }
-
-               @Override
-               public String getTargetTopic(Tuple3<Integer, Integer, String> 
element) {
-                       return element.f2;
-               }
-       }
-
        /**
         * Test Flink's Kafka integration also with very big records (30MB)
         * see 
http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
@@ -2276,4 +2227,53 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                        this.numElementsTotal = state.get(0);
                }
        }
+
+       private static class Tuple2WithTopicSchema implements 
KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>,
+               KeyedSerializationSchema<Tuple3<Integer, Integer, String>> {
+
+               private final TypeSerializer<Tuple2<Integer, Integer>> ts;
+
+               public Tuple2WithTopicSchema(ExecutionConfig ec) {
+                       ts = TypeInfoParser.<Tuple2<Integer, 
Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec);
+               }
+
+               @Override
+               public Tuple3<Integer, Integer, String> deserialize(byte[] 
messageKey, byte[] message, String topic, int partition, long offset) throws 
IOException {
+                       DataInputView in = new DataInputViewStreamWrapper(new 
ByteArrayInputStream(message));
+                       Tuple2<Integer, Integer> t2 = ts.deserialize(in);
+                       return new Tuple3<>(t2.f0, t2.f1, topic);
+               }
+
+               @Override
+               public boolean isEndOfStream(Tuple3<Integer, Integer, String> 
nextElement) {
+                       return false;
+               }
+
+               @Override
+               public TypeInformation<Tuple3<Integer, Integer, String>> 
getProducedType() {
+                       return TypeInfoParser.parse("Tuple3<Integer, Integer, 
String>");
+               }
+
+               @Override
+               public byte[] serializeKey(Tuple3<Integer, Integer, String> 
element) {
+                       return null;
+               }
+
+               @Override
+               public byte[] serializeValue(Tuple3<Integer, Integer, String> 
element) {
+                       ByteArrayOutputStream by = new ByteArrayOutputStream();
+                       DataOutputView out = new 
DataOutputViewStreamWrapper(by);
+                       try {
+                               ts.serialize(new Tuple2<>(element.f0, 
element.f1), out);
+                       } catch (IOException e) {
+                               throw new RuntimeException("Error" ,e);
+                       }
+                       return by.toByteArray();
+               }
+
+               @Override
+               public String getTargetTopic(Tuple3<Integer, Integer, String> 
element) {
+                       return element.f2;
+               }
+       }
 }

Reply via email to