[FLINK-6288] [kafka] Cleanup and improvements to FlinkKafkaPartitioner custom partitioning
This commit wraps up some general improvements to the new Kafka sink custom partitioning API, most notably: 1. remove deprecated constructors from base classes, as they are not user-facing. 2. modify producer IT test to test custom partitioning for dynamic topics. 3. improve documentation and Javadocs of the new interfaces. This closes #3901. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d3b58709 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d3b58709 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d3b58709 Branch: refs/heads/release-1.3 Commit: d3b587096b1fd694625429aa32557e70ed84955d Parents: 58c4eed Author: Tzu-Li (Gordon) Tai <[email protected]> Authored: Thu May 18 21:52:16 2017 +0800 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Fri May 19 14:42:04 2017 +0800 ---------------------------------------------------------------------- docs/dev/connectors/kafka.md | 4 +- .../connectors/kafka/FlinkKafkaProducer010.java | 113 +++++----- .../connectors/kafka/FlinkKafkaProducer08.java | 50 +++-- .../connectors/kafka/Kafka08JsonTableSink.java | 21 +- .../kafka/Kafka08JsonTableSinkTest.java | 25 +-- .../connectors/kafka/FlinkKafkaProducer09.java | 52 +++-- .../connectors/kafka/Kafka09JsonTableSink.java | 28 +-- .../kafka/Kafka09JsonTableSinkTest.java | 26 +-- .../kafka/FlinkKafkaProducerBase.java | 89 ++++---- .../connectors/kafka/KafkaJsonTableSink.java | 14 -- .../connectors/kafka/KafkaTableSink.java | 40 +--- .../kafka/partitioner/FixedPartitioner.java | 77 ------- .../partitioner/FlinkFixedPartitioner.java | 19 +- .../FlinkKafkaDelegatePartitioner.java | 5 +- .../partitioner/FlinkKafkaPartitioner.java | 24 +- .../kafka/partitioner/KafkaPartitioner.java | 16 +- .../kafka/FlinkKafkaProducerBaseTest.java | 38 ++-- .../connectors/kafka/KafkaConsumerTestBase.java | 98 ++++---- .../connectors/kafka/KafkaProducerTestBase.java | 222 ++++++++++++------- 
.../kafka/KafkaTableSinkTestBase.java | 51 +---- .../connectors/kafka/TestFixedPartitioner.java | 104 --------- .../TestFlinkKafkaDelegatePartitioner.java | 111 ---------- .../kafka/testutils/Tuple2Partitioner.java | 49 ---- 23 files changed, 458 insertions(+), 818 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/docs/dev/connectors/kafka.md ---------------------------------------------------------------------- diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md index 60a8039..bc7e7de 100644 --- a/docs/dev/connectors/kafka.md +++ b/docs/dev/connectors/kafka.md @@ -456,9 +456,9 @@ are other constructor variants that allow providing the following: Please refer to the [Apache Kafka documentation](https://kafka.apache.org/documentation.html) for details on how to configure Kafka Producers. * *Custom partitioner*: To assign records to specific - partitions, you can provide an implementation of a `KafkaPartitioner` to the + partitions, you can provide an implementation of a `FlinkKafkaPartitioner` to the constructor. This partitioner will be called for each record in the stream - to determine which exact partition the record will be sent to. + to determine which exact partition of the target topic the record should be sent to. * *Advanced serialization schema*: Similar to the consumer, the producer also allows using an advanced serialization schema called `KeyedSerializationSchema`, which allows serializing the key and value separately. 
It also allows to override the target topic, http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java index 7addafa..711fe07 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java @@ -30,6 +30,7 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -38,6 +39,7 @@ import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWra import org.apache.flink.streaming.util.serialization.SerializationSchema; import org.apache.kafka.clients.producer.ProducerRecord; +import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPartitionsByTopic; import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList; @@ -121,32 +123,6 @@ public class 
FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. - * @deprecated Use {@link FlinkKafkaProducer010Configuration#writeToKafkaWithTimestamps(DataStream, String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead - */ - @Deprecated - public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream, - String topicId, - KeyedSerializationSchema<T> serializationSchema, - Properties producerConfig, - KafkaPartitioner<T> customPartitioner) { - - GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class); - FlinkKafkaProducer010<T> kafkaProducer = new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, customPartitioner); - SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer); - return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer); - } - - /** - * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to - * the topic. - * - * This constructor allows writing timestamps to Kafka, it follow approach (b) (see above) - * - * @param inStream The stream to write to Kafka - * @param topicId The name of the target topic - * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages - * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. 
- * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. */ public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream, String topicId, @@ -200,21 +176,6 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[] * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner) - * @deprecated Use {@link FlinkKafkaProducer010#FlinkKafkaProducer010(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead - */ - @Deprecated - public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) { - this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner); - } - - /** - * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to - * the topic. - * - * @param topicId The topic to write data to - * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[] - * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. 
- * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner) */ public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) { this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner); @@ -251,32 +212,84 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig) { this(topicId, serializationSchema, producerConfig, new FlinkFixedPartitioner<T>()); } - + /** * Create Kafka producer * * This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above) - * @deprecated Use {@link FlinkKafkaProducer010#FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead */ - @Deprecated - public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) { + public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) { // We create a Kafka 09 producer instance here and only "override" (by intercepting) the // invoke call. super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, customPartitioner)); } - + + // ----------------------------- Deprecated constructors / factory methods --------------------------- + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. 
+ * + * This constructor allows writing timestamps to Kafka, it follows approach (b) (see above) + * + * @param inStream The stream to write to Kafka + * @param topicId The name of the target topic + * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. + * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. + * + * @deprecated This is a deprecated factory method that does not correctly handle partitioning when + * producing to multiple topics. Use + * {@link FlinkKafkaProducer010#writeToKafkaWithTimestamps(DataStream, String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead. + */ + @Deprecated + public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream, + String topicId, + KeyedSerializationSchema<T> serializationSchema, + Properties producerConfig, + KafkaPartitioner<T> customPartitioner) { + + GenericTypeInfo<Object> objectTypeInfo = new GenericTypeInfo<>(Object.class); + FlinkKafkaProducer010<T> kafkaProducer = + new FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig, new FlinkKafkaDelegatePartitioner<>(customPartitioner)); + SingleOutputStreamOperator<Object> transformation = inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer); + return new FlinkKafkaProducer010Configuration<>(transformation, kafkaProducer); + } + + /** + * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to + * the topic. + * + * @param topicId The topic to write data to + * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[] + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.'
is the only required argument. + * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner) + * + * @deprecated This is a deprecated constructor that does not correctly handle partitioning when + * producing to multiple topics. Use + * {@link FlinkKafkaProducer010#FlinkKafkaProducer010(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead. + */ + @Deprecated + public FlinkKafkaProducer010(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) { + this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner); + } + /** * Create Kafka producer * * This constructor does not allow writing timestamps to Kafka, it follow approach (a) (see above) + * + * @deprecated This is a deprecated constructor that does not correctly handle partitioning when + * producing to multiple topics. Use + * {@link FlinkKafkaProducer010#FlinkKafkaProducer010(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead. */ - public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<T> customPartitioner) { + @Deprecated + public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, KafkaPartitioner<T> customPartitioner) { // We create a Kafka 09 producer instance here and only "override" (by intercepting) the // invoke call.
- super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, customPartitioner)); + super(new FlinkKafkaProducer09<>(topicId, serializationSchema, producerConfig, new FlinkKafkaDelegatePartitioner<>(customPartitioner))); } - // ----------------------------- Generic element processing --------------------------- private void invokeInternal(T next, long elementTimestamp) throws Exception { @@ -300,7 +313,7 @@ public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunct ProducerRecord<byte[], byte[]> record; int[] partitions = internalProducer.topicPartitionsMap.get(targetTopic); if(null == partitions) { - partitions = internalProducer.getPartitionsByTopic(targetTopic, internalProducer.producer); + partitions = getPartitionsByTopic(targetTopic, internalProducer.producer); internalProducer.topicPartitionsMap.put(targetTopic, partitions); } if (internalProducer.flinkKafkaPartitioner == null) { http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java index 64d3716..08dcb2f 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; +import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; @@ -76,21 +77,6 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN> { * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[] * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. * @param customPartitioner A serializable partitioner for assining messages to Kafka partitions. - * @deprecated Use {@link FlinkKafkaProducer08#FlinkKafkaProducer08(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead - */ - @Deprecated - public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) { - this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner); - - } - - /** - * The main constructor for creating a FlinkKafkaProducer. - * - * @param topicId The topic to write data to - * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[] - * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. - * @param customPartitioner A serializable partitioner for assining messages to Kafka partitions. 
*/ public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) { this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner); @@ -136,13 +122,30 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN> { * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. * @param customPartitioner A serializable partitioner for assining messages to Kafka partitions. - * @deprecated Use {@link FlinkKafkaProducer08#FlinkKafkaProducer08(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead */ - @Deprecated - public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) { + public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) { super(topicId, serializationSchema, producerConfig, customPartitioner); } + // ------------------- Deprecated constructors ---------------------- + + /** + * The main constructor for creating a FlinkKafkaProducer. + * + * @param topicId The topic to write data to + * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[] + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. + * @param customPartitioner A serializable partitioner for assining messages to Kafka partitions. 
+ * + * @deprecated This is a deprecated constructor that does not correctly handle partitioning when + * producing to multiple topics. Use + * {@link #FlinkKafkaProducer08(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead. + */ + @Deprecated + public FlinkKafkaProducer08(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) { + this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner); + } + /** * The main constructor for creating a FlinkKafkaProducer. * @@ -150,11 +153,18 @@ public class FlinkKafkaProducer08<IN> extends FlinkKafkaProducerBase<IN> { * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. * @param customPartitioner A serializable partitioner for assining messages to Kafka partitions. + * + * @deprecated This is a deprecated constructor that does not correctly handle partitioning when + * producing to multiple topics. Use + * {@link #FlinkKafkaProducer08(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
*/ - public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) { - super(topicId, serializationSchema, producerConfig, customPartitioner); + @Deprecated + public FlinkKafkaProducer08(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) { + super(topicId, serializationSchema, producerConfig, new FlinkKafkaDelegatePartitioner<>(customPartitioner)); } + // --------------------------------------------------------------------- + @Override protected void flush() { // The Kafka 0.8 producer doesn't support flushing, we wait here http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java index 5a066ec0..80bd180 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java @@ -17,6 +17,7 @@ */ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.types.Row; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; @@ -28,7 +29,7 @@ import java.util.Properties; * Kafka 0.8 {@link KafkaTableSink} that 
serializes data in JSON format. */ public class Kafka08JsonTableSink extends KafkaJsonTableSink { - + /** * Creates {@link KafkaTableSink} for Kafka 0.8 * @@ -36,24 +37,24 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink { * @param properties properties to connect to Kafka * @param partitioner Kafka partitioner */ - public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) { + public Kafka08JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) { super(topic, properties, partitioner); } - + /** * Creates {@link KafkaTableSink} for Kafka 0.8 * * @param topic topic in Kafka to which table is written * @param properties properties to connect to Kafka * @param partitioner Kafka partitioner + * + * @deprecated This is a deprecated constructor that does not correctly handle partitioning when + * producing to multiple topics. Use + * {@link #Kafka08JsonTableSink(String, Properties, FlinkKafkaPartitioner)} instead. 
*/ - public Kafka08JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) { - super(topic, properties, partitioner); - } - - @Override - protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) { - return new FlinkKafkaProducer08<>(topic, serializationSchema, properties, partitioner); + @Deprecated + public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) { + super(topic, properties, new FlinkKafkaDelegatePartitioner<>(partitioner)); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java index 164c162..2136476 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.types.Row; -import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; import org.apache.flink.streaming.util.serialization.SerializationSchema; @@ -28,26 +27,20 @@ import java.util.Properties; public class 
Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase { @Override - protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner, + protected KafkaTableSink createTableSink( + String topic, + Properties properties, + FlinkKafkaPartitioner<Row> partitioner, final FlinkKafkaProducerBase<Row> kafkaProducer) { return new Kafka08JsonTableSink(topic, properties, partitioner) { @Override - protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, - SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) { - return kafkaProducer; - } - }; - } + protected FlinkKafkaProducerBase<Row> createKafkaProducer( + String topic, + Properties properties, + SerializationSchema<Row> serializationSchema, + FlinkKafkaPartitioner<Row> partitioner) { - @Override - protected KafkaTableSink createTableSinkWithFlinkPartitioner(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner, - final FlinkKafkaProducerBase<Row> kafkaProducer) { - - return new Kafka08JsonTableSink(topic, properties, partitioner) { - @Override - protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, - SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) { return kafkaProducer; } }; http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java index 4f41c43..cbed361 100644 --- 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; @@ -78,22 +79,6 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> { * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[] * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner) - * @deprecated Use {@link FlinkKafkaProducer09#FlinkKafkaProducer09(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead - */ - @Deprecated - public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) { - this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner); - - } - - /** - * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to - * the topic. 
- * - * @param topicId The topic to write data to - * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[] - * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. - * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner) */ public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) { this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner); @@ -140,13 +125,31 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> { * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. - * @deprecated Use {@link FlinkKafkaProducer09#FlinkKafkaProducer09(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead */ - @Deprecated - public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) { + public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) { super(topicId, serializationSchema, producerConfig, customPartitioner); } + // ------------------- Deprecated constructors ---------------------- + + /** + * Creates a FlinkKafkaProducer for a given topic. the sink produces a DataStream to + * the topic. 
+ * + * @param topicId The topic to write data to + * @param serializationSchema A (keyless) serializable serialization schema for turning user objects into a kafka-consumable byte[] + * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. + * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner) + * + * @deprecated This is a deprecated constructor that does not correctly handle partitioning when + * producing to multiple topics. Use + * {@link #FlinkKafkaProducer09(String, SerializationSchema, Properties, FlinkKafkaPartitioner)} instead. + */ + @Deprecated + public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) { + this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner); + } + /** * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to * the topic. @@ -155,11 +158,18 @@ public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> { * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. + * + * @deprecated This is a deprecated constructor that does not correctly handle partitioning when + * producing to multiple topics. Use + * {@link #FlinkKafkaProducer09(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead.
*/ - public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) { - super(topicId, serializationSchema, producerConfig, customPartitioner); + @Deprecated + public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) { + super(topicId, serializationSchema, producerConfig, new FlinkKafkaDelegatePartitioner<>(customPartitioner)); } + // ------------------------------------------------------------------ + @Override protected void flush() { if (this.producer != null) { http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java index b82ebc4..a81422e 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java @@ -17,6 +17,7 @@ */ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.types.Row; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; @@ -28,43 +29,32 @@ import java.util.Properties; * Kafka 0.9 {@link KafkaTableSink} that serializes data in JSON format. 
*/ public class Kafka09JsonTableSink extends KafkaJsonTableSink { + /** * Creates {@link KafkaTableSink} for Kafka 0.9 * * @param topic topic in Kafka to which table is written * @param properties properties to connect to Kafka * @param partitioner Kafka partitioner - * @deprecated Use {@link Kafka09JsonTableSink#Kafka09JsonTableSink(String, Properties, FlinkKafkaPartitioner)} instead */ - @Deprecated - public Kafka09JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) { + public Kafka09JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) { super(topic, properties, partitioner); } - + /** * Creates {@link KafkaTableSink} for Kafka 0.9 * * @param topic topic in Kafka to which table is written * @param properties properties to connect to Kafka * @param partitioner Kafka partitioner - */ - public Kafka09JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) { - super(topic, properties, partitioner); - } - - /** * - * @param topic Kafka topic to produce to. - * @param properties Properties for the Kafka producer. - * @param serializationSchema Serialization schema to use to create Kafka records. - * @param partitioner Partitioner to select Kafka partition. - * @return The version-specific Kafka producer - * @deprecated Use {@link Kafka09JsonTableSink#createKafkaProducer(String, Properties, SerializationSchema, FlinkKafkaPartitioner)} instead + * @deprecated This is a deprecated constructor that does not correctly handle partitioning when + * producing to multiple topics. Use + * {@link #Kafka09JsonTableSink(String, Properties, FlinkKafkaPartitioner)} instead. 
*/ @Deprecated - @Override - protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) { - return new FlinkKafkaProducer09<>(topic, serializationSchema, properties, partitioner); + public Kafka09JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) { + super(topic, properties, new FlinkKafkaDelegatePartitioner<>(partitioner)); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java index ad8f623..3afb5e4 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java @@ -19,7 +19,6 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.types.Row; -import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; import org.apache.flink.streaming.util.serialization.SerializationSchema; @@ -27,28 +26,21 @@ import java.util.Properties; public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase { - @Deprecated @Override - protected KafkaTableSink createTableSink(String topic, Properties properties, 
KafkaPartitioner<Row> partitioner, + protected KafkaTableSink createTableSink( + String topic, + Properties properties, + FlinkKafkaPartitioner<Row> partitioner, final FlinkKafkaProducerBase<Row> kafkaProducer) { return new Kafka09JsonTableSink(topic, properties, partitioner) { @Override - protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, - SerializationSchema<Row> serializationSchema, KafkaPartitioner<Row> partitioner) { - return kafkaProducer; - } - }; - } + protected FlinkKafkaProducerBase<Row> createKafkaProducer( + String topic, + Properties properties, + SerializationSchema<Row> serializationSchema, + FlinkKafkaPartitioner<Row> partitioner) { - @Override - protected KafkaTableSink createTableSinkWithFlinkPartitioner(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner, - final FlinkKafkaProducerBase<Row> kafkaProducer) { - - return new Kafka09JsonTableSink(topic, properties, partitioner) { - @Override - protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, - SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) { return kafkaProducer; } }; http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java index f9a1e41..3a8228c 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java +++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java @@ -40,7 +40,6 @@ import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; -import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; import org.apache.flink.util.NetUtils; import org.apache.kafka.clients.producer.Callback; @@ -74,38 +73,38 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im private static final long serialVersionUID = 1L; /** - * Configuration key for disabling the metrics reporting + * Configuration key for disabling the metrics reporting. */ public static final String KEY_DISABLE_METRICS = "flink.disable-metrics"; /** - * User defined properties for the Producer + * User defined properties for the Producer. */ protected final Properties producerConfig; /** - * The name of the default topic this producer is writing data to + * The name of the default topic this producer is writing data to. */ protected final String defaultTopicId; /** - * (Serializable) SerializationSchema for turning objects used with Flink into + * (Serializable) SerializationSchema for turning objects used with Flink into * byte[] for Kafka. */ protected final KeyedSerializationSchema<IN> schema; /** - * User-provided partitioner for assigning an object to a Kafka partition for each topic + * User-provided partitioner for assigning an object to a Kafka partition for each topic.
*/ - protected final FlinkKafkaPartitioner flinkKafkaPartitioner; + protected final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner; /** - * Partitions for each topic + * Partitions of each topic. */ protected final Map<String, int[]> topicPartitionsMap; /** - * Flag indicating whether to accept failures (and log them), or to fail on failures + * Flag indicating whether to accept failures (and log them), or to fail on failures. */ protected boolean logFailuresOnly; @@ -114,11 +113,6 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im */ protected boolean flushOnCheckpoint; - /** - * Retry times of fetching kafka meta - */ - protected long kafkaMetaRetryTimes; - // -------------------------------- Runtime fields ------------------------------------------ /** KafkaProducer instance */ @@ -138,21 +132,6 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im protected OperatorStateStore stateStore; - - /** - * The main constructor for creating a FlinkKafkaProducer. For customPartitioner parameter, use {@link org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner} instead - * - * @param defaultTopicId The default topic to write data to - * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages - * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument. - * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions. Passing null will use Kafka's partitioner.
- * @deprecated Use {@link FlinkKafkaProducerBase#FlinkKafkaProducerBase(String, KeyedSerializationSchema, Properties, FlinkKafkaPartitioner)} instead - */ - @Deprecated - public FlinkKafkaProducerBase(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) { - this(defaultTopicId, serializationSchema, producerConfig, null == customPartitioner ? null : new FlinkKafkaDelegatePartitioner<>(customPartitioner)); - } - /** * The main constructor for creating a FlinkKafkaProducer. * @@ -236,9 +215,11 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im producer = getKafkaProducer(this.producerConfig); RuntimeContext ctx = getRuntimeContext(); + if(null != flinkKafkaPartitioner) { if(flinkKafkaPartitioner instanceof FlinkKafkaDelegatePartitioner) { - ((FlinkKafkaDelegatePartitioner)flinkKafkaPartitioner).setPartitions(getPartitionsByTopic(this.defaultTopicId, this.producer)); + ((FlinkKafkaDelegatePartitioner) flinkKafkaPartitioner).setPartitions( + getPartitionsByTopic(this.defaultTopicId, this.producer)); } flinkKafkaPartitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks()); } @@ -290,26 +271,6 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im } } - protected int[] getPartitionsByTopic(String topic, KafkaProducer<byte[], byte[]> producer) { - // the fetched list is immutable, so we're creating a mutable copy in order to sort it - List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic)); - - // sort the partitions by partition id to make sure the fetched partition list is the same across subtasks - Collections.sort(partitionsList, new Comparator<PartitionInfo>() { - @Override - public int compare(PartitionInfo o1, PartitionInfo o2) { - return Integer.compare(o1.partition(), o2.partition()); - } - }); - - int[] partitions = new int[partitionsList.size()]; - for (int i = 0; i < 
partitions.length; i++) { - partitions[i] = partitionsList.get(i).partition(); - } - - return partitions; - } - /** * Called when new data arrives to the sink, and forwards it to Kafka. * @@ -330,7 +291,7 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im int[] partitions = this.topicPartitionsMap.get(targetTopic); if(null == partitions) { - partitions = this.getPartitionsByTopic(targetTopic, producer); + partitions = getPartitionsByTopic(targetTopic, producer); this.topicPartitionsMap.put(targetTopic, partitions); } @@ -338,7 +299,11 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im if (flinkKafkaPartitioner == null) { record = new ProducerRecord<>(targetTopic, serializedKey, serializedValue); } else { - record = new ProducerRecord<>(targetTopic, flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions), serializedKey, serializedValue); + record = new ProducerRecord<>( + targetTopic, + flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions), + serializedKey, + serializedValue); } if (flushOnCheckpoint) { synchronized (pendingRecordsLock) { @@ -425,6 +390,26 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im return props; } + protected static int[] getPartitionsByTopic(String topic, KafkaProducer<byte[], byte[]> producer) { + // the fetched list is immutable, so we're creating a mutable copy in order to sort it + List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(topic)); + + // sort the partitions by partition id to make sure the fetched partition list is the same across subtasks + Collections.sort(partitionsList, new Comparator<PartitionInfo>() { + @Override + public int compare(PartitionInfo o1, PartitionInfo o2) { + return Integer.compare(o1.partition(), o2.partition()); + } + }); + + int[] partitions = new int[partitionsList.size()]; + for (int i = 0; i < 
partitions.length; i++) { + partitions[i] = partitionsList.get(i).partition(); + } + + return partitions; + } + @VisibleForTesting protected long numPendingRecords() { synchronized (pendingRecordsLock) { http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java index a0b5033..41bb329 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; -import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; import org.apache.flink.streaming.util.serialization.SerializationSchema; import org.apache.flink.types.Row; @@ -29,19 +28,6 @@ import java.util.Properties; * Base class for {@link KafkaTableSink} that serializes data in JSON format */ public abstract class KafkaJsonTableSink extends KafkaTableSink { - - /** - * Creates KafkaJsonTableSink - * - * @param topic topic in Kafka to which table is written - * @param properties properties to connect to Kafka - * @param partitioner Kafka partitioner - * @deprecated Use {@link KafkaJsonTableSink#KafkaJsonTableSink(String, Properties, FlinkKafkaPartitioner)} instead - */ - @Deprecated - public 
KafkaJsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) { - super(topic, properties, partitioner); - } /** * Creates KafkaJsonTableSink http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java index 0a937d6..1c38816 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java @@ -18,13 +18,11 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.types.Row; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.table.sinks.AppendStreamTableSink; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.util.serialization.SerializationSchema; import org.apache.flink.util.Preconditions; @@ -34,7 +32,7 @@ import java.util.Properties; * A version-agnostic Kafka {@link AppendStreamTableSink}. * * <p>The version-specific Kafka consumers need to extend this class and - * override {@link #createKafkaProducer(String, Properties, SerializationSchema, KafkaPartitioner)}}. 
+ * override {@link #createKafkaProducer(String, Properties, SerializationSchema, FlinkKafkaPartitioner)}. */ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> { @@ -47,22 +45,6 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> { /** * Creates KafkaTableSink - * - * @param topic Kafka topic to write to. - * @param properties Properties for the Kafka consumer. - * @param partitioner Partitioner to select Kafka partition for each item - * @deprecated Use {@link KafkaTableSink#KafkaTableSink(String, Properties, FlinkKafkaPartitioner)} instead - */ - @Deprecated - public KafkaTableSink( - String topic, - Properties properties, - KafkaPartitioner<Row> partitioner) { - this(topic, properties, new FlinkKafkaDelegatePartitioner<Row>(partitioner)); - } - - /** - * Creates KafkaTableSink * * @param topic Kafka topic to write to. * @param properties Properties for the Kafka consumer. @@ -85,22 +67,6 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> { * @param serializationSchema Serialization schema to use to create Kafka records. * @param partitioner Partitioner to select Kafka partition. * @return The version-specific Kafka producer - * @deprecated Use {@link KafkaTableSink#createKafkaProducer(String, Properties, SerializationSchema, FlinkKafkaPartitioner)} instead - */ - @Deprecated - protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer( - String topic, Properties properties, - SerializationSchema<Row> serializationSchema, - KafkaPartitioner<Row> partitioner); - - /** - * Returns the version-specifid Kafka producer. - * - * @param topic Kafka topic to produce to. - * @param properties Properties for the Kafka producer. - * @param serializationSchema Serialization schema to use to create Kafka records. - * @param partitioner Partitioner to select Kafka partition.
- * @return The version-specific Kafka producer */ protected abstract FlinkKafkaProducerBase<Row> createKafkaProducer( String topic, Properties properties, @@ -153,8 +119,4 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> { return copy; } - - - - } http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java deleted file mode 100644 index edabfe0..0000000 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.streaming.connectors.kafka.partitioner; - -import java.io.Serializable; - -/** - * A partitioner ensuring that each internal Flink partition ends up in one Kafka partition. - * - * Note, one Kafka partition can contain multiple Flink partitions. - * - * Cases: - * # More Flink partitions than kafka partitions - * <pre> - * Flink Sinks: Kafka Partitions - * 1 ----------------> 1 - * 2 --------------/ - * 3 -------------/ - * 4 ------------/ - * </pre> - * Some (or all) kafka partitions contain the output of more than one flink partition - * - *# Fewer Flink partitions than Kafka - * <pre> - * Flink Sinks: Kafka Partitions - * 1 ----------------> 1 - * 2 ----------------> 2 - * 3 - * 4 - * 5 - * </pre> - * - * Not all Kafka partitions contain data - * To avoid such an unbalanced partitioning, use a round-robin kafka partitioner. (note that this will - * cause a lot of network connections between all the Flink instances and all the Kafka brokers - * @deprecated Use {@link FlinkFixedPartitioner} instead. 
- * - */ -@Deprecated -public class FixedPartitioner<T> extends KafkaPartitioner<T> implements Serializable { - private static final long serialVersionUID = 1627268846962918126L; - - private int targetPartition = -1; - - @Override - public void open(int parallelInstanceId, int parallelInstances, int[] partitions) { - if (parallelInstanceId < 0 || parallelInstances <= 0 || partitions.length == 0) { - throw new IllegalArgumentException(); - } - - this.targetPartition = partitions[parallelInstanceId % partitions.length]; - } - - @Override - public int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions) { - if (targetPartition >= 0) { - return targetPartition; - } else { - throw new RuntimeException("The partitioner has not been initialized properly"); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java index d2eb7af..e47c667 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java @@ -17,6 +17,8 @@ */ package org.apache.flink.streaming.connectors.kafka.partitioner; +import org.apache.flink.util.Preconditions; + /** * A partitioner ensuring that each internal Flink partition ends up in one Kafka partition. 
* @@ -44,9 +46,8 @@ package org.apache.flink.streaming.connectors.kafka.partitioner; * </pre> * * Not all Kafka partitions contain data - * To avoid such an unbalanced partitioning, use a round-robin kafka partitioner. (note that this will - * cause a lot of network connections between all the Flink instances and all the Kafka brokers - * + * To avoid such an unbalanced partitioning, use a round-robin kafka partitioner (note that this will + * cause a lot of network connections between all the Flink instances and all the Kafka brokers). */ public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> { @@ -54,17 +55,17 @@ public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> { @Override public void open(int parallelInstanceId, int parallelInstances) { - if (parallelInstanceId < 0 || parallelInstances <= 0) { - throw new IllegalArgumentException(); - } + Preconditions.checkArgument(parallelInstanceId >= 0, "Id of this subtask cannot be negative."); + Preconditions.checkArgument(parallelInstances > 0, "Number of subtasks must be larger than 0."); + this.parallelInstanceId = parallelInstanceId; } @Override public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) { - if(null == partitions || partitions.length == 0) { - throw new IllegalArgumentException(); - } + Preconditions.checkArgument( + partitions != null && partitions.length > 0, + "Partitions of the target topic is empty."); return partitions[parallelInstanceId % partitions.length]; } http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java index 469fd1b..b7b4143 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaDelegatePartitioner.java @@ -18,8 +18,9 @@ package org.apache.flink.streaming.connectors.kafka.partitioner; /** - * Delegate for KafkaPartitioner - * @param <T> + * Delegate for the deprecated {@link KafkaPartitioner}. + * This should only be used for bridging deprecated partitioning API methods. + * * @deprecated Delegate for {@link KafkaPartitioner}, use {@link FlinkKafkaPartitioner} instead */ @Deprecated http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java index e074b9b..b634af7 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKafkaPartitioner.java @@ -20,20 +20,34 @@ package org.apache.flink.streaming.connectors.kafka.partitioner; import java.io.Serializable; /** - * It contains a open() method which is called on each parallel instance. 
- * Partitioners must be serializable! + * A {@link FlinkKafkaPartitioner} wraps logic on how to partition records + * across partitions of multiple Kafka topics. */ public abstract class FlinkKafkaPartitioner<T> implements Serializable { + private static final long serialVersionUID = -9086719227828020494L; /** - * Initializer for the Partitioner. - * @param parallelInstanceId 0-indexed id of the parallel instance in Flink + * Initializer for the partitioner. This is called once on each parallel sink instance of + * the Flink Kafka producer. This method should be overridden if necessary. + * + * @param parallelInstanceId 0-indexed id of the parallel sink instance in Flink * @param parallelInstances the total number of parallel instances */ public void open(int parallelInstanceId, int parallelInstances) { // overwrite this method if needed. } - + + /** + * Determine the id of the partition that the record should be written to. + * + * @param record the record value + * @param key serialized key of the record + * @param value serialized value of the record + * @param targetTopic target topic for the record + * @param partitions found partitions for the target topic + * + * @return the id of the target partition + */ public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions); } http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java index 7c82bd1..eebc619 100644 --- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java @@ -22,7 +22,9 @@ import java.io.Serializable; /** * It contains a open() method which is called on each parallel instance. * Partitioners must be serializable! - * @deprecated Use {@link FlinkKafkaPartitioner} instead. + * + * @deprecated This partitioner does not handle partitioning properly in the case of + * multiple topics, and has been deprecated. Please use {@link FlinkKafkaPartitioner} instead. */ @Deprecated public abstract class KafkaPartitioner<T> implements Serializable { @@ -34,22 +36,10 @@ public abstract class KafkaPartitioner<T> implements Serializable { * @param parallelInstanceId 0-indexed id of the parallel instance in Flink * @param parallelInstances the total number of parallel instances * @param partitions an array describing the partition IDs of the available Kafka partitions. - * @deprecated Use {@link FlinkKafkaPartitioner#open(int, int)} instead. */ - @Deprecated public void open(int parallelInstanceId, int parallelInstances, int[] partitions) { // overwrite this method if needed. } - /** - * - * @param next - * @param serializedKey - * @param serializedValue - * @param numPartitions - * @return - * @deprecated Use {@link FlinkKafkaPartitioner#partition(T, byte[], byte[], String, int[])} instead. 
- */ - @Deprecated public abstract int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions); } http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java index 1f16d8e..6b2cc02 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java @@ -22,12 +22,14 @@ import org.apache.flink.core.testutils.CheckedThread; import org.apache.flink.core.testutils.MultiShotLatch; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.kafka.clients.producer.Callback; import 
org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; @@ -61,7 +63,7 @@ public class FlinkKafkaProducerBaseTest { // no bootstrap servers set in props Properties props = new Properties(); // should throw IllegalArgumentException - new DummyFlinkKafkaProducer<>(props, null); + new DummyFlinkKafkaProducer<>(props, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null); } /** @@ -72,7 +74,7 @@ public class FlinkKafkaProducerBaseTest { Properties props = new Properties(); props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:12345"); // should set missing key value deserializers - new DummyFlinkKafkaProducer<>(props, null); + new DummyFlinkKafkaProducer<>(props, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null); assertTrue(props.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)); assertTrue(props.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)); @@ -83,9 +85,10 @@ public class FlinkKafkaProducerBaseTest { /** * Tests that partitions list is determinate and correctly provided to custom partitioner */ + @SuppressWarnings("unchecked") @Test - public void testPartitionerOpenedWithDeterminatePartitionList() throws Exception { - KafkaPartitioner mockPartitioner = mock(KafkaPartitioner.class); + public void testPartitionerInvokedWithDeterminatePartitionList() throws Exception { + FlinkKafkaPartitioner<String> mockPartitioner = mock(FlinkKafkaPartitioner.class); RuntimeContext mockRuntimeContext = mock(RuntimeContext.class); when(mockRuntimeContext.getIndexOfThisSubtask()).thenReturn(0); @@ -98,8 +101,8 @@ public class FlinkKafkaProducerBaseTest { mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 0, null, null, null)); mockPartitionsList.add(new PartitionInfo(DummyFlinkKafkaProducer.DUMMY_TOPIC, 2, null, null, null)); - final DummyFlinkKafkaProducer producer = new DummyFlinkKafkaProducer( - FakeStandardProducerConfig.get(), 
mockPartitioner); + final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>( + FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), mockPartitioner); producer.setRuntimeContext(mockRuntimeContext); final KafkaProducer mockProducer = producer.getMockKafkaProducer(); @@ -107,10 +110,11 @@ public class FlinkKafkaProducerBaseTest { when(mockProducer.metrics()).thenReturn(null); producer.open(new Configuration()); + verify(mockPartitioner, times(1)).open(0, 1); - // the out-of-order partitions list should be sorted before provided to the custom partitioner's open() method - int[] correctPartitionList = {0, 1, 2, 3}; - verify(mockPartitioner).open(0, 1, correctPartitionList); + producer.invoke("foobar"); + verify(mockPartitioner, times(1)).partition( + "foobar", null, "foobar".getBytes(), DummyFlinkKafkaProducer.DUMMY_TOPIC, new int[] {0, 1, 2, 3}); } /** @@ -119,7 +123,7 @@ public class FlinkKafkaProducerBaseTest { @Test public void testAsyncErrorRethrownOnInvoke() throws Throwable { final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>( - FakeStandardProducerConfig.get(), null); + FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null); OneInputStreamOperatorTestHarness<String, Object> testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer)); @@ -150,7 +154,7 @@ public class FlinkKafkaProducerBaseTest { @Test public void testAsyncErrorRethrownOnCheckpoint() throws Throwable { final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>( - FakeStandardProducerConfig.get(), null); + FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null); OneInputStreamOperatorTestHarness<String, Object> testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer)); @@ -186,7 +190,7 @@ public class FlinkKafkaProducerBaseTest { 
@Test(timeout=5000) public void testAsyncErrorRethrownOnCheckpointAfterFlush() throws Throwable { final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>( - FakeStandardProducerConfig.get(), null); + FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null); producer.setFlushOnCheckpoint(true); final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer(); @@ -239,7 +243,7 @@ public class FlinkKafkaProducerBaseTest { @Test(timeout=10000) public void testAtLeastOnceProducer() throws Throwable { final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>( - FakeStandardProducerConfig.get(), null); + FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null); producer.setFlushOnCheckpoint(true); final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer(); @@ -299,7 +303,7 @@ public class FlinkKafkaProducerBaseTest { @Test(timeout=5000) public void testDoesNotWaitForPendingRecordsIfFlushingDisabled() throws Throwable { final DummyFlinkKafkaProducer<String> producer = new DummyFlinkKafkaProducer<>( - FakeStandardProducerConfig.get(), null); + FakeStandardProducerConfig.get(), new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null); producer.setFlushOnCheckpoint(false); final KafkaProducer<?, ?> mockProducer = producer.getMockKafkaProducer(); @@ -333,9 +337,9 @@ public class FlinkKafkaProducerBaseTest { private boolean isFlushed; @SuppressWarnings("unchecked") - DummyFlinkKafkaProducer(Properties producerConfig, KafkaPartitioner partitioner) { + DummyFlinkKafkaProducer(Properties producerConfig, KeyedSerializationSchema<T> schema, FlinkKafkaPartitioner partitioner) { - super(DUMMY_TOPIC, (KeyedSerializationSchema<T>) mock(KeyedSerializationSchema.class), producerConfig, partitioner); + super(DUMMY_TOPIC, schema, producerConfig, partitioner); this.mockProducer = mock(KafkaProducer.class); 
when(mockProducer.send(any(ProducerRecord.class), any(Callback.class))).thenAnswer(new Answer<Object>() { http://git-wip-us.apache.org/repos/asf/flink/blob/d3b58709/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index 203d814..ac278fb 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -1291,55 +1291,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { } } - private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>, - KeyedSerializationSchema<Tuple3<Integer, Integer, String>> { - - private final TypeSerializer<Tuple2<Integer, Integer>> ts; - - public Tuple2WithTopicSchema(ExecutionConfig ec) { - ts = TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec); - } - - @Override - public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { - DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message)); - Tuple2<Integer, Integer> t2 = ts.deserialize(in); - return new Tuple3<>(t2.f0, t2.f1, topic); - } - - @Override - public boolean isEndOfStream(Tuple3<Integer, Integer, String> nextElement) { - return false; - } - - @Override - public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() { 
- return TypeInfoParser.parse("Tuple3<Integer, Integer, String>"); - } - - @Override - public byte[] serializeKey(Tuple3<Integer, Integer, String> element) { - return null; - } - - @Override - public byte[] serializeValue(Tuple3<Integer, Integer, String> element) { - ByteArrayOutputStream by = new ByteArrayOutputStream(); - DataOutputView out = new DataOutputViewStreamWrapper(by); - try { - ts.serialize(new Tuple2<>(element.f0, element.f1), out); - } catch (IOException e) { - throw new RuntimeException("Error" ,e); - } - return by.toByteArray(); - } - - @Override - public String getTargetTopic(Tuple3<Integer, Integer, String> element) { - return element.f2; - } - } - /** * Test Flink's Kafka integration also with very big records (30MB) * see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message @@ -2276,4 +2227,53 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { this.numElementsTotal = state.get(0); } } + + private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>, + KeyedSerializationSchema<Tuple3<Integer, Integer, String>> { + + private final TypeSerializer<Tuple2<Integer, Integer>> ts; + + public Tuple2WithTopicSchema(ExecutionConfig ec) { + ts = TypeInfoParser.<Tuple2<Integer, Integer>>parse("Tuple2<Integer, Integer>").createSerializer(ec); + } + + @Override + public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { + DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message)); + Tuple2<Integer, Integer> t2 = ts.deserialize(in); + return new Tuple3<>(t2.f0, t2.f1, topic); + } + + @Override + public boolean isEndOfStream(Tuple3<Integer, Integer, String> nextElement) { + return false; + } + + @Override + public TypeInformation<Tuple3<Integer, Integer, String>> getProducedType() { + return TypeInfoParser.parse("Tuple3<Integer, Integer, String>"); + } 
+ + @Override + public byte[] serializeKey(Tuple3<Integer, Integer, String> element) { + return null; + } + + @Override + public byte[] serializeValue(Tuple3<Integer, Integer, String> element) { + ByteArrayOutputStream by = new ByteArrayOutputStream(); + DataOutputView out = new DataOutputViewStreamWrapper(by); + try { + ts.serialize(new Tuple2<>(element.f0, element.f1), out); + } catch (IOException e) { + throw new RuntimeException("Error" ,e); + } + return by.toByteArray(); + } + + @Override + public String getTargetTopic(Tuple3<Integer, Integer, String> element) { + return element.f2; + } + } }
