http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 8285048..bcc8328 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -29,10 +29,14 @@ import 
org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import 
org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.test.util.SuccessException;
+import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
 import static org.apache.flink.test.util.TestUtils.tryExecute;
@@ -42,31 +46,50 @@ import static org.junit.Assert.fail;
 @SuppressWarnings("serial")
 public abstract class KafkaProducerTestBase extends KafkaTestBase {
 
-
        /**
-        * 
+        * This tests verifies that custom partitioning works correctly, with a 
default topic
+        * and dynamic topic. The number of partitions for each topic is 
deliberately different.
+        *
+        * Test topology:
+        *
         * <pre>
-        *             +------> (sink) --+--> [KAFKA-1] --> (source) -> (map) 
--+
-        *            /                  |                                      
 \
-        *           /                   |                                      
  \
-        * (source) ----------> (sink) --+--> [KAFKA-2] --> (source) -> (map) 
-----+-> (sink)
-        *           \                   |                                      
  /
-        *            \                  |                                      
 /
-        *             +------> (sink) --+--> [KAFKA-3] --> (source) -> (map) 
--+
+        *             +------> (sink) --+--> [DEFAULT_TOPIC-1] --> (source) -> 
(map) -----+
+        *            /                  |                             |        
  |        |
+        *           |                   |                             |        
  |  ------+--> (sink)
+        *             +------> (sink) --+--> [DEFAULT_TOPIC-2] --> (source) -> 
(map) -----+
+        *            /                  |
+        *           |                   |
+        * (source) ----------> (sink) --+--> [DYNAMIC_TOPIC-1] --> (source) -> 
(map) -----+
+        *           |                   |                             |        
  |        |
+        *            \                  |                             |        
  |        |
+        *             +------> (sink) --+--> [DYNAMIC_TOPIC-2] --> (source) -> 
(map) -----+--> (sink)
+        *           |                   |                             |        
  |        |
+        *            \                  |                             |        
  |        |
+        *             +------> (sink) --+--> [DYNAMIC_TOPIC-3] --> (source) -> 
(map) -----+
         * </pre>
         * 
-        * The mapper validates that the values come consistently from the 
correct Kafka partition.
+        * Each topic has an independent mapper that validates the values come 
consistently from
+        * the correct Kafka partition of the topic is is responsible of.
         * 
-        * The final sink validates that there are no duplicates and that all 
partitions are present.
+        * Each topic also has a final sink that validates that there are no 
duplicates and that all
+        * partitions are present.
         */
        public void runCustomPartitioningTest() {
                try {
                        LOG.info("Starting 
KafkaProducerITCase.testCustomPartitioning()");
 
-                       final String topic = "customPartitioningTestTopic";
-                       final int parallelism = 3;
-                       
-                       createTestTopic(topic, parallelism, 1);
+                       final String defaultTopic = "defaultTopic";
+                       final int defaultTopicPartitions = 2;
+
+                       final String dynamicTopic = "dynamicTopic";
+                       final int dynamicTopicPartitions = 3;
+
+                       createTestTopic(defaultTopic, defaultTopicPartitions, 
1);
+                       createTestTopic(dynamicTopic, dynamicTopicPartitions, 
1);
+
+                       Map<String, Integer> expectedTopicsToNumPartitions = 
new HashMap<>(2);
+                       expectedTopicsToNumPartitions.put(defaultTopic, 
defaultTopicPartitions);
+                       expectedTopicsToNumPartitions.put(dynamicTopic, 
dynamicTopicPartitions);
 
                        TypeInformation<Tuple2<Long, String>> longStringInfo = 
TypeInfoParser.parse("Tuple2<Long, String>");
 
@@ -75,13 +98,13 @@ public abstract class KafkaProducerTestBase extends 
KafkaTestBase {
                        env.getConfig().disableSysoutLogging();
 
                        TypeInformationSerializationSchema<Tuple2<Long, 
String>> serSchema =
-                                       new 
TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
+                               new 
TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
 
                        TypeInformationSerializationSchema<Tuple2<Long, 
String>> deserSchema =
-                                       new 
TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
+                               new 
TypeInformationSerializationSchema<>(longStringInfo, env.getConfig());
 
                        // ------ producing topology ---------
-                       
+
                        // source has DOP 1 to make sure it generates no 
duplicates
                        DataStream<Tuple2<Long, String>> stream = 
env.addSource(new SourceFunction<Tuple2<Long, String>>() {
 
@@ -100,69 +123,44 @@ public abstract class KafkaProducerTestBase extends 
KafkaTestBase {
                                public void cancel() {
                                        running = false;
                                }
-                       })
-                       .setParallelism(1);
+                       }).setParallelism(1);
 
                        Properties props = new Properties();
                        
props.putAll(FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings));
                        props.putAll(secureProps);
-                       
-                       // sink partitions into 
-                       kafkaServer.produceIntoKafka(stream, topic,
-                                       new 
KeyedSerializationSchemaWrapper<>(serSchema),
+
+                       // sink partitions into
+                       kafkaServer.produceIntoKafka(stream, defaultTopic,
+                                       // this serialization schema will route 
between the default topic and dynamic topic
+                                       new 
CustomKeyedSerializationSchemaWrapper(serSchema, defaultTopic, dynamicTopic),
                                        props,
-                                       new 
CustomPartitioner(parallelism)).setParallelism(parallelism);
+                                       new 
CustomPartitioner(expectedTopicsToNumPartitions))
+                               
.setParallelism(Math.max(defaultTopicPartitions, dynamicTopicPartitions));
 
                        // ------ consuming topology ---------
 
                        Properties consumerProps = new Properties();
                        consumerProps.putAll(standardProps);
                        consumerProps.putAll(secureProps);
-                       FlinkKafkaConsumerBase<Tuple2<Long, String>> source = 
kafkaServer.getConsumer(topic, deserSchema, consumerProps);
-                       
-                       env.addSource(source).setParallelism(parallelism)
-
-                                       // mapper that validates partitioning 
and maps to partition
-                                       .map(new RichMapFunction<Tuple2<Long, 
String>, Integer>() {
-                                               
-                                               private int ourPartition = -1;
-                                               @Override
-                                               public Integer map(Tuple2<Long, 
String> value) {
-                                                       int partition = 
value.f0.intValue() % parallelism;
-                                                       if (ourPartition != -1) 
{
-                                                               
assertEquals("inconsistent partitioning", ourPartition, partition);
-                                                       } else {
-                                                               ourPartition = 
partition;
-                                                       }
-                                                       return partition;
-                                               }
-                                       }).setParallelism(parallelism)
-                                       
-                                       .addSink(new SinkFunction<Integer>() {
-                                               
-                                               private int[] 
valuesPerPartition = new int[parallelism];
-                                               
-                                               @Override
-                                               public void invoke(Integer 
value) throws Exception {
-                                                       
valuesPerPartition[value]++;
-                                                       
-                                                       boolean missing = false;
-                                                       for (int i : 
valuesPerPartition) {
-                                                               if (i < 100) {
-                                                                       missing 
= true;
-                                                                       break;
-                                                               }
-                                                       }
-                                                       if (!missing) {
-                                                               throw new 
SuccessException();
-                                                       }
-                                               }
-                                       }).setParallelism(1);
-                       
+
+                       FlinkKafkaConsumerBase<Tuple2<Long, String>> 
defaultTopicSource =
+                                       kafkaServer.getConsumer(defaultTopic, 
deserSchema, consumerProps);
+                       FlinkKafkaConsumerBase<Tuple2<Long, String>> 
dynamicTopicSource =
+                                       kafkaServer.getConsumer(dynamicTopic, 
deserSchema, consumerProps);
+
+                       
env.addSource(defaultTopicSource).setParallelism(defaultTopicPartitions)
+                               .map(new 
PartitionValidatingMapper(defaultTopicPartitions)).setParallelism(defaultTopicPartitions)
+                               .addSink(new 
PartitionValidatingSink(defaultTopicPartitions)).setParallelism(1);
+
+                       
env.addSource(dynamicTopicSource).setParallelism(dynamicTopicPartitions)
+                               .map(new 
PartitionValidatingMapper(dynamicTopicPartitions)).setParallelism(dynamicTopicPartitions)
+                               .addSink(new 
PartitionValidatingSink(dynamicTopicPartitions)).setParallelism(1);
+
                        tryExecute(env, "custom partitioning test");
 
-                       deleteTestTopic(topic);
-                       
+                       deleteTestTopic(defaultTopic);
+                       deleteTestTopic(dynamicTopic);
+
                        LOG.info("Finished 
KafkaProducerITCase.testCustomPartitioning()");
                }
                catch (Exception e) {
@@ -175,18 +173,94 @@ public abstract class KafkaProducerTestBase extends 
KafkaTestBase {
 
        public static class CustomPartitioner extends 
FlinkKafkaPartitioner<Tuple2<Long, String>> implements Serializable {
 
-               private final int expectedPartitions;
+               private final Map<String, Integer> 
expectedTopicsToNumPartitions;
 
-               public CustomPartitioner(int expectedPartitions) {
-                       this.expectedPartitions = expectedPartitions;
+               public CustomPartitioner(Map<String, Integer> 
expectedTopicsToNumPartitions) {
+                       this.expectedTopicsToNumPartitions = 
expectedTopicsToNumPartitions;
                }
 
-
                @Override
                public int partition(Tuple2<Long, String> next, byte[] 
serializedKey, byte[] serializedValue, String topic, int[] partitions) {
-                       assertEquals(expectedPartitions, partitions.length);
+                       
assertEquals(expectedTopicsToNumPartitions.get(topic).intValue(), 
partitions.length);
+
+                       return (int) (next.f0 % partitions.length);
+               }
+       }
+
+       /**
+        * A {@link KeyedSerializationSchemaWrapper} that supports routing 
serialized records to different target topics.
+        */
+       public static class CustomKeyedSerializationSchemaWrapper extends 
KeyedSerializationSchemaWrapper<Tuple2<Long, String>> {
+
+               private final String defaultTopic;
+               private final String dynamicTopic;
+
+               public CustomKeyedSerializationSchemaWrapper(
+                               SerializationSchema<Tuple2<Long, String>> 
serializationSchema,
+                               String defaultTopic,
+                               String dynamicTopic) {
+
+                       super(serializationSchema);
+
+                       this.defaultTopic = 
Preconditions.checkNotNull(defaultTopic);
+                       this.dynamicTopic = 
Preconditions.checkNotNull(dynamicTopic);
+               }
 
-                       return (int) (next.f0 % expectedPartitions);
+               @Override
+               public String getTargetTopic(Tuple2<Long, String> element) {
+                       return (element.f0 % 2 == 0) ? defaultTopic : 
dynamicTopic;
+               }
+       }
+
+       /**
+        * Mapper that validates partitioning and maps to partition.
+        */
+       public static class PartitionValidatingMapper extends 
RichMapFunction<Tuple2<Long, String>, Integer> {
+
+               private final int numPartitions;
+
+               private int ourPartition = -1;
+
+               public PartitionValidatingMapper(int numPartitions) {
+                       this.numPartitions = numPartitions;
+               }
+
+               @Override
+               public Integer map(Tuple2<Long, String> value) throws Exception 
{
+                       int partition = value.f0.intValue() % numPartitions;
+                       if (ourPartition != -1) {
+                               assertEquals("inconsistent partitioning", 
ourPartition, partition);
+                       } else {
+                               ourPartition = partition;
+                       }
+                       return partition;
+               }
+       }
+
+       /**
+        * Sink that validates records received from each partition and checks 
that there are no duplicates.
+        */
+       public static class PartitionValidatingSink implements 
SinkFunction<Integer> {
+               private final int[] valuesPerPartition;
+
+               public PartitionValidatingSink(int numPartitions) {
+                       this.valuesPerPartition = new int[numPartitions];
+               }
+
+               @Override
+               public void invoke(Integer value) throws Exception {
+                       valuesPerPartition[value]++;
+
+                       boolean missing = false;
+                       for (int i : valuesPerPartition) {
+                               if (i < 100) {
+                                       missing = true;
+                                       break;
+                               }
+                       }
+                       if (!missing) {
+                               throw new SuccessException();
+                       }
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
index 0fdc82e..d4fe9cc 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -17,14 +17,12 @@
  */
 package org.apache.flink.streaming.connectors.kafka;
 
-import java.io.Serializable;
 import java.util.Properties;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.table.api.Types;
@@ -45,8 +43,7 @@ public abstract class KafkaTableSinkTestBase {
        private static final String TOPIC = "testTopic";
        protected static final String[] FIELD_NAMES = new String[] {"field1", 
"field2"};
        private static final TypeInformation[] FIELD_TYPES = new 
TypeInformation[] { Types.INT(), Types.STRING() };
-       private static final KafkaPartitioner<Row> PARTITIONER = new 
CustomPartitioner();
-       private static final FlinkKafkaPartitioner<Row> FLINK_PARTITIONER = new 
FlinkCustomPartitioner();
+       private static final FlinkKafkaPartitioner<Row> PARTITIONER = new 
CustomPartitioner();
        private static final Properties PROPERTIES = createSinkProperties();
        @SuppressWarnings("unchecked")
        private final FlinkKafkaProducerBase<Row> PRODUCER = new 
FlinkKafkaProducerBase<Row>(
@@ -74,23 +71,6 @@ public abstract class KafkaTableSinkTestBase {
        }
 
        @Test
-       @SuppressWarnings("unchecked")
-       public void testKafkaTableSinkWithFlinkPartitioner() throws Exception {
-               DataStream dataStream = mock(DataStream.class);
-
-               KafkaTableSink kafkaTableSink = 
spy(createTableSinkWithFlinkPartitioner());
-               kafkaTableSink.emitDataStream(dataStream);
-
-               verify(dataStream).addSink(eq(PRODUCER));
-
-               verify(kafkaTableSink).createKafkaProducer(
-                       eq(TOPIC),
-                       eq(PROPERTIES),
-                       any(getSerializationSchema().getClass()),
-                       eq(FLINK_PARTITIONER));
-       }
-
-       @Test
        public void testConfiguration() {
                KafkaTableSink kafkaTableSink = createTableSink();
                KafkaTableSink newKafkaTableSink = 
kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
@@ -101,22 +81,8 @@ public abstract class KafkaTableSinkTestBase {
                assertEquals(new RowTypeInfo(FIELD_TYPES), 
newKafkaTableSink.getOutputType());
        }
 
-       @Test
-       public void testConfigurationWithFlinkPartitioner() {
-               KafkaTableSink kafkaTableSink = 
createTableSinkWithFlinkPartitioner();
-               KafkaTableSink newKafkaTableSink = 
kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
-               assertNotSame(kafkaTableSink, newKafkaTableSink);
-
-               assertArrayEquals(FIELD_NAMES, 
newKafkaTableSink.getFieldNames());
-               assertArrayEquals(FIELD_TYPES, 
newKafkaTableSink.getFieldTypes());
-               assertEquals(new RowTypeInfo(FIELD_TYPES), 
newKafkaTableSink.getOutputType());
-       }
-
        protected abstract KafkaTableSink createTableSink(String topic, 
Properties properties,
-                       KafkaPartitioner<Row> partitioner, 
FlinkKafkaProducerBase<Row> kafkaProducer);
-
-       protected abstract KafkaTableSink 
createTableSinkWithFlinkPartitioner(String topic,
-                       Properties properties, FlinkKafkaPartitioner<Row> 
partitioner, FlinkKafkaProducerBase<Row> kafkaProducer);
+                       FlinkKafkaPartitioner<Row> partitioner, 
FlinkKafkaProducerBase<Row> kafkaProducer);
 
        protected abstract SerializationSchema<Row> getSerializationSchema();
 
@@ -124,24 +90,13 @@ public abstract class KafkaTableSinkTestBase {
                return createTableSink(TOPIC, PROPERTIES, PARTITIONER, 
PRODUCER);
        }
 
-       private KafkaTableSink createTableSinkWithFlinkPartitioner() {
-               return createTableSinkWithFlinkPartitioner(TOPIC, PROPERTIES, 
FLINK_PARTITIONER, PRODUCER);
-       }
-
        private static Properties createSinkProperties() {
                Properties properties = new Properties();
                properties.setProperty("bootstrap.servers", "localhost:12345");
                return properties;
        }
 
-       private static class CustomPartitioner extends KafkaPartitioner<Row> 
implements Serializable {
-               @Override
-               public int partition(Row next, byte[] serializedKey, byte[] 
serializedValue, int numPartitions) {
-                       return 0;
-               }
-       }
-
-       private static class FlinkCustomPartitioner extends 
FlinkKafkaPartitioner<Row> {
+       private static class CustomPartitioner extends 
FlinkKafkaPartitioner<Row> {
                @Override
                public int partition(Row record, byte[] key, byte[] value, 
String targetTopic, int[] partitions) {
                        return 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
deleted file mode 100644
index 5dab05a..0000000
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.junit.Assert;
-import org.junit.Test;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
-
-public class TestFixedPartitioner {
-
-
-       /**
-        * <pre>
-        *              Flink Sinks:            Kafka Partitions
-        *                      1       ---------------->       1
-        *                      2   --------------/
-        *                      3   -------------/
-        *                      4       ------------/
-        * </pre>
-        */
-       @Test
-       public void testMoreFlinkThanBrokers() {
-               FixedPartitioner<String> part = new FixedPartitioner<>();
-
-               int[] partitions = new int[]{0};
-
-               part.open(0, 4, partitions);
-               Assert.assertEquals(0, part.partition("abc1", null, null, 
partitions.length));
-
-               part.open(1, 4, partitions);
-               Assert.assertEquals(0, part.partition("abc2", null, null, 
partitions.length));
-
-               part.open(2, 4, partitions);
-               Assert.assertEquals(0, part.partition("abc3", null, null, 
partitions.length));
-               Assert.assertEquals(0, part.partition("abc3", null, null, 
partitions.length)); // check if it is changing ;)
-
-               part.open(3, 4, partitions);
-               Assert.assertEquals(0, part.partition("abc4", null, null, 
partitions.length));
-       }
-
-       /**
-        *
-        * <pre>
-        *              Flink Sinks:            Kafka Partitions
-        *                      1       ---------------->       1
-        *                      2       ---------------->       2
-        *                                                                      
3
-        *                                                                      
4
-        *                                                                      
5
-        *
-        * </pre>
-        */
-       @Test
-       public void testFewerPartitions() {
-               FixedPartitioner<String> part = new FixedPartitioner<>();
-
-               int[] partitions = new int[]{0, 1, 2, 3, 4};
-               part.open(0, 2, partitions);
-               Assert.assertEquals(0, part.partition("abc1", null, null, 
partitions.length));
-               Assert.assertEquals(0, part.partition("abc1", null, null, 
partitions.length));
-
-               part.open(1, 2, partitions);
-               Assert.assertEquals(1, part.partition("abc1", null, null, 
partitions.length));
-               Assert.assertEquals(1, part.partition("abc1", null, null, 
partitions.length));
-       }
-
-       /*
-        *              Flink Sinks:            Kafka Partitions
-        *                      1       ------------>--->       1
-        *                      2       -----------/---->       2
-        *                      3       ----------/
-        */
-       @Test
-       public void testMixedCase() {
-               FixedPartitioner<String> part = new FixedPartitioner<>();
-               int[] partitions = new int[]{0,1};
-
-               part.open(0, 3, partitions);
-               Assert.assertEquals(0, part.partition("abc1", null, null, 
partitions.length));
-
-               part.open(1, 3, partitions);
-               Assert.assertEquals(1, part.partition("abc1", null, null, 
partitions.length));
-
-               part.open(2, 3, partitions);
-               Assert.assertEquals(0, part.partition("abc1", null, null, 
partitions.length));
-
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java
deleted file mode 100644
index c6be71c..0000000
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test FlinkKafkaDelegatePartitioner using FixedPartitioner
- */
-public class TestFlinkKafkaDelegatePartitioner {
-
-       /**
-        * <pre>
-        *              Flink Sinks:            Kafka Partitions
-        *                      1       ---------------->       1
-        *                      2   --------------/
-        *                      3   -------------/
-        *                      4       ------------/
-        * </pre>
-        */
-       @Test
-       public void testMoreFlinkThanBrokers() {
-               FlinkFixedPartitioner<String> part = new 
FlinkFixedPartitioner<>();
-
-               int[] partitions = new int[]{0};
-
-               part.open(0, 4);
-               Assert.assertEquals(0, part.partition("abc1", null, null, null, 
partitions));
-
-               part.open(1, 4);
-               Assert.assertEquals(0, part.partition("abc2", null, null, null, 
partitions));
-
-               part.open(2, 4);
-               Assert.assertEquals(0, part.partition("abc3", null, null, null, 
partitions));
-               Assert.assertEquals(0, part.partition("abc3", null, null, null, 
partitions)); // check if it is changing ;)
-
-               part.open(3, 4);
-               Assert.assertEquals(0, part.partition("abc4", null, null, null, 
partitions));
-       }
-
-       /**
-        *
-        * <pre>
-        *              Flink Sinks:            Kafka Partitions
-        *                      1       ---------------->       1
-        *                      2       ---------------->       2
-        *                                                                      
3
-        *                                                                      
4
-        *                                                                      
5
-        *
-        * </pre>
-        */
-       @Test
-       public void testFewerPartitions() {
-               FlinkKafkaDelegatePartitioner<String> part = new 
FlinkKafkaDelegatePartitioner<>(new FixedPartitioner<String>());
-
-               int[] partitions = new int[]{0, 1, 2, 3, 4};
-               part.setPartitions(partitions);
-               
-               part.open(0, 2);
-               Assert.assertEquals(0, part.partition("abc1", null, null, null, 
partitions));
-               Assert.assertEquals(0, part.partition("abc1", null, null, null, 
partitions));
-
-               part.open(1, 2);
-               Assert.assertEquals(1, part.partition("abc1", null, null, null, 
partitions));
-               Assert.assertEquals(1, part.partition("abc1", null, null, null, 
partitions));
-       }
-
-       /*
-        *              Flink Sinks:            Kafka Partitions
-        *                      1       ------------>--->       1
-        *                      2       -----------/---->       2
-        *                      3       ----------/
-        */
-       @Test
-       public void testMixedCase() {
-               FlinkKafkaDelegatePartitioner<String> part = new 
FlinkKafkaDelegatePartitioner<>(new FixedPartitioner<String>());
-               int[] partitions = new int[]{0,1};
-               part.setPartitions(partitions);
-
-               part.open(0, 3);
-               Assert.assertEquals(0, part.partition("abc1", null, null, null, 
partitions));
-
-               part.open(1, 3);
-               Assert.assertEquals(1, part.partition("abc1", null, null, null, 
partitions));
-
-               part.open(2, 3);
-               Assert.assertEquals(0, part.partition("abc1", null, null, null, 
partitions));
-
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
deleted file mode 100644
index 43e1aa7..0000000
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import 
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
-
-/**
- * Special partitioner that uses the first field of a 2-tuple as the partition,
- * and that expects a specific number of partitions. Use 
Tuple2FlinkPartitioner instead
- */
-@Deprecated
-public class Tuple2Partitioner extends KafkaPartitioner<Tuple2<Integer, 
Integer>> implements Serializable {
-
-       private static final long serialVersionUID = 1L;
-
-       private final int expectedPartitions;
-
-       public Tuple2Partitioner(int expectedPartitions) {
-               this.expectedPartitions = expectedPartitions;
-       }
-
-       @Override
-       public int partition(Tuple2<Integer, Integer> next, byte[] 
serializedKey, byte[] serializedValue, int numPartitions) {
-               if (numPartitions != expectedPartitions) {
-                       throw new IllegalArgumentException("Expected " + 
expectedPartitions + " partitions");
-               }
-
-               return next.f0;
-       }
-}

Reply via email to