http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java index 8285048..bcc8328 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java @@ -29,10 +29,14 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.SerializationSchema; import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema; import org.apache.flink.test.util.SuccessException; +import org.apache.flink.util.Preconditions; import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; import static org.apache.flink.test.util.TestUtils.tryExecute; @@ -42,31 +46,50 @@ import static org.junit.Assert.fail; @SuppressWarnings("serial") public abstract class KafkaProducerTestBase extends KafkaTestBase { - /** - * + * This test verifies that custom partitioning works correctly, with a default topic + * and a dynamic topic. The number of partitions for each topic is deliberately different. 
+ * + * Test topology: + * * <pre> - * +------> (sink) --+--> [KAFKA-1] --> (source) -> (map) --+ - * / | \ - * / | \ - * (source) ----------> (sink) --+--> [KAFKA-2] --> (source) -> (map) -----+-> (sink) - * \ | / - * \ | / - * +------> (sink) --+--> [KAFKA-3] --> (source) -> (map) --+ + * +------> (sink) --+--> [DEFAULT_TOPIC-1] --> (source) -> (map) -----+ + * / | | | | + * | | | | ------+--> (sink) + * +------> (sink) --+--> [DEFAULT_TOPIC-2] --> (source) -> (map) -----+ + * / | + * | | + * (source) ----------> (sink) --+--> [DYNAMIC_TOPIC-1] --> (source) -> (map) -----+ + * | | | | | + * \ | | | | + * +------> (sink) --+--> [DYNAMIC_TOPIC-2] --> (source) -> (map) -----+--> (sink) + * | | | | | + * \ | | | | + * +------> (sink) --+--> [DYNAMIC_TOPIC-3] --> (source) -> (map) -----+ * </pre> * - * The mapper validates that the values come consistently from the correct Kafka partition. + * Each topic has an independent mapper that validates the values come consistently from + * the correct Kafka partition of the topic it is responsible for. * - * The final sink validates that there are no duplicates and that all partitions are present. + * Each topic also has a final sink that validates that there are no duplicates and that all + * partitions are present. 
*/ public void runCustomPartitioningTest() { try { LOG.info("Starting KafkaProducerITCase.testCustomPartitioning()"); - final String topic = "customPartitioningTestTopic"; - final int parallelism = 3; - - createTestTopic(topic, parallelism, 1); + final String defaultTopic = "defaultTopic"; + final int defaultTopicPartitions = 2; + + final String dynamicTopic = "dynamicTopic"; + final int dynamicTopicPartitions = 3; + + createTestTopic(defaultTopic, defaultTopicPartitions, 1); + createTestTopic(dynamicTopic, dynamicTopicPartitions, 1); + + Map<String, Integer> expectedTopicsToNumPartitions = new HashMap<>(2); + expectedTopicsToNumPartitions.put(defaultTopic, defaultTopicPartitions); + expectedTopicsToNumPartitions.put(dynamicTopic, dynamicTopicPartitions); TypeInformation<Tuple2<Long, String>> longStringInfo = TypeInfoParser.parse("Tuple2<Long, String>"); @@ -75,13 +98,13 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase { env.getConfig().disableSysoutLogging(); TypeInformationSerializationSchema<Tuple2<Long, String>> serSchema = - new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig()); + new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig()); TypeInformationSerializationSchema<Tuple2<Long, String>> deserSchema = - new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig()); + new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig()); // ------ producing topology --------- - + // source has DOP 1 to make sure it generates no duplicates DataStream<Tuple2<Long, String>> stream = env.addSource(new SourceFunction<Tuple2<Long, String>>() { @@ -100,69 +123,44 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase { public void cancel() { running = false; } - }) - .setParallelism(1); + }).setParallelism(1); Properties props = new Properties(); props.putAll(FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings)); props.putAll(secureProps); - - // sink 
partitions into - kafkaServer.produceIntoKafka(stream, topic, - new KeyedSerializationSchemaWrapper<>(serSchema), + + // sink partitions into + kafkaServer.produceIntoKafka(stream, defaultTopic, + // this serialization schema will route between the default topic and dynamic topic + new CustomKeyedSerializationSchemaWrapper(serSchema, defaultTopic, dynamicTopic), props, - new CustomPartitioner(parallelism)).setParallelism(parallelism); + new CustomPartitioner(expectedTopicsToNumPartitions)) + .setParallelism(Math.max(defaultTopicPartitions, dynamicTopicPartitions)); // ------ consuming topology --------- Properties consumerProps = new Properties(); consumerProps.putAll(standardProps); consumerProps.putAll(secureProps); - FlinkKafkaConsumerBase<Tuple2<Long, String>> source = kafkaServer.getConsumer(topic, deserSchema, consumerProps); - - env.addSource(source).setParallelism(parallelism) - - // mapper that validates partitioning and maps to partition - .map(new RichMapFunction<Tuple2<Long, String>, Integer>() { - - private int ourPartition = -1; - @Override - public Integer map(Tuple2<Long, String> value) { - int partition = value.f0.intValue() % parallelism; - if (ourPartition != -1) { - assertEquals("inconsistent partitioning", ourPartition, partition); - } else { - ourPartition = partition; - } - return partition; - } - }).setParallelism(parallelism) - - .addSink(new SinkFunction<Integer>() { - - private int[] valuesPerPartition = new int[parallelism]; - - @Override - public void invoke(Integer value) throws Exception { - valuesPerPartition[value]++; - - boolean missing = false; - for (int i : valuesPerPartition) { - if (i < 100) { - missing = true; - break; - } - } - if (!missing) { - throw new SuccessException(); - } - } - }).setParallelism(1); - + + FlinkKafkaConsumerBase<Tuple2<Long, String>> defaultTopicSource = + kafkaServer.getConsumer(defaultTopic, deserSchema, consumerProps); + FlinkKafkaConsumerBase<Tuple2<Long, String>> dynamicTopicSource = + 
kafkaServer.getConsumer(dynamicTopic, deserSchema, consumerProps); + + env.addSource(defaultTopicSource).setParallelism(defaultTopicPartitions) + .map(new PartitionValidatingMapper(defaultTopicPartitions)).setParallelism(defaultTopicPartitions) + .addSink(new PartitionValidatingSink(defaultTopicPartitions)).setParallelism(1); + + env.addSource(dynamicTopicSource).setParallelism(dynamicTopicPartitions) + .map(new PartitionValidatingMapper(dynamicTopicPartitions)).setParallelism(dynamicTopicPartitions) + .addSink(new PartitionValidatingSink(dynamicTopicPartitions)).setParallelism(1); + tryExecute(env, "custom partitioning test"); - deleteTestTopic(topic); - + deleteTestTopic(defaultTopic); + deleteTestTopic(dynamicTopic); + LOG.info("Finished KafkaProducerITCase.testCustomPartitioning()"); } catch (Exception e) { @@ -175,18 +173,94 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase { public static class CustomPartitioner extends FlinkKafkaPartitioner<Tuple2<Long, String>> implements Serializable { - private final int expectedPartitions; + private final Map<String, Integer> expectedTopicsToNumPartitions; - public CustomPartitioner(int expectedPartitions) { - this.expectedPartitions = expectedPartitions; + public CustomPartitioner(Map<String, Integer> expectedTopicsToNumPartitions) { + this.expectedTopicsToNumPartitions = expectedTopicsToNumPartitions; } - @Override public int partition(Tuple2<Long, String> next, byte[] serializedKey, byte[] serializedValue, String topic, int[] partitions) { - assertEquals(expectedPartitions, partitions.length); + assertEquals(expectedTopicsToNumPartitions.get(topic).intValue(), partitions.length); + + return (int) (next.f0 % partitions.length); + } + } + + /** + * A {@link KeyedSerializationSchemaWrapper} that supports routing serialized records to different target topics. 
+ */ + public static class CustomKeyedSerializationSchemaWrapper extends KeyedSerializationSchemaWrapper<Tuple2<Long, String>> { + + private final String defaultTopic; + private final String dynamicTopic; + + public CustomKeyedSerializationSchemaWrapper( + SerializationSchema<Tuple2<Long, String>> serializationSchema, + String defaultTopic, + String dynamicTopic) { + + super(serializationSchema); + + this.defaultTopic = Preconditions.checkNotNull(defaultTopic); + this.dynamicTopic = Preconditions.checkNotNull(dynamicTopic); + } - return (int) (next.f0 % expectedPartitions); + @Override + public String getTargetTopic(Tuple2<Long, String> element) { + return (element.f0 % 2 == 0) ? defaultTopic : dynamicTopic; + } + } + + /** + * Mapper that validates partitioning and maps to partition. + */ + public static class PartitionValidatingMapper extends RichMapFunction<Tuple2<Long, String>, Integer> { + + private final int numPartitions; + + private int ourPartition = -1; + + public PartitionValidatingMapper(int numPartitions) { + this.numPartitions = numPartitions; + } + + @Override + public Integer map(Tuple2<Long, String> value) throws Exception { + int partition = value.f0.intValue() % numPartitions; + if (ourPartition != -1) { + assertEquals("inconsistent partitioning", ourPartition, partition); + } else { + ourPartition = partition; + } + return partition; + } + } + + /** + * Sink that validates records received from each partition and checks that there are no duplicates. 
+ */ + public static class PartitionValidatingSink implements SinkFunction<Integer> { + private final int[] valuesPerPartition; + + public PartitionValidatingSink(int numPartitions) { + this.valuesPerPartition = new int[numPartitions]; + } + + @Override + public void invoke(Integer value) throws Exception { + valuesPerPartition[value]++; + + boolean missing = false; + for (int i : valuesPerPartition) { + if (i < 100) { + missing = true; + break; + } + } + if (!missing) { + throw new SuccessException(); + } } } }
http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java index 0fdc82e..d4fe9cc 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java @@ -17,14 +17,12 @@ */ package org.apache.flink.streaming.connectors.kafka; -import java.io.Serializable; import java.util.Properties; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; -import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; import org.apache.flink.streaming.util.serialization.SerializationSchema; import org.apache.flink.table.api.Types; @@ -45,8 +43,7 @@ public abstract class KafkaTableSinkTestBase { private static final String TOPIC = "testTopic"; protected static final String[] FIELD_NAMES = new String[] {"field1", "field2"}; private static final TypeInformation[] FIELD_TYPES = new TypeInformation[] { Types.INT(), Types.STRING() }; - private static final KafkaPartitioner<Row> PARTITIONER = new CustomPartitioner(); - private static final FlinkKafkaPartitioner<Row> FLINK_PARTITIONER = new 
FlinkCustomPartitioner(); + private static final FlinkKafkaPartitioner<Row> PARTITIONER = new CustomPartitioner(); private static final Properties PROPERTIES = createSinkProperties(); @SuppressWarnings("unchecked") private final FlinkKafkaProducerBase<Row> PRODUCER = new FlinkKafkaProducerBase<Row>( @@ -74,23 +71,6 @@ public abstract class KafkaTableSinkTestBase { } @Test - @SuppressWarnings("unchecked") - public void testKafkaTableSinkWithFlinkPartitioner() throws Exception { - DataStream dataStream = mock(DataStream.class); - - KafkaTableSink kafkaTableSink = spy(createTableSinkWithFlinkPartitioner()); - kafkaTableSink.emitDataStream(dataStream); - - verify(dataStream).addSink(eq(PRODUCER)); - - verify(kafkaTableSink).createKafkaProducer( - eq(TOPIC), - eq(PROPERTIES), - any(getSerializationSchema().getClass()), - eq(FLINK_PARTITIONER)); - } - - @Test public void testConfiguration() { KafkaTableSink kafkaTableSink = createTableSink(); KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES); @@ -101,22 +81,8 @@ public abstract class KafkaTableSinkTestBase { assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType()); } - @Test - public void testConfigurationWithFlinkPartitioner() { - KafkaTableSink kafkaTableSink = createTableSinkWithFlinkPartitioner(); - KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES); - assertNotSame(kafkaTableSink, newKafkaTableSink); - - assertArrayEquals(FIELD_NAMES, newKafkaTableSink.getFieldNames()); - assertArrayEquals(FIELD_TYPES, newKafkaTableSink.getFieldTypes()); - assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType()); - } - protected abstract KafkaTableSink createTableSink(String topic, Properties properties, - KafkaPartitioner<Row> partitioner, FlinkKafkaProducerBase<Row> kafkaProducer); - - protected abstract KafkaTableSink createTableSinkWithFlinkPartitioner(String topic, - Properties properties, FlinkKafkaPartitioner<Row> 
partitioner, FlinkKafkaProducerBase<Row> kafkaProducer); + FlinkKafkaPartitioner<Row> partitioner, FlinkKafkaProducerBase<Row> kafkaProducer); protected abstract SerializationSchema<Row> getSerializationSchema(); @@ -124,24 +90,13 @@ public abstract class KafkaTableSinkTestBase { return createTableSink(TOPIC, PROPERTIES, PARTITIONER, PRODUCER); } - private KafkaTableSink createTableSinkWithFlinkPartitioner() { - return createTableSinkWithFlinkPartitioner(TOPIC, PROPERTIES, FLINK_PARTITIONER, PRODUCER); - } - private static Properties createSinkProperties() { Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:12345"); return properties; } - private static class CustomPartitioner extends KafkaPartitioner<Row> implements Serializable { - @Override - public int partition(Row next, byte[] serializedKey, byte[] serializedValue, int numPartitions) { - return 0; - } - } - - private static class FlinkCustomPartitioner extends FlinkKafkaPartitioner<Row> { + private static class CustomPartitioner extends FlinkKafkaPartitioner<Row> { @Override public int partition(Row record, byte[] key, byte[] value, String targetTopic, int[] partitions) { return 0; http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java deleted file mode 100644 index 5dab05a..0000000 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFixedPartitioner.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation 
(ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.kafka; - -import org.junit.Assert; -import org.junit.Test; -import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner; - -public class TestFixedPartitioner { - - - /** - * <pre> - * Flink Sinks: Kafka Partitions - * 1 ----------------> 1 - * 2 --------------/ - * 3 -------------/ - * 4 ------------/ - * </pre> - */ - @Test - public void testMoreFlinkThanBrokers() { - FixedPartitioner<String> part = new FixedPartitioner<>(); - - int[] partitions = new int[]{0}; - - part.open(0, 4, partitions); - Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length)); - - part.open(1, 4, partitions); - Assert.assertEquals(0, part.partition("abc2", null, null, partitions.length)); - - part.open(2, 4, partitions); - Assert.assertEquals(0, part.partition("abc3", null, null, partitions.length)); - Assert.assertEquals(0, part.partition("abc3", null, null, partitions.length)); // check if it is changing ;) - - part.open(3, 4, partitions); - Assert.assertEquals(0, part.partition("abc4", null, null, partitions.length)); - } - - /** - * - * <pre> - * Flink Sinks: Kafka Partitions - * 1 ----------------> 1 - * 2 ----------------> 2 - * 3 - * 4 - * 5 - * 
- * </pre> - */ - @Test - public void testFewerPartitions() { - FixedPartitioner<String> part = new FixedPartitioner<>(); - - int[] partitions = new int[]{0, 1, 2, 3, 4}; - part.open(0, 2, partitions); - Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length)); - Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length)); - - part.open(1, 2, partitions); - Assert.assertEquals(1, part.partition("abc1", null, null, partitions.length)); - Assert.assertEquals(1, part.partition("abc1", null, null, partitions.length)); - } - - /* - * Flink Sinks: Kafka Partitions - * 1 ------------>---> 1 - * 2 -----------/----> 2 - * 3 ----------/ - */ - @Test - public void testMixedCase() { - FixedPartitioner<String> part = new FixedPartitioner<>(); - int[] partitions = new int[]{0,1}; - - part.open(0, 3, partitions); - Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length)); - - part.open(1, 3, partitions); - Assert.assertEquals(1, part.partition("abc1", null, null, partitions.length)); - - part.open(2, 3, partitions); - Assert.assertEquals(0, part.partition("abc1", null, null, partitions.length)); - - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java deleted file mode 100644 index c6be71c..0000000 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache 
Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner; -import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; -import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner; -import org.junit.Assert; -import org.junit.Test; - -/** - * Test FlinkKafkaDelegatePartitioner using FixedPartitioner - */ -public class TestFlinkKafkaDelegatePartitioner { - - /** - * <pre> - * Flink Sinks: Kafka Partitions - * 1 ----------------> 1 - * 2 --------------/ - * 3 -------------/ - * 4 ------------/ - * </pre> - */ - @Test - public void testMoreFlinkThanBrokers() { - FlinkFixedPartitioner<String> part = new FlinkFixedPartitioner<>(); - - int[] partitions = new int[]{0}; - - part.open(0, 4); - Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions)); - - part.open(1, 4); - Assert.assertEquals(0, part.partition("abc2", null, null, null, partitions)); - - part.open(2, 4); - Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions)); - Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions)); // check if it is 
changing ;) - - part.open(3, 4); - Assert.assertEquals(0, part.partition("abc4", null, null, null, partitions)); - } - - /** - * - * <pre> - * Flink Sinks: Kafka Partitions - * 1 ----------------> 1 - * 2 ----------------> 2 - * 3 - * 4 - * 5 - * - * </pre> - */ - @Test - public void testFewerPartitions() { - FlinkKafkaDelegatePartitioner<String> part = new FlinkKafkaDelegatePartitioner<>(new FixedPartitioner<String>()); - - int[] partitions = new int[]{0, 1, 2, 3, 4}; - part.setPartitions(partitions); - - part.open(0, 2); - Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions)); - Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions)); - - part.open(1, 2); - Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions)); - Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions)); - } - - /* - * Flink Sinks: Kafka Partitions - * 1 ------------>---> 1 - * 2 -----------/----> 2 - * 3 ----------/ - */ - @Test - public void testMixedCase() { - FlinkKafkaDelegatePartitioner<String> part = new FlinkKafkaDelegatePartitioner<>(new FixedPartitioner<String>()); - int[] partitions = new int[]{0,1}; - part.setPartitions(partitions); - - part.open(0, 3); - Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions)); - - part.open(1, 3); - Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions)); - - part.open(2, 3); - Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions)); - - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/e3fcbb08/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java deleted file mode 100644 index 43e1aa7..0000000 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.testutils; - -import java.io.Serializable; - -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; - -/** - * Special partitioner that uses the first field of a 2-tuple as the partition, - * and that expects a specific number of partitions. 
Use Tuple2FlinkPartitioner instead - */ -@Deprecated -public class Tuple2Partitioner extends KafkaPartitioner<Tuple2<Integer, Integer>> implements Serializable { - - private static final long serialVersionUID = 1L; - - private final int expectedPartitions; - - public Tuple2Partitioner(int expectedPartitions) { - this.expectedPartitions = expectedPartitions; - } - - @Override - public int partition(Tuple2<Integer, Integer> next, byte[] serializedKey, byte[] serializedValue, int numPartitions) { - if (numPartitions != expectedPartitions) { - throw new IllegalArgumentException("Expected " + expectedPartitions + " partitions"); - } - - return next.f0; - } -}
