[FLINK-6288] [kafka] New custom partitioner API that correctly handles multiple 
Kafka sink topics


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9ed9b683
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9ed9b683
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9ed9b683

Branch: refs/heads/master
Commit: 9ed9b68397b51bfd2b0f6e532212a82f771641bd
Parents: ffa9aa0
Author: zjureel <[email protected]>
Authored: Mon May 15 17:41:47 2017 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Fri May 19 14:38:48 2017 +0800

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaProducer010.java |  82 ++++++++++--
 .../connectors/kafka/Kafka010ITCase.java        |   6 +-
 .../kafka/KafkaTestEnvironmentImpl.java         |   8 +-
 .../connectors/kafka/FlinkKafkaProducer.java    |   9 +-
 .../connectors/kafka/FlinkKafkaProducer08.java  |  40 +++++-
 .../connectors/kafka/Kafka08JsonTableSink.java  |  17 +++
 .../kafka/Kafka08JsonTableSinkTest.java         |  14 ++
 .../connectors/kafka/KafkaProducerTest.java     |   5 +-
 .../kafka/KafkaTestEnvironmentImpl.java         |   6 +-
 .../connectors/kafka/FlinkKafkaProducer09.java  |  42 +++++-
 .../connectors/kafka/Kafka09JsonTableSink.java  |  29 +++++
 .../kafka/Kafka09JsonTableSinkTest.java         |  15 +++
 .../connectors/kafka/KafkaProducerTest.java     |   6 +-
 .../kafka/KafkaTestEnvironmentImpl.java         |  10 +-
 .../kafka/FlinkKafkaProducerBase.java           | 127 ++++++++++++-------
 .../connectors/kafka/KafkaJsonTableSink.java    |  16 ++-
 .../connectors/kafka/KafkaTableSink.java        |  35 ++++-
 .../kafka/partitioner/FixedPartitioner.java     |   3 +-
 .../partitioner/FlinkFixedPartitioner.java      |  71 +++++++++++
 .../FlinkKafkaDelegatePartitioner.java          |  47 +++++++
 .../partitioner/FlinkKafkaPartitioner.java      |  39 ++++++
 .../kafka/partitioner/KafkaPartitioner.java     |  14 ++
 .../connectors/kafka/KafkaConsumerTestBase.java | 106 ++++++++--------
 .../connectors/kafka/KafkaProducerTestBase.java |  11 +-
 .../kafka/KafkaTableSinkTestBase.java           |  56 +++++++-
 .../connectors/kafka/KafkaTestEnvironment.java  |  14 +-
 .../kafka/TestFlinkFixedPartitioner.java        | 104 +++++++++++++++
 .../TestFlinkKafkaDelegatePartitioner.java      | 111 ++++++++++++++++
 .../kafka/testutils/DataGenerators.java         |  20 +--
 .../kafka/testutils/Tuple2FlinkPartitioner.java |  45 +++++++
 .../kafka/testutils/Tuple2Partitioner.java      |   9 +-
 31 files changed, 937 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
index cc0194b..7addafa 100644
--- 
a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import java.util.Properties;
+
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
@@ -27,7 +29,8 @@ import 
org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.operators.StreamSink;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
@@ -35,8 +38,6 @@ import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWra
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.kafka.clients.producer.ProducerRecord;
 
-import java.util.Properties;
-
 import static 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList;
 
 
@@ -87,7 +88,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> 
implements SinkFunct
                                                                                
                                                                                
        String topicId,
                                                                                
                                                                                
        KeyedSerializationSchema<T> serializationSchema,
                                                                                
                                                                                
        Properties producerConfig) {
-               return writeToKafkaWithTimestamps(inStream, topicId, 
serializationSchema, producerConfig, new FixedPartitioner<T>());
+               return writeToKafkaWithTimestamps(inStream, topicId, 
serializationSchema, producerConfig, new FlinkFixedPartitioner<T>());
        }
 
 
@@ -106,7 +107,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> 
implements SinkFunct
                                                                                
                                                                                
        String topicId,
                                                                                
                                                                                
        SerializationSchema<T> serializationSchema,
                                                                                
                                                                                
        Properties producerConfig) {
-               return writeToKafkaWithTimestamps(inStream, topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new 
FixedPartitioner<T>());
+               return writeToKafkaWithTimestamps(inStream, topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new 
FlinkFixedPartitioner<T>());
        }
 
        /**
@@ -120,7 +121,9 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> 
implements SinkFunct
         *  @param serializationSchema A serializable serialization schema for 
turning user objects into a kafka-consumable byte[] supporting key/value 
messages
         *  @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
         *  @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions.
+        *  @deprecated Use {@link 
FlinkKafkaProducer010Configuration#writeToKafkaWithTimestamps(DataStream, 
String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead
         */
+       @Deprecated
        public static <T> FlinkKafkaProducer010Configuration<T> 
writeToKafkaWithTimestamps(DataStream<T> inStream,
                                                                                
                                                                                
        String topicId,
                                                                                
                                                                                
        KeyedSerializationSchema<T> serializationSchema,
@@ -133,6 +136,30 @@ public class FlinkKafkaProducer010<T> extends 
StreamSink<T> implements SinkFunct
                return new FlinkKafkaProducer010Configuration<>(transformation, 
kafkaProducer);
        }
 
+       /**
+        * Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
+        * the topic.
+        *
+        * This constructor allows writing timestamps to Kafka, it follow 
approach (b) (see above)
+        *
+        *  @param inStream The stream to write to Kafka
+        *  @param topicId The name of the target topic
+        *  @param serializationSchema A serializable serialization schema for 
turning user objects into a kafka-consumable byte[] supporting key/value 
messages
+        *  @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
+        *  @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions.
+        */
+       public static <T> FlinkKafkaProducer010Configuration<T> 
writeToKafkaWithTimestamps(DataStream<T> inStream,
+                                                                               
                                                                                
        String topicId,
+                                                                               
                                                                                
        KeyedSerializationSchema<T> serializationSchema,
+                                                                               
                                                                                
        Properties producerConfig,
+                                                                               
                                                                                
        FlinkKafkaPartitioner<T> customPartitioner) {
+
+               GenericTypeInfo<Object> objectTypeInfo = new 
GenericTypeInfo<>(Object.class);
+               FlinkKafkaProducer010<T> kafkaProducer = new 
FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, 
customPartitioner);
+               SingleOutputStreamOperator<Object> transformation = 
inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
+               return new FlinkKafkaProducer010Configuration<>(transformation, 
kafkaProducer);
+       }
+
        // ---------------------- Regular constructors w/o timestamp support  
------------------
 
        /**
@@ -147,7 +174,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> 
implements SinkFunct
         *                      User defined (keyless) serialization schema.
         */
        public FlinkKafkaProducer010(String brokerList, String topicId, 
SerializationSchema<T> serializationSchema) {
-               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), 
getPropertiesFromBrokerList(brokerList), new FixedPartitioner<T>());
+               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), 
getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<T>());
        }
 
        /**
@@ -162,7 +189,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> 
implements SinkFunct
         *                      Properties with the producer configuration.
         */
        public FlinkKafkaProducer010(String topicId, SerializationSchema<T> 
serializationSchema, Properties producerConfig) {
-               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new 
FixedPartitioner<T>());
+               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new 
FlinkFixedPartitioner<T>());
        }
 
        /**
@@ -173,11 +200,26 @@ public class FlinkKafkaProducer010<T> extends 
StreamSink<T> implements SinkFunct
         * @param serializationSchema A (keyless) serializable serialization 
schema for turning user objects into a kafka-consumable byte[]
         * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
         * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+        * @deprecated Use {@link 
FlinkKafkaProducer010#FlinkKafkaProducer010(String, SerializationSchema, 
Properties, FlinkKafkaPartitioner)} instead
         */
