Repository: flink
Updated Branches:
  refs/heads/master 3229dc07a -> 45b770b51


[FLINK-5128] [kafka] Get Kafka partitions in FlinkKafkaProducer only if a 
partitioner is set

This closes #2893.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/45b770b5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/45b770b5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/45b770b5

Branch: refs/heads/master
Commit: 45b770b517de509f4d8c058d57ae0e3e34f6a9dd
Parents: 3229dc0
Author: renkai <[email protected]>
Authored: Tue Nov 29 14:26:00 2016 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Fri Dec 2 23:09:58 2016 +0800

----------------------------------------------------------------------
 .../kafka/FlinkKafkaProducerBase.java           | 32 ++++++++++----------
 1 file changed, 16 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/45b770b5/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
index d413f1c..679b731 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.java
@@ -212,24 +212,24 @@ public abstract class FlinkKafkaProducerBase<IN> extends 
RichSinkFunction<IN> im
        public void open(Configuration configuration) {
                producer = getKafkaProducer(this.producerConfig);
 
-               // the fetched list is immutable, so we're creating a mutable 
copy in order to sort it
-               List<PartitionInfo> partitionsList = new 
ArrayList<>(producer.partitionsFor(defaultTopicId));
-
-               // sort the partitions by partition id to make sure the fetched 
partition list is the same across subtasks
-               Collections.sort(partitionsList, new 
Comparator<PartitionInfo>() {
-                       @Override
-                       public int compare(PartitionInfo o1, PartitionInfo o2) {
-                               return Integer.compare(o1.partition(), 
o2.partition());
-                       }
-               });
-
-               partitions = new int[partitionsList.size()];
-               for (int i = 0; i < partitions.length; i++) {
-                       partitions[i] = partitionsList.get(i).partition();
-               }
-
                RuntimeContext ctx = getRuntimeContext();
                if (partitioner != null) {
+                       // the fetched list is immutable, so we're creating a 
mutable copy in order to sort it
+                       List<PartitionInfo> partitionsList = new 
ArrayList<>(producer.partitionsFor(defaultTopicId));
+
+                       // sort the partitions by partition id to make sure the 
fetched partition list is the same across subtasks
+                       Collections.sort(partitionsList, new 
Comparator<PartitionInfo>() {
+                               @Override
+                               public int compare(PartitionInfo o1, 
PartitionInfo o2) {
+                                       return Integer.compare(o1.partition(), 
o2.partition());
+                               }
+                       });
+
+                       partitions = new int[partitionsList.size()];
+                       for (int i = 0; i < partitions.length; i++) {
+                               partitions[i] = 
partitionsList.get(i).partition();
+                       }
+
                        partitioner.open(ctx.getIndexOfThisSubtask(), 
ctx.getNumberOfParallelSubtasks(), partitions);
                }
 

Reply via email to