Repository: flink
Updated Branches:
  refs/heads/master 3229dc07a -> 45b770b51

[FLINK-5128] [kafka] Get Kafka partitions in FlinkKafkaProducer only if a partitioner is set

This closes #2893.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/45b770b5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/45b770b5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/45b770b5

Branch: refs/heads/master
Commit: 45b770b517de509f4d8c058d57ae0e3e34f6a9dd
Parents: 3229dc0
Author: renkai <[email protected]>
Authored: Tue Nov 29 14:26:00 2016 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Fri Dec 2 23:09:58 2016 +0800

----------------------------------------------------------------------
 .../kafka/FlinkKafkaProducerBase.java | 32 ++++++++++----------
 1 file changed, 16 insertions(+), 16 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/45b770b5/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index d413f1c..679b731 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -212,24 +212,24 @@ public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> im
 	public void open(Configuration configuration) {
 		producer = getKafkaProducer(this.producerConfig);

-		// the fetched list is immutable, so we're creating a mutable copy in order to sort it
-		List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(defaultTopicId));
-
-		// sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
-		Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
-			@Override
-			public int compare(PartitionInfo o1, PartitionInfo o2) {
-				return Integer.compare(o1.partition(), o2.partition());
-			}
-		});
-
-		partitions = new int[partitionsList.size()];
-		for (int i = 0; i < partitions.length; i++) {
-			partitions[i] = partitionsList.get(i).partition();
-		}
-
 		RuntimeContext ctx = getRuntimeContext();
 		if (partitioner != null) {
+			// the fetched list is immutable, so we're creating a mutable copy in order to sort it
+			List<PartitionInfo> partitionsList = new ArrayList<>(producer.partitionsFor(defaultTopicId));
+
+			// sort the partitions by partition id to make sure the fetched partition list is the same across subtasks
+			Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
+				@Override
+				public int compare(PartitionInfo o1, PartitionInfo o2) {
+					return Integer.compare(o1.partition(), o2.partition());
+				}
+			});
+
+			partitions = new int[partitionsList.size()];
+			for (int i = 0; i < partitions.length; i++) {
+				partitions[i] = partitionsList.get(i).partition();
+			}
+
 			partitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), partitions);
 		}