+       @Deprecated
        public FlinkKafkaProducer010(String topicId, SerializationSchema<T> 
serializationSchema, Properties producerConfig, KafkaPartitioner<T> 
customPartitioner) {
                this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, 
customPartitioner);
        }
 
+       /**
+        * Creates a FlinkKafkaProducer for a given topic. the sink produces a 
DataStream to
+        * the topic.
+        *
+        * @param topicId The topic to write data to
+        * @param serializationSchema A (keyless) serializable serialization 
schema for turning user objects into a kafka-consumable byte[]
+        * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
+        * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+        */
+       public FlinkKafkaProducer010(String topicId, SerializationSchema<T> 
serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> 
customPartitioner) {
+               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, 
customPartitioner);
+       }
+
        // ------------------- Key/Value serialization schema constructors 
----------------------
 
        /**
@@ -192,7 +234,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> 
implements SinkFunct
         *                      User defined serialization schema supporting 
key/value messages
         */
        public FlinkKafkaProducer010(String brokerList, String topicId, 
KeyedSerializationSchema<T> serializationSchema) {
-               this(topicId, serializationSchema, 
getPropertiesFromBrokerList(brokerList), new FixedPartitioner<T>());
+               this(topicId, serializationSchema, 
getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<T>());
        }
 
        /**
@@ -207,19 +249,32 @@ public class FlinkKafkaProducer010<T> extends 
StreamSink<T> implements SinkFunct
         *                      Properties with the producer configuration.
         */
        public FlinkKafkaProducer010(String topicId, 
KeyedSerializationSchema<T> serializationSchema, Properties producerConfig) {
-               this(topicId, serializationSchema, producerConfig, new 
FixedPartitioner<T>());
+               this(topicId, serializationSchema, producerConfig, new 
FlinkFixedPartitioner<T>());
        }
 
        /**
         * Create Kafka producer
         *
         * This constructor does not allow writing timestamps to Kafka, it 
follow approach (a) (see above)
+        * @deprecated Use {@link 
FlinkKafkaProducer010#FlinkKafkaProducer010(String, KeyedSerializationSchema, 
Properties, FlinkKafkaPartitioner)} instead
         */
+       @Deprecated
        public FlinkKafkaProducer010(String topicId, 
KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, 
KafkaPartitioner<T> customPartitioner) {
                // We create a Kafka 09 producer instance here and only 
"override" (by intercepting) the
                // invoke call.
                super(new FlinkKafkaProducer09<>(topicId, serializationSchema, 
producerConfig, customPartitioner));
        }
+       
+       /**
+        * Create Kafka producer
+        *
+        * This constructor does not allow writing timestamps to Kafka, it 
follow approach (a) (see above)
+        */
+       public FlinkKafkaProducer010(String topicId, 
KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, 
FlinkKafkaPartitioner<T> customPartitioner) {
+               // We create a Kafka 09 producer instance here and only 
"override" (by intercepting) the
+               // invoke call.
+               super(new FlinkKafkaProducer09<>(topicId, serializationSchema, 
producerConfig, customPartitioner));
+       }
 
 
        // ----------------------------- Generic element processing  
---------------------------
@@ -243,10 +298,15 @@ public class FlinkKafkaProducer010<T> extends 
StreamSink<T> implements SinkFunct
                }
 
                ProducerRecord<byte[], byte[]> record;
