http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 2adb5ec..203d814 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -68,7 +68,7 @@ import 
org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapp
 import 
org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
 import 
org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper;
 import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper;
-import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2Partitioner;
+import 
org.apache.flink.streaming.connectors.kafka.testutils.Tuple2FlinkPartitioner;
 import 
org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
@@ -1199,9 +1199,9 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
 
        /**
         * Test producing and consuming into multiple topics
-        * @throws java.lang.Exception
+        * @throws Exception
         */
-       public void runProduceConsumeMultipleTopics() throws 
java.lang.Exception {
+       public void runProduceConsumeMultipleTopics() throws Exception {
                final int NUM_TOPICS = 5;
                final int NUM_ELEMENTS = 20;
 
@@ -1291,6 +1291,55 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                }
        }
 
+       private static class Tuple2WithTopicSchema implements 
KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>,
+                       KeyedSerializationSchema<Tuple3<Integer, Integer, 
String>> {
+
+               private final TypeSerializer<Tuple2<Integer, Integer>> ts;
+               
+               public Tuple2WithTopicSchema(ExecutionConfig ec) {
+                       ts = TypeInfoParser.<Tuple2<Integer, 
Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec);
+               }
+
+               @Override
+               public Tuple3<Integer, Integer, String> deserialize(byte[] 
messageKey, byte[] message, String topic, int partition, long offset) throws 
IOException {
+                       DataInputView in = new DataInputViewStreamWrapper(new 
ByteArrayInputStream(message));
+                       Tuple2<Integer, Integer> t2 = ts.deserialize(in);
+                       return new Tuple3<>(t2.f0, t2.f1, topic);
+               }
+
+               @Override
+               public boolean isEndOfStream(Tuple3<Integer, Integer, String> 
nextElement) {
+                       return false;
+               }
+
+               @Override
+               public TypeInformation<Tuple3<Integer, Integer, String>> 
getProducedType() {
+                       return TypeInfoParser.parse("Tuple3<Integer, Integer, 
String>");
+               }
+
+               @Override
+               public byte[] serializeKey(Tuple3<Integer, Integer, String> 
element) {
+                       return null;
+               }
+
+               @Override
+               public byte[] serializeValue(Tuple3<Integer, Integer, String> 
element) {
+                       ByteArrayOutputStream by = new ByteArrayOutputStream();
+                       DataOutputView out = new 
DataOutputViewStreamWrapper(by);
+                       try {
+                               ts.serialize(new Tuple2<>(element.f0, 
element.f1), out);
+                       } catch (IOException e) {
+                               throw new RuntimeException("Error" ,e);
+                       }
+                       return by.toByteArray();
+               }
+
+               @Override
+               public String getTargetTopic(Tuple3<Integer, Integer, String> 
element) {
+                       return element.f2;
+               }
+       }
+
        /**
         * Test Flink's Kafka integration also with very big records (30MB)
         * see 
http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
@@ -1975,7 +2024,7 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                        producerProperties.setProperty("retries", "0");
                        producerProperties.putAll(secureProps);
                        
-                       kafkaServer.produceIntoKafka(stream, topicName, 
serSchema, producerProperties, new Tuple2Partitioner(parallelism))
+                       kafkaServer.produceIntoKafka(stream, topicName, 
serSchema, producerProperties, new Tuple2FlinkPartitioner(parallelism))
                                        .setParallelism(parallelism);
 
                        try {
@@ -2227,53 +2276,4 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                        this.numElementsTotal = state.get(0);
                }
        }
-
-       private static class Tuple2WithTopicSchema implements 
KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>,
-               KeyedSerializationSchema<Tuple3<Integer, Integer, String>> {
-
-               private final TypeSerializer<Tuple2<Integer, Integer>> ts;
-
-               public Tuple2WithTopicSchema(ExecutionConfig ec) {
-                       ts = TypeInfoParser.<Tuple2<Integer, 
Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec);
-               }
-
-               @Override
-               public Tuple3<Integer, Integer, String> deserialize(byte[] 
messageKey, byte[] message, String topic, int partition, long offset) throws 
IOException {
-                       DataInputView in = new DataInputViewStreamWrapper(new 
ByteArrayInputStream(message));
-                       Tuple2<Integer, Integer> t2 = ts.deserialize(in);
-                       return new Tuple3<>(t2.f0, t2.f1, topic);
-               }
-
-               @Override
-               public boolean isEndOfStream(Tuple3<Integer, Integer, String> 
nextElement) {
-                       return false;
-               }
-
-               @Override
-               public TypeInformation<Tuple3<Integer, Integer, String>> 
getProducedType() {
-                       return TypeInfoParser.parse("Tuple3<Integer, Integer, 
String>");
-               }
-
-               @Override
-               public byte[] serializeKey(Tuple3<Integer, Integer, String> 
element) {
-                       return null;
-               }
-
-               @Override
-               public byte[] serializeValue(Tuple3<Integer, Integer, String> 
element) {
-                       ByteArrayOutputStream by = new ByteArrayOutputStream();
-                       DataOutputView out = new 
DataOutputViewStreamWrapper(by);
-                       try {
-                               ts.serialize(new Tuple2<>(element.f0, 
element.f1), out);
-                       } catch (IOException e) {
-                               throw new RuntimeException("Error" ,e);
-                       }
-                       return by.toByteArray();
-               }
-
-               @Override
-               public String getTargetTopic(Tuple3<Integer, Integer, String> 
element) {
-                       return element.f2;
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 6f61392..8285048 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -27,12 +27,11 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import 
org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.test.util.SuccessException;
 
-
 import java.io.Serializable;
 import java.util.Properties;
 
@@ -174,7 +173,7 @@ public abstract class KafkaProducerTestBase extends 
KafkaTestBase {
 
        // 
------------------------------------------------------------------------
 
-       public static class CustomPartitioner extends 
KafkaPartitioner<Tuple2<Long, String>> implements Serializable {
+       public static class CustomPartitioner extends 
FlinkKafkaPartitioner<Tuple2<Long, String>> implements Serializable {
 
                private final int expectedPartitions;
 
@@ -184,10 +183,10 @@ public abstract class KafkaProducerTestBase extends 
KafkaTestBase {
 
 
                @Override
-               public int partition(Tuple2<Long, String> next, byte[] 
serializedKey, byte[] serializedValue, int numPartitions) {
-                       assertEquals(expectedPartitions, numPartitions);
+               public int partition(Tuple2<Long, String> next, byte[] 
serializedKey, byte[] serializedValue, String topic, int[] partitions) {
+                       assertEquals(expectedPartitions, partitions.length);
 
-                       return (int) (next.f0 % numPartitions);
+                       return (int) (next.f0 % expectedPartitions);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
index e0e8f84..0fdc82e 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -17,25 +17,26 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
+import java.io.Serializable;
+import java.util.Properties;
+
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.table.api.Types;
-import org.apache.flink.types.Row;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.types.Row;
 import org.junit.Test;
 
-import java.io.Serializable;
-import java.util.Properties;
-
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotSame;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 
@@ -45,6 +46,7 @@ public abstract class KafkaTableSinkTestBase {
        protected static final String[] FIELD_NAMES = new String[] {"field1", 
"field2"};
        private static final TypeInformation[] FIELD_TYPES = new 
TypeInformation[] { Types.INT(), Types.STRING() };
        private static final KafkaPartitioner<Row> PARTITIONER = new 
CustomPartitioner();
+       private static final FlinkKafkaPartitioner<Row> FLINK_PARTITIONER = new 
FlinkCustomPartitioner();
        private static final Properties PROPERTIES = createSinkProperties();
        @SuppressWarnings("unchecked")
        private final FlinkKafkaProducerBase<Row> PRODUCER = new 
FlinkKafkaProducerBase<Row>(
@@ -72,6 +74,23 @@ public abstract class KafkaTableSinkTestBase {
        }
 
        @Test
+       @SuppressWarnings("unchecked")
+       public void testKafkaTableSinkWithFlinkPartitioner() throws Exception {
+               DataStream dataStream = mock(DataStream.class);
+
+               KafkaTableSink kafkaTableSink = 
spy(createTableSinkWithFlinkPartitioner());
+               kafkaTableSink.emitDataStream(dataStream);
+
+               verify(dataStream).addSink(eq(PRODUCER));
+
+               verify(kafkaTableSink).createKafkaProducer(
+                       eq(TOPIC),
+                       eq(PROPERTIES),
+                       any(getSerializationSchema().getClass()),
+                       eq(FLINK_PARTITIONER));
+       }
+
+       @Test
        public void testConfiguration() {
                KafkaTableSink kafkaTableSink = createTableSink();
                KafkaTableSink newKafkaTableSink = 
kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
@@ -82,15 +101,33 @@ public abstract class KafkaTableSinkTestBase {
                assertEquals(new RowTypeInfo(FIELD_TYPES), 
newKafkaTableSink.getOutputType());
        }
 
+       @Test
+       public void testConfigurationWithFlinkPartitioner() {
+               KafkaTableSink kafkaTableSink = 
createTableSinkWithFlinkPartitioner();
+               KafkaTableSink newKafkaTableSink = 
kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
+               assertNotSame(kafkaTableSink, newKafkaTableSink);
+
+               assertArrayEquals(FIELD_NAMES, 
newKafkaTableSink.getFieldNames());
+               assertArrayEquals(FIELD_TYPES, 
newKafkaTableSink.getFieldTypes());
+               assertEquals(new RowTypeInfo(FIELD_TYPES), 
newKafkaTableSink.getOutputType());
+       }
+
        protected abstract KafkaTableSink createTableSink(String topic, 
Properties properties,
                        KafkaPartitioner<Row> partitioner, 
FlinkKafkaProducerBase<Row> kafkaProducer);
 
+       protected abstract KafkaTableSink 
createTableSinkWithFlinkPartitioner(String topic,
+                       Properties properties, FlinkKafkaPartitioner<Row> 
partitioner, FlinkKafkaProducerBase<Row> kafkaProducer);
+
        protected abstract SerializationSchema<Row> getSerializationSchema();
 
        private KafkaTableSink createTableSink() {
                return createTableSink(TOPIC, PROPERTIES, PARTITIONER, 
PRODUCER);
        }
 
+       private KafkaTableSink createTableSinkWithFlinkPartitioner() {
+               return createTableSinkWithFlinkPartitioner(TOPIC, PROPERTIES, 
FLINK_PARTITIONER, PRODUCER);
+       }
+
        private static Properties createSinkProperties() {
                Properties properties = new Properties();
                properties.setProperty("bootstrap.servers", "localhost:12345");
@@ -103,4 +140,11 @@ public abstract class KafkaTableSinkTestBase {
                        return 0;
                }
        }
+
+       private static class FlinkCustomPartitioner extends 
FlinkKafkaPartitioner<Row> {
+               @Override
+               public int partition(Row record, byte[] key, byte[] value, 
String targetTopic, int[] partitions) {
+                       return 0;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 9a7c96a..311a1a4 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -17,20 +17,20 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
 import kafka.server.KafkaServer;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.operators.StreamSink;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-
 /**
  * Abstract class providing a Kafka test environment
  */
