http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index 2adb5ec..203d814 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -68,7 +68,7 @@ import org.apache.flink.streaming.connectors.kafka.testutils.FailingIdentityMapp import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils; import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidatingMapper; import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper; -import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2Partitioner; +import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2FlinkPartitioner; import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink; import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; @@ -1199,9 +1199,9 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { /** * Test producing and consuming into multiple topics - * @throws java.lang.Exception + * @throws Exception */ - public void runProduceConsumeMultipleTopics() throws java.lang.Exception { + public void runProduceConsumeMultipleTopics() throws Exception { final int NUM_TOPICS = 5; final int NUM_ELEMENTS = 20; @@ -1291,6 +1291,55 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { } } + private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>, + KeyedSerializationSchema<Tuple3<Integer, Integer, String>> { + + private final TypeSerializer<Tuple2<Integer, Integer>> ts; + + public Tuple2WithTopicSchema(ExecutionConfig ec) { + ts = TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec); + } + + @Override + public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { + DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message)); + Tuple2<Integer, Integer> t2 = ts.deserialize(in); + return new Tuple3<>(t2.f0, t2.f1, topic); + } + + @Override + public boolean isEndOfStream(Tuple3<Integer, Integer, String> nextElement) { + return false; + } + + @Override + public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() { + return TypeInfoParser.parse("Tuple3<Integer, Integer, String>"); + } + + @Override + public byte[] serializeKey(Tuple3<Integer, Integer, String> element) { + return null; + } + + @Override + public byte[] serializeValue(Tuple3<Integer, Integer, String> element) { + ByteArrayOutputStream by = new ByteArrayOutputStream(); + DataOutputView out = new DataOutputViewStreamWrapper(by); + try { + ts.serialize(new Tuple2<>(element.f0, element.f1), out); + } catch (IOException e) { + throw new RuntimeException("Error" ,e); + } + return by.toByteArray(); + } + + @Override + public String getTargetTopic(Tuple3<Integer, Integer, String> element) { + return element.f2; + } + } + /** * Test Flink's Kafka integration also with very big records (30MB) * see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message @@ -1975,7 +2024,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { producerProperties.setProperty("retries", "0"); producerProperties.putAll(secureProps); - kafkaServer.produceIntoKafka(stream, topicName, serSchema, producerProperties, new Tuple2Partitioner(parallelism)) + kafkaServer.produceIntoKafka(stream, topicName, serSchema, producerProperties, new Tuple2FlinkPartitioner(parallelism)) .setParallelism(parallelism); try { @@ -2227,53 +2276,4 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { this.numElementsTotal = state.get(0); } } - - private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>, - KeyedSerializationSchema<Tuple3<Integer, Integer, String>> { - - private final TypeSerializer<Tuple2<Integer, Integer>> ts; - - public Tuple2WithTopicSchema(ExecutionConfig ec) { - ts = TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec); - } - - @Override - public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { - DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message)); - Tuple2<Integer, Integer> t2 = ts.deserialize(in); - return new Tuple3<>(t2.f0, t2.f1, topic); - } - - @Override - public boolean isEndOfStream(Tuple3<Integer, Integer, String> nextElement) { - return false; - } - - @Override - public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() { - return TypeInfoParser.parse("Tuple3<Integer, Integer, String>"); - } - - @Override - public byte[] serializeKey(Tuple3<Integer, Integer, String> element) { - return null; - } - - @Override - public byte[] serializeValue(Tuple3<Integer, Integer, String> element) { - ByteArrayOutputStream by = new ByteArrayOutputStream(); - DataOutputView out = new DataOutputViewStreamWrapper(by); - try { - ts.serialize(new Tuple2<>(element.f0, element.f1), out); - } catch (IOException e) { - throw new RuntimeException("Error" ,e); - } - return by.toByteArray(); - } - - @Override - public String getTargetTopic(Tuple3<Integer, Integer, String> element) { - return element.f2; - } - } }
http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java index 6f61392..8285048 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java @@ -27,12 +27,11 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema; import org.apache.flink.test.util.SuccessException; - import java.io.Serializable; import java.util.Properties; @@ -174,7 +173,7 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase { // ------------------------------------------------------------------------ - public static class CustomPartitioner extends KafkaPartitioner<Tuple2<Long, String>> implements Serializable { + public static class CustomPartitioner extends FlinkKafkaPartitioner<Tuple2<Long, String>> implements Serializable { private final int expectedPartitions; @@ -184,10 +183,10 @@ public abstract class KafkaProducerTestBase extends KafkaTestBase { @Override - public int partition(Tuple2<Long, String> next, byte[] serializedKey, byte[] serializedValue, int numPartitions) { - assertEquals(expectedPartitions, numPartitions); + public int partition(Tuple2<Long, String> next, byte[] serializedKey, byte[] serializedValue, String topic, int[] partitions) { + assertEquals(expectedPartitions, partitions.length); - return (int) (next.f0 % numPartitions); + return (int) (next.f0 % expectedPartitions); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java index e0e8f84..0fdc82e 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java @@ -17,25 +17,26 @@ */ package org.apache.flink.streaming.connectors.kafka; +import java.io.Serializable; +import java.util.Properties; + import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.table.api.Types; -import org.apache.flink.types.Row; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; import org.apache.flink.streaming.util.serialization.SerializationSchema; +import org.apache.flink.table.api.Types; +import org.apache.flink.types.Row; import org.junit.Test; -import java.io.Serializable; -import java.util.Properties; - import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotSame; import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -45,6 +46,7 @@ public abstract class KafkaTableSinkTestBase { protected static final String[] FIELD_NAMES = new String[] {"field1", "field2"}; private static final TypeInformation[] FIELD_TYPES = new TypeInformation[] { Types.INT(), Types.STRING() }; private static final KafkaPartitioner<Row> PARTITIONER = new CustomPartitioner(); + private static final FlinkKafkaPartitioner<Row> FLINK_PARTITIONER = new FlinkCustomPartitioner(); private static final Properties PROPERTIES = createSinkProperties(); @SuppressWarnings("unchecked") private final FlinkKafkaProducerBase<Row> PRODUCER = new FlinkKafkaProducerBase<Row>( @@ -72,6 +74,23 @@ public abstract class KafkaTableSinkTestBase { } @Test + @SuppressWarnings("unchecked") + public void testKafkaTableSinkWithFlinkPartitioner() throws Exception { + DataStream dataStream = mock(DataStream.class); + + KafkaTableSink kafkaTableSink = spy(createTableSinkWithFlinkPartitioner()); + kafkaTableSink.emitDataStream(dataStream); + + verify(dataStream).addSink(eq(PRODUCER)); + + verify(kafkaTableSink).createKafkaProducer( + eq(TOPIC), + eq(PROPERTIES), + any(getSerializationSchema().getClass()), + eq(FLINK_PARTITIONER)); + } + + @Test public void testConfiguration() { KafkaTableSink kafkaTableSink = createTableSink(); KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES); @@ -82,15 +101,33 @@ public abstract class KafkaTableSinkTestBase { assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType()); } + @Test + public void testConfigurationWithFlinkPartitioner() { + KafkaTableSink kafkaTableSink = createTableSinkWithFlinkPartitioner(); + KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES); + assertNotSame(kafkaTableSink, newKafkaTableSink); + + assertArrayEquals(FIELD_NAMES, newKafkaTableSink.getFieldNames()); + assertArrayEquals(FIELD_TYPES, newKafkaTableSink.getFieldTypes()); + assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType()); + } + protected abstract KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner, FlinkKafkaProducerBase<Row> kafkaProducer); + protected abstract KafkaTableSink createTableSinkWithFlinkPartitioner(String topic, + Properties properties, FlinkKafkaPartitioner<Row> partitioner, FlinkKafkaProducerBase<Row> kafkaProducer); + protected abstract SerializationSchema<Row> getSerializationSchema(); private KafkaTableSink createTableSink() { return createTableSink(TOPIC, PROPERTIES, PARTITIONER, PRODUCER); } + private KafkaTableSink createTableSinkWithFlinkPartitioner() { + return createTableSinkWithFlinkPartitioner(TOPIC, PROPERTIES, FLINK_PARTITIONER, PRODUCER); + } + private static Properties createSinkProperties() { Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:12345"); @@ -103,4 +140,11 @@ public abstract class KafkaTableSinkTestBase { return 0; } } + + private static class FlinkCustomPartitioner extends FlinkKafkaPartitioner<Row> { + @Override + public int partition(Row record, byte[] key, byte[] value, String targetTopic, int[] partitions) { + return 0; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java index 9a7c96a..311a1a4 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java @@ -17,20 +17,20 @@ package org.apache.flink.streaming.connectors.kafka; +import java.util.Collections; +import java.util.List; +import java.util.Properties; + import kafka.server.KafkaServer; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.operators.StreamSink; -import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; -import java.util.Collections; -import java.util.List; -import java.util.Properties; - /** * Abstract class providing a Kafka test environment */ @@ -81,11 +81,11 @@ public abstract class KafkaTestEnvironment { public abstract <T> StreamSink<T> getProducerSink(String topic, KeyedSerializationSchema<T> serSchema, Properties props, - KafkaPartitioner<T> partitioner); + FlinkKafkaPartitioner<T> partitioner); public abstract <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, - KafkaPartitioner<T> partitioner); + FlinkKafkaPartitioner<T> partitioner); // -- offset handlers http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkFixedPartitioner.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkFixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkFixedPartitioner.java new file mode 100644 index 0000000..fa84199 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkFixedPartitioner.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; +import org.junit.Assert; +import org.junit.Test; + +public class TestFlinkFixedPartitioner { + + + /** + * <pre> + * Flink Sinks: Kafka Partitions + * 1 ----------------> 1 + * 2 --------------/ + * 3 -------------/ + * 4 ------------/ + * </pre> + */ + @Test + public void testMoreFlinkThanBrokers() { + FlinkFixedPartitioner<String> part = new FlinkFixedPartitioner<>(); + + int[] partitions = new int[]{0}; + + part.open(0, 4); + Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions)); + + part.open(1, 4); + Assert.assertEquals(0, part.partition("abc2", null, null, null, partitions)); + + part.open(2, 4); + Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions)); + Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions)); // check if it is changing ;) + + part.open(3, 4); + Assert.assertEquals(0, part.partition("abc4", null, null, null, partitions)); + } + + /** + * + * <pre> + * Flink Sinks: Kafka Partitions + * 1 ----------------> 1 + * 2 ----------------> 2 + * 3 + * 4 + * 5 + * + * </pre> + */ + @Test + public void testFewerPartitions() { + FlinkFixedPartitioner<String> part = new FlinkFixedPartitioner<>(); + + int[] partitions = new int[]{0, 1, 2, 3, 4}; + part.open(0, 2); + Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions)); + Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions)); + + part.open(1, 2); + Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions)); + Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions)); + } + + /* + * Flink Sinks: Kafka Partitions + * 1 ------------>---> 1 + * 2 -----------/----> 2 + * 3 ----------/ + */ + @Test + public void testMixedCase() { + FlinkFixedPartitioner<String> part = new FlinkFixedPartitioner<>(); + int[] partitions = new int[]{0,1}; + + part.open(0, 3); + Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions)); + + part.open(1, 3); + Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions)); + + part.open(2, 3); + Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions)); + + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java new file mode 100644 index 0000000..c6be71c --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/TestFlinkKafkaDelegatePartitioner.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test FlinkKafkaDelegatePartitioner using FixedPartitioner + */ +public class TestFlinkKafkaDelegatePartitioner { + + /** + * <pre> + * Flink Sinks: Kafka Partitions + * 1 ----------------> 1 + * 2 --------------/ + * 3 -------------/ + * 4 ------------/ + * </pre> + */ + @Test + public void testMoreFlinkThanBrokers() { + FlinkFixedPartitioner<String> part = new FlinkFixedPartitioner<>(); + + int[] partitions = new int[]{0}; + + part.open(0, 4); + Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions)); + + part.open(1, 4); + Assert.assertEquals(0, part.partition("abc2", null, null, null, partitions)); + + part.open(2, 4); + Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions)); + Assert.assertEquals(0, part.partition("abc3", null, null, null, partitions)); // check if it is changing ;) + + part.open(3, 4); + Assert.assertEquals(0, part.partition("abc4", null, null, null, partitions)); + } + + /** + * + * <pre> + * Flink Sinks: Kafka Partitions + * 1 ----------------> 1 + * 2 ----------------> 2 + * 3 + * 4 + * 5 + * + * </pre> + */ + @Test + public void testFewerPartitions() { + FlinkKafkaDelegatePartitioner<String> part = new FlinkKafkaDelegatePartitioner<>(new FixedPartitioner<String>()); + + int[] partitions = new int[]{0, 1, 2, 3, 4}; + part.setPartitions(partitions); + + part.open(0, 2); + Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions)); + Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions)); + + part.open(1, 2); + Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions)); + Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions)); + } + + /* + * Flink Sinks: Kafka Partitions + * 1 ------------>---> 1 + * 2 -----------/----> 2 + * 3 ----------/ + */ + @Test + public void testMixedCase() { + FlinkKafkaDelegatePartitioner<String> part = new FlinkKafkaDelegatePartitioner<>(new FixedPartitioner<String>()); + int[] partitions = new int[]{0,1}; + part.setPartitions(partitions); + + part.open(0, 3); + Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions)); + + part.open(1, 3); + Assert.assertEquals(1, part.partition("abc1", null, null, null, partitions)); + + part.open(2, 3); + Assert.assertEquals(0, part.partition("abc1", null, null, null, partitions)); + + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java index c383eb5..c0fb836 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java @@ -18,6 +18,10 @@ package org.apache.flink.streaming.connectors.kafka.testutils; +import java.util.Collection; +import java.util.Properties; +import java.util.Random; + import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; @@ -31,18 +35,14 @@ import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.api.transformations.StreamTransformation; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase; import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment; -import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner; -import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema; -import java.util.Collection; -import java.util.Properties; -import java.util.Random; - @SuppressWarnings("serial") public class DataGenerators { @@ -107,10 +107,10 @@ public class DataGenerators { testServer.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(new TypeInformationSerializationSchema<>(BasicTypeInfo.INT_TYPE_INFO, env.getConfig())), props, - new KafkaPartitioner<Integer>() { + new FlinkKafkaPartitioner<Integer>() { @Override - public int partition(Integer next, byte[] serializedKey, byte[] serializedValue, int numPartitions) { - return next % numPartitions; + public int partition(Integer next, byte[] serializedKey, byte[] serializedValue, String topic, int[] partitions) { + return next % partitions.length; } }); @@ -149,7 +149,7 @@ public class DataGenerators { topic, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), producerProperties, - new FixedPartitioner<String>()); + new FlinkFixedPartitioner<String>()); OneInputStreamOperatorTestHarness<String, Object> testHarness = new OneInputStreamOperatorTestHarness<>(sink); http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java new file mode 100644 index 0000000..e7fff52 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.testutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; + +/** + * Special partitioner that uses the first field of a 2-tuple as the partition, + * and that expects a specific number of partitions. + */ +public class Tuple2FlinkPartitioner extends FlinkKafkaPartitioner<Tuple2<Integer, Integer>> { + private static final long serialVersionUID = -3589898230375281549L; + + private final int expectedPartitions; + + public Tuple2FlinkPartitioner(int expectedPartitions) { + this.expectedPartitions = expectedPartitions; + } + + @Override + public int partition(Tuple2<Integer, Integer> next, byte[] key, byte[] value, String targetTopic, int[] partitions) { + if (partitions.length != expectedPartitions) { + throw new IllegalArgumentException("Expected " + expectedPartitions + " partitions"); + } + + return next.f0; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9ed9b683/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java index c9e9ac1..43e1aa7 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2Partitioner.java @@ -18,15 +18,16 @@ package org.apache.flink.streaming.connectors.kafka.testutils; +import java.io.Serializable; + import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; -import java.io.Serializable; - /** * Special partitioner that uses the first field of a 2-tuple as the partition, - * and that expects a specific number of partitions. + * and that expects a specific number of partitions. Use Tuple2FlinkPartitioner instead */ +@Deprecated public class Tuple2Partitioner extends KafkaPartitioner<Tuple2<Integer, Integer>> implements Serializable { private static final long serialVersionUID = 1L; @@ -45,4 +46,4 @@ public class Tuple2Partitioner extends KafkaPartitioner<Tuple2<Integer, Integer> return next.f0; } -} \ No newline at end of file +}