-               if (internalProducer.partitioner == null) {
+               int[] partitions = 
internalProducer.topicPartitionsMap.get(targetTopic);
+               if(null == partitions) {
+                       partitions = 
internalProducer.getPartitionsByTopic(targetTopic, internalProducer.producer);
+                       internalProducer.topicPartitionsMap.put(targetTopic, 
partitions);
+               }
+               if (internalProducer.flinkKafkaPartitioner == null) {
                        record = new ProducerRecord<>(targetTopic, null, 
timestamp, serializedKey, serializedValue);
                } else {
-                       record = new ProducerRecord<>(targetTopic, 
internalProducer.partitioner.partition(next, serializedKey, serializedValue, 
internalProducer.partitions.length), timestamp, serializedKey, serializedValue);
+                       record = new ProducerRecord<>(targetTopic, 
internalProducer.flinkKafkaPartitioner.partition(next, serializedKey, 
serializedValue, targetTopic, partitions), timestamp, serializedKey, 
serializedValue);
                }
                if (internalProducer.flushOnCheckpoint) {
                        synchronized (internalProducer.pendingRecordsLock) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
index 39b2b8f..add623e 100644
--- 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java
@@ -33,7 +33,7 @@ import 
org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
@@ -208,11 +208,11 @@ public class Kafka010ITCase extends KafkaConsumerTestBase 
{
                });
 
                final TypeInformationSerializationSchema<Long> longSer = new 
TypeInformationSerializationSchema<>(TypeInfoParser.<Long>parse("Long"), 
env.getConfig());
-               FlinkKafkaProducer010.FlinkKafkaProducer010Configuration prod = 
FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, 
new KeyedSerializationSchemaWrapper<>(longSer), standardProps, new 
KafkaPartitioner<Long>() {
+               FlinkKafkaProducer010.FlinkKafkaProducer010Configuration prod = 
FlinkKafkaProducer010.writeToKafkaWithTimestamps(streamWithTimestamps, topic, 
new KeyedSerializationSchemaWrapper<>(longSer), standardProps, new 
FlinkKafkaPartitioner<Long>() {
                        private static final long serialVersionUID = 
-6730989584364230617L;
 
                        @Override
-                       public int partition(Long next, byte[] serializedKey, 
byte[] serializedValue, int numPartitions) {
+                       public int partition(Long next, byte[] key, byte[] 
value, String targetTopic, int[] partitions) {
                                return (int)(next % 3);
                        }
                });

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index d27e53a..c88c858 100644
--- 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -30,8 +30,8 @@ import org.apache.curator.test.TestingServer;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.operators.StreamSink;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import 
org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.NetUtils;
@@ -46,9 +46,9 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.net.BindException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.HashMap;
 import java.util.Properties;
 import java.util.UUID;
 
@@ -116,7 +116,7 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
        }
 
        @Override
-       public <T> StreamSink<T> getProducerSink(String topic, 
KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> 
partitioner) {
+       public <T> StreamSink<T> getProducerSink(String topic, 
KeyedSerializationSchema<T> serSchema, Properties props, 
FlinkKafkaPartitioner<T> partitioner) {
                FlinkKafkaProducer010<T> prod = new 
FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
                prod.setFlushOnCheckpoint(true);
                return new StreamSink<>(prod);
@@ -124,7 +124,7 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
 
 
        @Override
-       public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, 
String topic, KeyedSerializationSchema<T> serSchema, Properties props, 
KafkaPartitioner<T> partitioner) {
+       public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, 
String topic, KeyedSerializationSchema<T> serSchema, Properties props, 
FlinkKafkaPartitioner<T> partitioner) {
                FlinkKafkaProducer010<T> prod = new 
FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
                prod.setFlushOnCheckpoint(true);
                return stream.addSink(prod);

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index a7b89f8..98dac3e 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
@@ -37,7 +38,7 @@ public class FlinkKafkaProducer<IN> extends 
FlinkKafkaProducer08<IN>  {
         */
        @Deprecated
        public FlinkKafkaProducer(String brokerList, String topicId, 
SerializationSchema<IN> serializationSchema) {
-               super(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), 
getPropertiesFromBrokerList(brokerList), null);
+               super(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), 
getPropertiesFromBrokerList(brokerList), (FlinkKafkaPartitioner<IN>)null);
        }
 
        /**
@@ -45,7 +46,7 @@ public class FlinkKafkaProducer<IN> extends 
FlinkKafkaProducer08<IN>  {
         */
        @Deprecated
        public FlinkKafkaProducer(String topicId, SerializationSchema<IN> 
serializationSchema, Properties producerConfig) {
-               super(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, null);
+               super(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, 
(FlinkKafkaPartitioner<IN>)null);
        }
 
        /**
@@ -62,7 +63,7 @@ public class FlinkKafkaProducer<IN> extends 
FlinkKafkaProducer08<IN>  {
         */
        @Deprecated
        public FlinkKafkaProducer(String brokerList, String topicId, 
KeyedSerializationSchema<IN> serializationSchema) {
-               super(topicId, serializationSchema, 
getPropertiesFromBrokerList(brokerList), null);
+               super(topicId, serializationSchema, 
getPropertiesFromBrokerList(brokerList), (FlinkKafkaPartitioner<IN>)null);
        }
 
        /**
@@ -70,7 +71,7 @@ public class FlinkKafkaProducer<IN> extends 
FlinkKafkaProducer08<IN>  {
         */
        @Deprecated
        public FlinkKafkaProducer(String topicId, KeyedSerializationSchema<IN> 
serializationSchema, Properties producerConfig) {
-               super(topicId, serializationSchema, producerConfig, null);
+               super(topicId, serializationSchema, producerConfig, 
(FlinkKafkaPartitioner<IN>)null);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
index 65de5fc..64d3716 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java
@@ -17,7 +17,8 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
@@ -50,7 +51,7 @@ public class FlinkKafkaProducer08<IN> extends 
FlinkKafkaProducerBase<IN>  {
         *                      User defined (keyless) serialization schema.
         */
        public FlinkKafkaProducer08(String brokerList, String topicId, 
SerializationSchema<IN> serializationSchema) {
-               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), 
getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
+               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), 
getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>());
        }
 
        /**
@@ -65,7 +66,7 @@ public class FlinkKafkaProducer08<IN> extends 
FlinkKafkaProducerBase<IN>  {
         *                      Properties with the producer configuration.
         */
        public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> 
serializationSchema, Properties producerConfig) {
-               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new 
FixedPartitioner<IN>());
+               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new 
FlinkFixedPartitioner<IN>());
        }
 
        /**
@@ -75,12 +76,27 @@ public class FlinkKafkaProducer08<IN> extends 
FlinkKafkaProducerBase<IN>  {
         * @param serializationSchema A (keyless) serializable serialization 
schema for turning user objects into a kafka-consumable byte[]
         * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
         * @param customPartitioner A serializable partitioner for assining 
messages to Kafka partitions.
+        * @deprecated Use {@link 
FlinkKafkaProducer08#FlinkKafkaProducer08(String, SerializationSchema, 
Properties, FlinkKafkaPartitioner)} instead
         */
+       @Deprecated
        public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> 
serializationSchema, Properties producerConfig, KafkaPartitioner<IN> 
customPartitioner) {
                this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, 
customPartitioner);
 
        }
 
+       /**
+        * The main constructor for creating a FlinkKafkaProducer.
+        *
+        * @param topicId The topic to write data to
+        * @param serializationSchema A (keyless) serializable serialization 
schema for turning user objects into a kafka-consumable byte[]
+        * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
+        * @param customPartitioner A serializable partitioner for assining 
messages to Kafka partitions.
+        */
+       public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> 
serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> 
customPartitioner) {
+               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, 
customPartitioner);
+
+       }
+
        // ------------------- Key/Value serialization schema constructors 