@@ -81,11 +81,11 @@ public abstract class KafkaTestEnvironment {
 
        public abstract <T> StreamSink<T> getProducerSink(String topic,
                        KeyedSerializationSchema<T> serSchema, Properties props,
-                       KafkaPartitioner<T> partitioner);
+                       FlinkKafkaPartitioner<T> partitioner);
 
        public abstract <T> DataStreamSink<T> produceIntoKafka(DataStream<T> 
stream, String topic,
                                                                                
                                KeyedSerializationSchema<T> serSchema, 
Properties props,
-                                                                               
                                KafkaPartitioner<T> partitioner);
+                                                                               
                                FlinkKafkaPartitioner<T> partitioner);
 
        // -- offset handlers
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkFixedPartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkFixedPartitioner.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkFixedPartitioner.java
new file mode 100644
index 0000000..fa84199
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkFixedPartitioner.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestFlinkFixedPartitioner {
+
+
+       /**
+        * <pre>
+        *              Flink Sinks:            Kafka Partitions
+        *                      1       ---------------->       1
+        *                      2   --------------/
+        *                      3   -------------/
+        *                      4       ------------/
+        * </pre>
+        */
+       @Test
+       public void testMoreFlinkThanBrokers() {
+               FlinkFixedPartitioner<String> part = new 
FlinkFixedPartitioner<>();
+
+               int[] partitions = new int[]{0};
+
+               part.open(0, 4);
+               Assert.assertEquals(0, part.partition("abc1", null, null, null, 
partitions));
+
+               part.open(1, 4);
+               Assert.assertEquals(0, part.partition("abc2", null, null, null, 
partitions));
+
+               part.open(2, 4);
+               Assert.assertEquals(0, part.partition("abc3", null, null, null, 
partitions));
+               Assert.assertEquals(0, part.partition("abc3", null, null, null, 
partitions)); // check if it is changing ;)
+
+               part.open(3, 4);
+               Assert.assertEquals(0, part.partition("abc4", null, null, null, 
partitions));
+       }
+
+       /**
+        *
+        * <pre>
+        *              Flink Sinks:            Kafka Partitions
+        *                      1       ---------------->       1
+        *                      2       ---------------->       2
+        *                                                                      
3
+        *                                                                      
4
+        *                                                                      
5
+        *
+        * </pre>
+        */
+       @Test
+       public void testFewerPartitions() {
+               FlinkFixedPartitioner<String> part = new 
FlinkFixedPartitioner<>();
+
+               int[] partitions = new int[]{0, 1, 2, 3, 4};
+               part.open(0, 2);
+               Assert.assertEquals(0, part.partition("abc1", null, null, null, 
partitions));
+               Assert.assertEquals(0, part.partition("abc1", null, null, null, 
partitions));
+
+               part.open(1, 2);
+               Assert.assertEquals(1, part.partition("abc1", null, null, null, 
partitions));
+               Assert.assertEquals(1, part.partition("abc1", null, null, null, 
partitions));
+       }
+
+       /*
+        *              Flink Sinks:            Kafka Partitions
+        *                      1       ------------>--->       1
+        *                      2       -----------/---->       2
+        *                      3       ----------/
+        */
+       @Test
+       public void testMixedCase() {
+               FlinkFixedPartitioner<String> part = new 
FlinkFixedPartitioner<>();
+               int[] partitions = new int[]{0,1};
+
+               part.open(0, 3);
+               Assert.assertEquals(0, part.partition("abc1", null, null, null, 
partitions));
+
+               part.open(1, 3);
+               Assert.assertEquals(1, part.partition("abc1", null, null, null, 
partitions));
+
+               part.open(2, 3);
+               Assert.assertEquals(0, part.partition("abc1", null, null, null, 
partitions));
+
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java
new file mode 100644
index 0000000..c6be71c
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests {@link FlinkKafkaDelegatePartitioner} when it wraps a {@link FixedPartitioner}.
+ */
+public class TestFlinkKafkaDelegatePartitioner {
+
+       /**
+        * <pre>
+        *              Flink Sinks:            Kafka Partitions
+        *                      1       ---------------->       1
+        *                      2   --------------/
+        *                      3   -------------/
+        *                      4       ------------/
+        * </pre>
+        */
+       @Test
+       public void testMoreFlinkThanBrokers() {
+               FlinkKafkaDelegatePartitioner<String> part = new FlinkKafkaDelegatePartitioner<>(new FixedPartitioner<String>());
+               int[] partitions = new int[]{0};
+               part.setPartitions(partitions); // set before open(), mirroring the other tests in this class
+
+               part.open(0, 4);
+               Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions));
+
+               part.open(1, 4);
+               Assert.assertEquals(0, part.partition("abc2", null, null, null, partitions));
+
+               part.open(2, 4);
+               Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions));
+               Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions)); // a repeated call must yield the same partition
+
+               part.open(3, 4);
+               Assert.assertEquals(0, part.partition("abc4", null, null, null, partitions));
+       }
+
+       /**
+        *
+        * <pre>
+        *              Flink Sinks:            Kafka Partitions
+        *                      1       ---------------->       1
+        *                      2       ---------------->       2
+        *                                                                      
3
+        *                                                                      
4
+        *                                                                      
5
+        *
+        * </pre>
+        */
+       @Test
+       public void testFewerPartitions() {
+               FlinkKafkaDelegatePartitioner<String> part = new 
FlinkKafkaDelegatePartitioner<>(new FixedPartitioner<String>());
+
+               int[] partitions = new int[]{0, 1, 2, 3, 4};
+               part.setPartitions(partitions);
+               
+               part.open(0, 2);
+               Assert.assertEquals(0, part.partition("abc1", null, null, null, 
partitions));
+               Assert.assertEquals(0, part.partition("abc1", null, null, null, 
partitions));
+
+               part.open(1, 2);
+               Assert.assertEquals(1, part.partition("abc1", null, null, null, 
partitions));
+               Assert.assertEquals(1, part.partition("abc1", null, null, null, 
partitions));
+       }
+
+       /*
+        *              Flink Sinks:            Kafka Partitions
+        *                      1       ------------>--->       1
+        *                      2       -----------/---->       2
+        *                      3       ----------/
+        */
+       @Test
+       public void testMixedCase() {
+               FlinkKafkaDelegatePartitioner<String> part = new 
FlinkKafkaDelegatePartitioner<>(new FixedPartitioner<String>());
+               int[] partitions = new int[]{0,1};
+               part.setPartitions(partitions);
+
+               part.open(0, 3);
+               Assert.assertEquals(0, part.partition("abc1", null, null, null, 
partitions));
+
+               part.open(1, 3);
+               Assert.assertEquals(1, part.partition("abc1", null, null, null, 
partitions));
+
+               part.open(2, 3);
+               Assert.assertEquals(0, part.partition("abc1", null, null, null, 
partitions));
+
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index c383eb5..c0fb836 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -18,6 +18,10 @@
 
 package org.apache.flink.streaming.connectors.kafka.testutils;
 
+import java.util.Collection;
+import java.util.Properties;
+import java.util.Random;
+
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -31,18 +35,14 @@ import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
 import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import 
org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 
-import java.util.Collection;
-import java.util.Properties;
-import java.util.Random;
-
 @SuppressWarnings("serial")
 public class DataGenerators {
 
@@ -107,10 +107,10 @@ public class DataGenerators {
                testServer.produceIntoKafka(stream, topic,
                                new KeyedSerializationSchemaWrapper<>(new 
TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, 
env.getConfig())),
                                props,
-                               new KafkaPartitioner<Integer>() {
+                               new FlinkKafkaPartitioner<Integer>() {
                                        @Override
-                                       public int partition(Integer next, 
byte[] serializedKey, byte[] serializedValue, int numPartitions) {
-                                               return next % numPartitions;
+                                       public int partition(Integer next, 
byte[] serializedKey, byte[] serializedValue, String topic, int[] partitions) {
+                                               return next % partitions.length;
                                        }
                                });
 
@@ -149,7 +149,7 @@ public class DataGenerators {
                                                topic,
                                                new 
KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                                                producerProperties,
-                                               new FixedPartitioner<String>());
+                                               new 
FlinkFixedPartitioner<String>());
 
                                OneInputStreamOperatorTestHarness<String, 
Object> testHarness =
                                                new 
OneInputStreamOperatorTestHarness<>(sink);

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java
new file mode 100644
index 0000000..e7fff52
--- /dev/null
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.testutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+
+/**
+ * Test partitioner that routes each record to the partition given by the first
+ * field ({@code f0}) of a 2-tuple, and that fails fast when it is offered a
+ * different number of partitions than it was constructed to expect.
+ */
+public class Tuple2FlinkPartitioner extends FlinkKafkaPartitioner<Tuple2<Integer, Integer>> {
+       private static final long serialVersionUID = -3589898230375281549L;
+
+       /** Number of partitions that every {@code partition(...)} call must be offered. */
+       private final int expectedPartitions;
+
+       /**
+        * @param expectedPartitions the exact length the {@code partitions} array
+        *                           passed to {@code partition(...)} must have
+        */
+       public Tuple2FlinkPartitioner(int expectedPartitions) {
+               this.expectedPartitions = expectedPartitions;
+       }
+
+       /**
+        * Returns {@code next.f0} as the target partition after checking the partition count.
+        *
+        * @param next record to route; its {@code f0} field is returned unchanged
+        * @param key serialized key (not used)
+        * @param value serialized value (not used)
+        * @param targetTopic topic being written to; only used in the error message
+        * @param partitions partitions offered for the topic; only its length is checked
+        * @return {@code next.f0}
+        * @throws IllegalArgumentException if {@code partitions.length} differs from the expected count
+        */
+       @Override
+       public int partition(Tuple2<Integer, Integer> next, byte[] key, byte[] value, String targetTopic, int[] partitions) {
+               if (partitions.length != expectedPartitions) {
+                       // Report the actual count and the topic so a misconfigured test is easy to diagnose;
+                       // the original "Expected N partitions" prefix is kept intact.
+                       throw new IllegalArgumentException("Expected " + expectedPartitions
+                                       + " partitions but found " + partitions.length
+                                       + " for topic " + targetTopic);
+               }
+
+               return next.f0;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
index c9e9ac1..43e1aa7 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
@@ -18,15 +18,16 @@
 
 package org.apache.flink.streaming.connectors.kafka.testutils;
 
+import java.io.Serializable;
+
 import org.apache.flink.api.java.tuple.Tuple2;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 
-import java.io.Serializable;
-
 /**
  * Special partitioner that uses the first field of a 2-tuple as the partition,
- * and that expects a specific number of partitions.
+ * and that expects a specific number of partitions.
+ *
+ * @deprecated Use {@link Tuple2FlinkPartitioner} instead.
  */
+@Deprecated
 public class Tuple2Partitioner extends KafkaPartitioner<Tuple2<Integer, 
Integer>> implements Serializable {
 
        private static final long serialVersionUID = 1L;
@@ -45,4 +46,4 @@ public class Tuple2Partitioner extends 
KafkaPartitioner<Tuple2<Integer, Integer>
 
                return next.f0;
        }
-}
\ No newline at end of file
+}

Reply via email to