----------------------
 
        /**
@@ -95,7 +111,7 @@ public class FlinkKafkaProducer08<IN> extends 
FlinkKafkaProducerBase<IN>  {
         *                      User defined serialization schema supporting 
key/value messages
         */
        public FlinkKafkaProducer08(String brokerList, String topicId, 
KeyedSerializationSchema<IN> serializationSchema) {
-               this(topicId, serializationSchema, 
getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
+               this(topicId, serializationSchema, 
getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>());
        }
 
        /**
@@ -110,7 +126,7 @@ public class FlinkKafkaProducer08<IN> extends 
FlinkKafkaProducerBase<IN>  {
         *                      Properties with the producer configuration.
         */
        public FlinkKafkaProducer08(String topicId, 
KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
-               this(topicId, serializationSchema, producerConfig, new 
FixedPartitioner<IN>());
+               this(topicId, serializationSchema, producerConfig, new 
FlinkFixedPartitioner<IN>());
        }
 
        /**
@@ -120,11 +136,25 @@ public class FlinkKafkaProducer08<IN> extends 
FlinkKafkaProducerBase<IN>  {
         * @param serializationSchema A serializable serialization schema for 
turning user objects into a kafka-consumable byte[] supporting key/value 
messages
         * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
         * @param customPartitioner A serializable partitioner for assining 
messages to Kafka partitions.
+        * @deprecated Use {@link 
FlinkKafkaProducer08#FlinkKafkaProducer08(String, KeyedSerializationSchema, 
Properties, FlinkKafkaPartitioner)} instead
         */
+       @Deprecated
        public FlinkKafkaProducer08(String topicId, 
KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, 
KafkaPartitioner<IN> customPartitioner) {
                super(topicId, serializationSchema, producerConfig, 
customPartitioner);
        }
 
+       /**
+        * The main constructor for creating a FlinkKafkaProducer.
+        *
+        * @param topicId The topic to write data to
+        * @param serializationSchema A serializable serialization schema for 
turning user objects into a kafka-consumable byte[] supporting key/value 
messages
+        * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
+        * @param customPartitioner A serializable partitioner for assining 
messages to Kafka partitions.
+        */
+       public FlinkKafkaProducer08(String topicId, 
KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, 
FlinkKafkaPartitioner<IN> customPartitioner) {
+               super(topicId, serializationSchema, producerConfig, 
customPartitioner);
+       }
+
        @Override
        protected void flush() {
                // The Kafka 0.8 producer doesn't support flushing, we wait here

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
index 839388f..5a066ec0 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.types.Row;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
@@ -38,6 +39,17 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink 
{
        public Kafka08JsonTableSink(String topic, Properties properties, 
KafkaPartitioner<Row> partitioner) {
                super(topic, properties, partitioner);
        }
+       
+       /**
+        * Creates {@link KafkaTableSink} for Kafka 0.8
+        *
+        * @param topic topic in Kafka to which table is written
+        * @param properties properties to connect to Kafka
+        * @param partitioner Kafka partitioner
+        */
+       public Kafka08JsonTableSink(String topic, Properties properties, 
FlinkKafkaPartitioner<Row> partitioner) {
+               super(topic, properties, partitioner);
+       }
 
        @Override
        protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, 
Properties properties, SerializationSchema<Row> serializationSchema, 
KafkaPartitioner<Row> partitioner) {
@@ -45,6 +57,11 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink 
{
        }
 
        @Override
+       protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, 
Properties properties, SerializationSchema<Row> serializationSchema, 
FlinkKafkaPartitioner<Row> partitioner) {
+               return new FlinkKafkaProducer08<>(topic, serializationSchema, 
properties, partitioner);
+       }
+
+       @Override
        protected Kafka08JsonTableSink createCopy() {
                return new Kafka08JsonTableSink(topic, properties, partitioner);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
index 0ac452e..164c162 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.types.Row;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import 
org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
@@ -40,6 +41,19 @@ public class Kafka08JsonTableSinkTest extends 
KafkaTableSinkTestBase {
        }
 
        @Override
+       protected KafkaTableSink createTableSinkWithFlinkPartitioner(String 
topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner,
+                       final FlinkKafkaProducerBase<Row> kafkaProducer) {
+               
+               return new Kafka08JsonTableSink(topic, properties, partitioner) 
{
+                       @Override
+                       protected FlinkKafkaProducerBase<Row> 
createKafkaProducer(String topic, Properties properties,
+                                       SerializationSchema<Row> 
serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
+                               return kafkaProducer;
+                       }
+               };
+       }
+
+       @Override
        @SuppressWarnings("unchecked")
        protected SerializationSchema<Row> getSerializationSchema() {
                return new JsonRowSerializationSchema(FIELD_NAMES);

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
index 65d7596..c7da5af 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.streaming.api.operators.StreamSink;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import 
org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
@@ -83,7 +84,7 @@ public class KafkaProducerTest extends TestLogger {
                        // (1) producer that propagates errors
 
                        FlinkKafkaProducer08<String> producerPropagating = new 
FlinkKafkaProducer08<>(
-                                       "mock_topic", new SimpleStringSchema(), 
FakeStandardProducerConfig.get(), null);
+                                       "mock_topic", new SimpleStringSchema(), 
FakeStandardProducerConfig.get(), (FlinkKafkaPartitioner)null);
 
                        OneInputStreamOperatorTestHarness<String, Object> 
testHarness =
                                        new 
OneInputStreamOperatorTestHarness<>(new StreamSink<>(producerPropagating));
@@ -106,7 +107,7 @@ public class KafkaProducerTest extends TestLogger {
                        // (2) producer that only logs errors
 
                        FlinkKafkaProducer08<String> producerLogging = new 
FlinkKafkaProducer08<>(
-                                       "mock_topic", new SimpleStringSchema(), 
FakeStandardProducerConfig.get(), null);
+                                       "mock_topic", new SimpleStringSchema(), 
FakeStandardProducerConfig.get(), (FlinkKafkaPartitioner)null);
                        producerLogging.setLogFailuresOnly(true);
 
                        testHarness = new 
OneInputStreamOperatorTestHarness<>(new StreamSink(producerLogging));

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 643ee8e..2419b53 100644
--- 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -36,8 +36,8 @@ import 
org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
 import 
org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import 
org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.NetUtils;
@@ -109,7 +109,7 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                        String topic,
                        KeyedSerializationSchema<T> serSchema,
                        Properties props,
-                       KafkaPartitioner<T> partitioner) {
+                       FlinkKafkaPartitioner<T> partitioner) {
                FlinkKafkaProducer08<T> prod = new FlinkKafkaProducer08<>(
                                topic,
                                serSchema,
@@ -120,7 +120,7 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
        }
 
        @Override
-       public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, 
String topic, KeyedSerializationSchema<T> serSchema, Properties props, 
KafkaPartitioner<T> partitioner) {
+       public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, 
String topic, KeyedSerializationSchema<T> serSchema, Properties props, 
FlinkKafkaPartitioner<T> partitioner) {
                FlinkKafkaProducer08<T> prod = new 
FlinkKafkaProducer08<>(topic, serSchema, props, partitioner);
                prod.setFlushOnCheckpoint(true);
                return stream.addSink(prod);

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
index 2a3e39d..4f41c43 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java
@@ -17,7 +17,8 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
@@ -51,7 +52,7 @@ public class FlinkKafkaProducer09<IN> extends 
FlinkKafkaProducerBase<IN> {
         *                      User defined (keyless) serialization schema.
         */
        public FlinkKafkaProducer09(String brokerList, String topicId, 
SerializationSchema<IN> serializationSchema) {
-               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), 
getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
+               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), 
getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>());
        }
 
        /**
@@ -66,7 +67,7 @@ public class FlinkKafkaProducer09<IN> extends 
FlinkKafkaProducerBase<IN> {
         *                      Properties with the producer configuration.
         */
        public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> 
serializationSchema, Properties producerConfig) {
-               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new 
FixedPartitioner<IN>());
+               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new 
FlinkFixedPartitioner<IN>());
        }
 
        /**
@@ -77,12 +78,28 @@ public class FlinkKafkaProducer09<IN> extends 
FlinkKafkaProducerBase<IN> {
         * @param serializationSchema A (keyless) serializable serialization 
schema for turning user objects into a kafka-consumable byte[]
         * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
         * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+        * @deprecated Use {@link 
FlinkKafkaProducer09#FlinkKafkaProducer09(String, SerializationSchema, 
Properties, FlinkKafkaPartitioner)} instead
         */
+       @Deprecated
        public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> 
serializationSchema, Properties producerConfig, KafkaPartitioner<IN> 
customPartitioner) {
                this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, 
customPartitioner);
 
        }
 
+       /**
+        * Creates a FlinkKafkaProducer for a given topic. the sink produces a 
DataStream to
+        * the topic.
+        *
+        * @param topicId The topic to write data to
+        * @param serializationSchema A (keyless) serializable serialization 
schema for turning user objects into a kafka-consumable byte[]
+        * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
+        * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)
+        */
+       public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> 
serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> 
customPartitioner) {
+               this(topicId, new 
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, 
customPartitioner);
+
+       }
+
        // ------------------- Key/Value serialization schema constructors 
----------------------
 
        /**
@@ -97,7 +114,7 @@ public class FlinkKafkaProducer09<IN> extends 
FlinkKafkaProducerBase<IN> {
         *                      User defined serialization schema supporting 
key/value messages
         */
        public FlinkKafkaProducer09(String brokerList, String topicId, 
KeyedSerializationSchema<IN> serializationSchema) {
-               this(topicId, serializationSchema, 
getPropertiesFromBrokerList(brokerList), new FixedPartitioner<IN>());
+               this(topicId, serializationSchema, 
getPropertiesFromBrokerList(brokerList), new FlinkFixedPartitioner<IN>());
        }
 
        /**
@@ -112,7 +129,7 @@ public class FlinkKafkaProducer09<IN> extends 
FlinkKafkaProducerBase<IN> {
         *                      Properties with the producer configuration.
         */
        public FlinkKafkaProducer09(String topicId, 
KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
-               this(topicId, serializationSchema, producerConfig, new 
FixedPartitioner<IN>());
+               this(topicId, serializationSchema, producerConfig, new 
FlinkFixedPartitioner<IN>());
        }
 
        /**
@@ -123,11 +140,26 @@ public class FlinkKafkaProducer09<IN> extends 
FlinkKafkaProducerBase<IN> {
         * @param serializationSchema A serializable serialization schema for 
turning user objects into a kafka-consumable byte[] supporting key/value 
messages
         * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
         * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions.
+        * @deprecated Use {@link 
FlinkKafkaProducer09#FlinkKafkaProducer09(String, KeyedSerializationSchema, 
Properties, FlinkKafkaPartitioner)} instead
         */
+       @Deprecated
        public FlinkKafkaProducer09(String topicId, 
KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, 
KafkaPartitioner<IN> customPartitioner) {
                super(topicId, serializationSchema, producerConfig, 
customPartitioner);
        }
 
+       /**
+        * Creates a FlinkKafkaProducer for a given topic. The sink produces a 
DataStream to
+        * the topic.
+        *
+        * @param topicId The topic to write data to
+        * @param serializationSchema A serializable serialization schema for 
turning user objects into a kafka-consumable byte[] supporting key/value 
messages
+        * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
+        * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions.
+        */
+       public FlinkKafkaProducer09(String topicId, 
KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, 
FlinkKafkaPartitioner<IN> customPartitioner) {
+               super(topicId, serializationSchema, producerConfig, 
customPartitioner);
+       }
+
        @Override
        protected void flush() {
                if (this.producer != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
index edbebd0..b82ebc4 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.types.Row;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
@@ -33,17 +34,45 @@ public class Kafka09JsonTableSink extends 
KafkaJsonTableSink {
         * @param topic topic in Kafka to which table is written
         * @param properties properties to connect to Kafka
         * @param partitioner Kafka partitioner
+        * @deprecated Use {@link 
Kafka09JsonTableSink#Kafka09JsonTableSink(String, Properties, 
FlinkKafkaPartitioner)} instead
         */
+       @Deprecated
        public Kafka09JsonTableSink(String topic, Properties properties, 
KafkaPartitioner<Row> partitioner) {
                super(topic, properties, partitioner);
        }
+       
+       /**
+        * Creates {@link KafkaTableSink} for Kafka 0.9
+        *
+        * @param topic topic in Kafka to which table is written
+        * @param properties properties to connect to Kafka
+        * @param partitioner Kafka partitioner
+        */
+       public Kafka09JsonTableSink(String topic, Properties properties, 
FlinkKafkaPartitioner<Row> partitioner) {
+               super(topic, properties, partitioner);
+       }
 
+       /**
+        *
+        * @param topic               Kafka topic to produce to.
+        * @param properties          Properties for the Kafka producer.
+        * @param serializationSchema Serialization schema to use to create 
Kafka records.
+        * @param partitioner         Partitioner to select Kafka partition.
+        * @return The version-specific Kafka producer
+        * @deprecated Use {@link 
Kafka09JsonTableSink#createKafkaProducer(String, Properties, 
SerializationSchema, FlinkKafkaPartitioner)} instead
+        */
+       @Deprecated
        @Override
        protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, 
Properties properties, SerializationSchema<Row> serializationSchema, 
KafkaPartitioner<Row> partitioner) {
                return new FlinkKafkaProducer09<>(topic, serializationSchema, 
properties, partitioner);
        }
 
        @Override
+       protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, 
Properties properties, SerializationSchema<Row> serializationSchema, 
FlinkKafkaPartitioner<Row> partitioner) {
+               return new FlinkKafkaProducer09<>(topic, serializationSchema, 
properties, partitioner);
+       }
+
+       @Override
        protected Kafka09JsonTableSink createCopy() {
                return new Kafka09JsonTableSink(topic, properties, partitioner);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
index df84a0f..ad8f623 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.types.Row;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import 
org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
@@ -26,6 +27,7 @@ import java.util.Properties;
 
 public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
 
+       @Deprecated
        @Override
        protected KafkaTableSink createTableSink(String topic, Properties 
properties, KafkaPartitioner<Row> partitioner,
                        final FlinkKafkaProducerBase<Row> kafkaProducer) {
@@ -40,6 +42,19 @@ public class Kafka09JsonTableSinkTest extends 
KafkaTableSinkTestBase {
        }
 
        @Override
+       protected KafkaTableSink createTableSinkWithFlinkPartitioner(String 
topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner,
+                       final FlinkKafkaProducerBase<Row> kafkaProducer) {
+
+               return new Kafka09JsonTableSink(topic, properties, partitioner) 
{
+                       @Override
+                       protected FlinkKafkaProducerBase<Row> 
createKafkaProducer(String topic, Properties properties,
+                                       SerializationSchema<Row> 
serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
+                               return kafkaProducer;
+                       }
+               };
+       }
+
+       @Override
        @SuppressWarnings("unchecked")
        protected SerializationSchema<Row> getSerializationSchema() {
                return new JsonRowSerializationSchema(FIELD_NAMES);

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
index 18b2aec..e9a4947 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.streaming.api.operators.StreamSink;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import 
org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
@@ -83,9 +84,8 @@ public class KafkaProducerTest extends TestLogger {
                        
whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock);
                        
                        // (1) producer that propagates errors
-
                        FlinkKafkaProducer09<String> producerPropagating = new 
FlinkKafkaProducer09<>(
-                                       "mock_topic", new SimpleStringSchema(), 
FakeStandardProducerConfig.get(), null);
+                                       "mock_topic", new SimpleStringSchema(), 
FakeStandardProducerConfig.get(), (FlinkKafkaPartitioner<String>)null);
 
                        OneInputStreamOperatorTestHarness<String, Object> 
testHarness =
                                        new 
OneInputStreamOperatorTestHarness<>(new StreamSink(producerPropagating));
@@ -106,7 +106,7 @@ public class KafkaProducerTest extends TestLogger {
                        // (2) producer that only logs errors
 
                        FlinkKafkaProducer09<String> producerLogging = new 
FlinkKafkaProducer09<>(
-                                       "mock_topic", new SimpleStringSchema(), 
FakeStandardProducerConfig.get(), null);
+                                       "mock_topic", new SimpleStringSchema(), 
FakeStandardProducerConfig.get(), (FlinkKafkaPartitioner<String>)null);
                        producerLogging.setLogFailuresOnly(true);
 
                        testHarness = new 
OneInputStreamOperatorTestHarness<>(new StreamSink(producerLogging));

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index c9ef6da..84fdbf8 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -19,8 +19,8 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import kafka.admin.AdminUtils;
-import kafka.common.KafkaException;
 import kafka.api.PartitionMetadata;
+import kafka.common.KafkaException;
 import kafka.network.SocketServer;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
@@ -32,8 +32,8 @@ import org.apache.curator.test.TestingServer;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.operators.StreamSink;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import 
org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.NetUtils;
@@ -48,9 +48,9 @@ import scala.collection.Seq;
 import java.io.File;
 import java.net.BindException;
 import java.util.ArrayList;
-import java.util.Map;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 
@@ -106,14 +106,14 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                        String topic,
                        KeyedSerializationSchema<T> serSchema,
                        Properties props,
-                       KafkaPartitioner<T> partitioner) {
+                       FlinkKafkaPartitioner<T> partitioner) {
                FlinkKafkaProducer09<T> prod = new 
FlinkKafkaProducer09<>(topic, serSchema, props, partitioner);
                prod.setFlushOnCheckpoint(true);
                return new StreamSink<>(prod);
        }
 
        @Override
-       public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, 
String topic, KeyedSerializationSchema<T> serSchema, Properties props, 
KafkaPartitioner<T> partitioner) {
+       public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, 
String topic, KeyedSerializationSchema<T> serSchema, Properties props, 
FlinkKafkaPartitioner<T> partitioner) {
                FlinkKafkaProducer09<T> prod = new 
FlinkKafkaProducer09<>(topic, serSchema, props, partitioner);
                prod.setFlushOnCheckpoint(true);
                return stream.addSink(prod);

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index 6a7b17f..f9a1e41 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -17,6 +17,14 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.OperatorStateStore;
@@ -30,6 +38,8 @@ import 
org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import 
org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.util.NetUtils;
@@ -45,13 +55,6 @@ import 
org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Collections;
-import java.util.Comparator;
-
 import static java.util.Objects.requireNonNull;
 
 
@@ -76,12 +79,6 @@ public abstract class FlinkKafkaProducerBase<IN> extends 
RichSinkFunction<IN> im
        public static final String KEY_DISABLE_METRICS = 
"flink.disable-metrics";
 
        /**
-        * Array with the partition ids of the given defaultTopicId
-        * The size of this array is the number of partitions
-        */
-       protected int[] partitions;
-
-       /**
         * User defined properties for the Producer
         */
        protected final Properties producerConfig;
@@ -98,9 +95,14 @@ public abstract class FlinkKafkaProducerBase<IN> extends 
RichSinkFunction<IN> im
        protected final KeyedSerializationSchema<IN> schema;
 
        /**
-        * User-provided partitioner for assigning an object to a Kafka 
partition.
+        * User-provided partitioner for assigning an object to a Kafka 
partition for each topic
+        */
+       protected final FlinkKafkaPartitioner flinkKafkaPartitioner;
+
+       /**
+        * Partitions for each topic
         */
-       protected final KafkaPartitioner<IN> partitioner;
+       protected final Map<String, int[]> topicPartitionsMap;
 
        /**
         * Flag indicating whether to accept failures (and log them), or to 
fail on failures
@@ -111,7 +113,12 @@ public abstract class FlinkKafkaProducerBase<IN> extends 
RichSinkFunction<IN> im
         * If true, the producer will wait until all outstanding records have 
been send to the broker.
         */
        protected boolean flushOnCheckpoint;
-       
+
+       /**
+        * Retry times of fetching kafka meta
+        */
+       protected long kafkaMetaRetryTimes;
+
        // -------------------------------- Runtime fields 
------------------------------------------
 
        /** KafkaProducer instance */
@@ -133,14 +140,28 @@ public abstract class FlinkKafkaProducerBase<IN> extends 
RichSinkFunction<IN> im
 
 
        /**
-        * The main constructor for creating a FlinkKafkaProducer.
+        * The main constructor for creating a FlinkKafkaProducer. For 
customPartitioner parameter, use {@link 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner} 
instead
         *
         * @param defaultTopicId The default topic to write data to
         * @param serializationSchema A serializable serialization schema for 
turning user objects into a kafka-consumable byte[] supporting key/value 
messages
         * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
-        * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions. Passing null will use Kafka's partitioner
+        * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions. Passing null will use Kafka's partitioner.
+        * @deprecated Use {@link 
FlinkKafkaProducerBase#FlinkKafkaProducerBase(String, KeyedSerializationSchema, 
Properties, FlinkKafkaPartitioner)} instead
         */
+       @Deprecated
        public FlinkKafkaProducerBase(String defaultTopicId, 
KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, 
KafkaPartitioner<IN> customPartitioner) {
+               this(defaultTopicId, serializationSchema, producerConfig, null 
== customPartitioner ? null : new 
FlinkKafkaDelegatePartitioner<>(customPartitioner));
+       }
+
+       /**
+        * The main constructor for creating a FlinkKafkaProducer.
+        *
+        * @param defaultTopicId The default topic to write data to
+        * @param serializationSchema A serializable serialization schema for 
turning user objects into a kafka-consumable byte[] supporting key/value 
messages
+        * @param producerConfig Configuration properties for the 
KafkaProducer. 'bootstrap.servers.' is the only required argument.
+        * @param customPartitioner A serializable partitioner for assigning 
messages to Kafka partitions. Passing null will use Kafka's partitioner.
+        */
+       public FlinkKafkaProducerBase(String defaultTopicId, 
KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, 
FlinkKafkaPartitioner<IN> customPartitioner) {
                requireNonNull(defaultTopicId, "TopicID not set");
                requireNonNull(serializationSchema, "serializationSchema not 
set");
                requireNonNull(producerConfig, "producerConfig not set");
@@ -150,6 +171,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends 
RichSinkFunction<IN> im
                this.defaultTopicId = defaultTopicId;
                this.schema = serializationSchema;
                this.producerConfig = producerConfig;
+               this.flinkKafkaPartitioner = customPartitioner;
 
                // set the producer configuration properties for kafka record 
key value serializers.
                if 
(!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
@@ -169,7 +191,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends 
RichSinkFunction<IN> im
                        throw new 
IllegalArgumentException(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be 
supplied in the producer config properties.");
                }
 
-               this.partitioner = customPartitioner;
+               this.topicPartitionsMap = new HashMap<>();
        }
 
        // ---------------------------------- Properties 
--------------------------
@@ -177,9 +199,9 @@ public abstract class FlinkKafkaProducerBase<IN> extends 
RichSinkFunction<IN> im
        /**
         * Defines whether the producer should fail on errors, or only log them.
         * If this is set to true, then exceptions will be only logged, if set 
to false,
-        * exceptions will be eventually thrown and cause the streaming program 
to 
+        * exceptions will be eventually thrown and cause the streaming program 
to
         * fail (and enter recovery).
-        * 
+        *
         * @param logFailuresOnly The flag to indicate logging-only on 
exceptions.
         */
        public void setLogFailuresOnly(boolean logFailuresOnly) {
@@ -205,7 +227,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends 
RichSinkFunction<IN> im
        }
 
        // ----------------------------------- Utilities 
--------------------------
-       
+
        /**
         * Initializes the connection to Kafka.
         */
@@ -214,27 +236,14 @@ public abstract class FlinkKafkaProducerBase<IN> extends 
RichSinkFunction<IN> im
                producer = getKafkaProducer(this.producerConfig);
 
                RuntimeContext ctx = getRuntimeContext();
-               if (partitioner != null) {
-                       // the fetched list is immutable, so we're creating a 
mutable copy in order to sort it
-                       List<PartitionInfo> partitionsList = new 
ArrayList<>(producer.partitionsFor(defaultTopicId));
-
-                       // sort the partitions by partition id to make sure the 
fetched partition list is the same across subtasks
-                       Collections.sort(partitionsList, new 
Comparator<PartitionInfo>() {
-                               @Override
-                               public int compare(PartitionInfo o1, 
PartitionInfo o2) {
-                                       return Integer.compare(o1.partition(), 
o2.partition());
-                               }
-                       });
-
-                       partitions = new int[partitionsList.size()];
-                       for (int i = 0; i < partitions.length; i++) {
-                               partitions[i] = 
partitionsList.get(i).partition();
+               if(null != flinkKafkaPartitioner) {
+                       if(flinkKafkaPartitioner instanceof 
FlinkKafkaDelegatePartitioner) {
+                               
((FlinkKafkaDelegatePartitioner)flinkKafkaPartitioner).setPartitions(getPartitionsByTopic(this.defaultTopicId,
 this.producer));
                        }
-
-                       partitioner.open(ctx.getIndexOfThisSubtask(), 
ctx.getNumberOfParallelSubtasks(), partitions);
+                       flinkKafkaPartitioner.open(ctx.getIndexOfThisSubtask(), 
ctx.getNumberOfParallelSubtasks());
                }
 
-               LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into 
topic {}", 
+               LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into 
default topic {}",
                                ctx.getIndexOfThisSubtask() + 1, 
ctx.getNumberOfParallelSubtasks(), defaultTopicId);
 
                // register Kafka metrics to Flink accumulators
@@ -281,6 +290,26 @@ public abstract class FlinkKafkaProducerBase<IN> extends 
RichSinkFunction<IN> im
                }
        }
 
+       protected int[] getPartitionsByTopic(String topic, 
KafkaProducer<byte[], byte[]> producer) {
+               // the fetched list is immutable, so we're creating a mutable 
copy in order to sort it
+               List<PartitionInfo> partitionsList = new 
ArrayList<>(producer.partitionsFor(topic));
+
+               // sort the partitions by partition id to make sure the fetched 
partition list is the same across subtasks
+               Collections.sort(partitionsList, new 
Comparator<PartitionInfo>() {
+                               @Override
+                               public int compare(PartitionInfo o1, 
PartitionInfo o2) {
+                               return Integer.compare(o1.partition(), 
o2.partition());
+                       }
+                       });
+
+               int[] partitions = new int[partitionsList.size()];
+               for (int i = 0; i < partitions.length; i++) {
+                       partitions[i] = partitionsList.get(i).partition();
+               }
+
+               return partitions;
+       }
+
        /**
         * Called when new data arrives to the sink, and forwards it to Kafka.
         *
@@ -299,11 +328,17 @@ public abstract class FlinkKafkaProducerBase<IN> extends 
RichSinkFunction<IN> im
                        targetTopic = defaultTopicId;
                }
 
+               int[] partitions = this.topicPartitionsMap.get(targetTopic);
+               if(null == partitions) {
+                       partitions = this.getPartitionsByTopic(targetTopic, 
producer);
+                       this.topicPartitionsMap.put(targetTopic, partitions);
+               }
+
                ProducerRecord<byte[], byte[]> record;
-               if (partitioner == null) {
+               if (flinkKafkaPartitioner == null) {
                        record = new ProducerRecord<>(targetTopic, 
serializedKey, serializedValue);
                } else {
-                       record = new ProducerRecord<>(targetTopic, 
partitioner.partition(next, serializedKey, serializedValue, partitions.length), 
serializedKey, serializedValue);
+                       record = new ProducerRecord<>(targetTopic, 
flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, 
targetTopic, partitions), serializedKey, serializedValue);
                }
                if (flushOnCheckpoint) {
                        synchronized (pendingRecordsLock) {
@@ -319,7 +354,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends 
RichSinkFunction<IN> im
                if (producer != null) {
                        producer.close();
                }
-               
+
                // make sure we propagate pending errors
                checkErroneous();
        }
@@ -376,15 +411,15 @@ public abstract class FlinkKafkaProducerBase<IN> extends 
RichSinkFunction<IN> im
                        throw new Exception("Failed to send data to Kafka: " + 
e.getMessage(), e);
                }
        }
-       
+
        public static Properties getPropertiesFromBrokerList(String brokerList) 
{
                String[] elements = brokerList.split(",");
-               
+
                // validate the broker addresses
                for (String broker: elements) {
                        NetUtils.getCorrectHostnamePort(broker);
                }
-               
+
                Properties props = new Properties();
                props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
brokerList);
                return props;

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
index 27c4de7..a0b5033 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
@@ -17,10 +17,11 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.types.Row;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import 
org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.types.Row;
 
 import java.util.Properties;
 
@@ -35,10 +36,23 @@ public abstract class KafkaJsonTableSink extends 
KafkaTableSink {
         * @param topic topic in Kafka to which table is written
         * @param properties properties to connect to Kafka
         * @param partitioner Kafka partitioner
+        * @deprecated Use {@link KafkaJsonTableSink#KafkaJsonTableSink(String, 
Properties, FlinkKafkaPartitioner)} instead
         */
+       @Deprecated
        public KafkaJsonTableSink(String topic, Properties properties, 
KafkaPartitioner<Row> partitioner) {
                super(topic, properties, partitioner);
        }
+       
+       /**
+        * Creates KafkaJsonTableSink
+        *
+        * @param topic topic in Kafka to which table is written
+        * @param properties properties to connect to Kafka
+        * @param partitioner Kafka partitioner
+        */
+       public KafkaJsonTableSink(String topic, Properties properties, 
FlinkKafkaPartitioner<Row> partitioner) {
+               super(topic, properties, partitioner);
+       }
 
        @Override
        protected SerializationSchema<Row> createSerializationSchema(String[] 
fieldNames) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
index a8a2fd0..0a937d6 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -18,6 +18,8 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.types.Row;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.sinks.AppendStreamTableSink;
@@ -39,7 +41,7 @@ public abstract class KafkaTableSink implements 
AppendStreamTableSink<Row> {
        protected final String topic;
        protected final Properties properties;
        protected SerializationSchema<Row> serializationSchema;
-       protected final KafkaPartitioner<Row> partitioner;
+       protected final FlinkKafkaPartitioner<Row> partitioner;
        protected String[] fieldNames;
        protected TypeInformation[] fieldTypes;
 
@@ -49,12 +51,27 @@ public abstract class KafkaTableSink implements 
AppendStreamTableSink<Row> {
         * @param topic                 Kafka topic to write to.
         * @param properties            Properties for the Kafka consumer.
         * @param partitioner           Partitioner to select Kafka partition 
for each item
+        * @deprecated Use {@link KafkaTableSink#KafkaTableSink(String, 
Properties, FlinkKafkaPartitioner)} instead
         */
+       @Deprecated
        public KafkaTableSink(
                        String topic,
                        Properties properties,
                        KafkaPartitioner<Row> partitioner) {
+               this(topic, properties, new 
FlinkKafkaDelegatePartitioner<Row>(partitioner));
+       }
 
+       /**
+        * Creates KafkaTableSink
+        * 
+        * @param topic                 Kafka topic to write to.
+        * @param properties            Properties for the Kafka consumer.
+        * @param partitioner           Partitioner to select Kafka partition 
for each item
+        */
+       public KafkaTableSink(
+                       String topic,
+                       Properties properties,
+                       FlinkKafkaPartitioner<Row> partitioner) {
                this.topic = Preconditions.checkNotNull(topic, "topic");
                this.properties = Preconditions.checkNotNull(properties, 
"properties");
                this.partitioner = Preconditions.checkNotNull(partitioner, 
"partitioner");
@@ -68,13 +85,29 @@ public abstract class KafkaTableSink implements 
AppendStreamTableSink<Row> {
         * @param serializationSchema Serialization schema to use to create 
Kafka records.
         * @param partitioner         Partitioner to select Kafka partition.
         * @return The version-specific Kafka producer
+        * @deprecated Use {@link KafkaTableSink#createKafkaProducer(String, 
Properties, SerializationSchema, FlinkKafkaPartitioner)} instead
         */
+       @Deprecated
        protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer(
                String topic, Properties properties,
                SerializationSchema<Row> serializationSchema,
                KafkaPartitioner<Row> partitioner);
 
        /**
+        * Returns the version-specifid Kafka producer.
+        *
+        * @param topic               Kafka topic to produce to.
+        * @param properties          Properties for the Kafka producer.
+        * @param serializationSchema Serialization schema to use to create 
Kafka records.
+        * @param partitioner         Partitioner to select Kafka partition.
+        * @return The version-specific Kafka producer
+        */
+       protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer(
+               String topic, Properties properties,
+               SerializationSchema<Row> serializationSchema,
+               FlinkKafkaPartitioner<Row> partitioner);
+
+       /**
         * Create serialization schema for converting table rows into bytes.
         *
         * @param fieldNames Field names in table rows.

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
index 9b848e0..edabfe0 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
@@ -48,9 +48,10 @@ import java.io.Serializable;
  *  Not all Kafka partitions contain data
  *  To avoid such an unbalanced partitioning, use a round-robin kafka 
partitioner. (note that this will
  *  cause a lot of network connections between all the Flink instances and all 
the Kafka brokers
- *
+ *  @deprecated Use {@link FlinkFixedPartitioner} instead.
  *
  */
+@Deprecated
 public class FixedPartitioner<T> extends KafkaPartitioner<T> implements 
Serializable {
        private static final long serialVersionUID = 1627268846962918126L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
new file mode 100644
index 0000000..d2eb7af
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+/**
+ * A partitioner ensuring that each internal Flink partition ends up in one 
Kafka partition.
+ *
+ * Note, one Kafka partition can contain multiple Flink partitions.
+ *
+ * Cases:
+ *     # More Flink partitions than kafka partitions
+ * <pre>
+ *             Flink Sinks:            Kafka Partitions
+ *                     1       ----------------&gt;    1
+ *                     2   --------------/
+ *                     3   -------------/
+ *                     4       ------------/
+ * </pre>
+ * Some (or all) kafka partitions contain the output of more than one flink 
partition
+ *
+ *# Fewer Flink partitions than Kafka
+ * <pre>
+ *             Flink Sinks:            Kafka Partitions
+ *                     1       ----------------&gt;    1
+ *                     2       ----------------&gt;    2
+ *                                                                             
3
+ *                                                                             
4
+ *                                                                             
5
+ * </pre>
+ *
+ *  Not all Kafka partitions contain data
+ *  To avoid such an unbalanced partitioning, use a round-robin kafka 
partitioner. (note that this will
+ *  cause a lot of network connections between all the Flink instances and all 
the Kafka brokers
+ *
+ */
+public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
+
+       private int parallelInstanceId;
+
+       @Override
+       public void open(int parallelInstanceId, int parallelInstances) {
+               if (parallelInstanceId < 0 || parallelInstances <= 0) {
+                       throw new IllegalArgumentException();
+               }
+               this.parallelInstanceId = parallelInstanceId;
+       }
+       
+       @Override
+       public int partition(T record, byte[] key, byte[] value, String 
targetTopic, int[] partitions) {
+               if(null == partitions || partitions.length == 0) {
+                       throw new IllegalArgumentException();
+               }
+               
+               return partitions[parallelInstanceId % partitions.length];
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
new file mode 100644
index 0000000..469fd1b
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+/**
+ * Delegate for KafkaPartitioner
+ * @param <T>
+ * @deprecated Delegate for {@link KafkaPartitioner}, use {@link 
FlinkKafkaPartitioner} instead
+ */
+@Deprecated
+public class FlinkKafkaDelegatePartitioner<T> extends FlinkKafkaPartitioner<T> 
{
+       private final KafkaPartitioner<T> kafkaPartitioner;
+       private int[] partitions;
+
+       public FlinkKafkaDelegatePartitioner(KafkaPartitioner<T> 
kafkaPartitioner) {
+               this.kafkaPartitioner = kafkaPartitioner;
+       }
+
+       public void setPartitions(int[] partitions) {
+               this.partitions = partitions;
+       }
+
+       @Override
+       public void open(int parallelInstanceId, int parallelInstances) {
+               this.kafkaPartitioner.open(parallelInstanceId, 
parallelInstances, partitions);
+       }
+
+       @Override
+       public int partition(T next, byte[] serializedKey, byte[] 
serializedValue, String targetTopic, int[] partitions) {
+               return this.kafkaPartitioner.partition(next, serializedKey, 
serializedValue, this.partitions.length);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
new file mode 100644
index 0000000..e074b9b
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+import java.io.Serializable;
+
+/**
+ * It contains a open() method which is called on each parallel instance.
+ * Partitioners must be serializable!
+ */
+public abstract class FlinkKafkaPartitioner<T> implements Serializable {
+       private static final long serialVersionUID = -9086719227828020494L;
+
+       /**
+        * Initializer for the Partitioner.
+        * @param parallelInstanceId 0-indexed id of the parallel instance in 
Flink
+        * @param parallelInstances the total number of parallel instances
+        */
+       public void open(int parallelInstanceId, int parallelInstances) {
+               // overwrite this method if needed.
+       }
+       
+       public abstract int partition(T record, byte[] key, byte[] value, 
String targetTopic, int[] partitions);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
index 37e2ef6..7c82bd1 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
@@ -22,7 +22,9 @@ import java.io.Serializable;
 /**
  * It contains a open() method which is called on each parallel instance.
  * Partitioners must be serializable!
+ * @deprecated Use {@link FlinkKafkaPartitioner} instead.
  */
+@Deprecated
 public abstract class KafkaPartitioner<T> implements Serializable {
 
        private static final long serialVersionUID = -1974260817778593473L;
@@ -32,10 +34,22 @@ public abstract class KafkaPartitioner<T> implements 
Serializable {
         * @param parallelInstanceId 0-indexed id of the parallel instance in 
Flink
         * @param parallelInstances the total number of parallel instances
         * @param partitions an array describing the partition IDs of the 
available Kafka partitions.
+        * @deprecated Use {@link FlinkKafkaPartitioner#open(int, int)} instead.
         */
+       @Deprecated
        public void open(int parallelInstanceId, int parallelInstances, int[] 
partitions) {
                // overwrite this method if needed.
        }
 
+       /**
+        *
+        * @param next
+        * @param serializedKey
+        * @param serializedValue
+        * @param numPartitions
+        * @return
+        * @deprecated Use {@link FlinkKafkaPartitioner#partition(T, byte[], 
byte[], String, int[])} instead.
+        */
+       @Deprecated
        public abstract int partition(T next, byte[] serializedKey, byte[] 
serializedValue, int numPartitions);
 }

Reply via email to