[hotfix] [kafka connector] Replace funky loop with simple division in FixedPartitioner


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9637ee78
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9637ee78
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9637ee78

Branch: refs/heads/master
Commit: 9637ee78846e4df5ef328c620cc991d394056f61
Parents: 1ea5e13
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jan 27 12:20:59 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jan 28 13:41:38 2016 +0100

----------------------------------------------------------------------
 .../kafka/partitioner/FixedPartitioner.java     | 20 ++++++++------------
 1 file changed, 8 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9637ee78/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
index d9dcfc1..9b848e0 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
@@ -54,27 +54,23 @@ import java.io.Serializable;
 public class FixedPartitioner<T> extends KafkaPartitioner<T> implements 
Serializable {
        private static final long serialVersionUID = 1627268846962918126L;
 
-       int targetPartition = -1;
+       private int targetPartition = -1;
 
        @Override
        public void open(int parallelInstanceId, int parallelInstances, int[] 
partitions) {
-               int p = 0;
-               for (int i = 0; i < parallelInstances; i++) {
-                       if (i == parallelInstanceId) {
-                               targetPartition = partitions[p];
-                               return;
-                       }
-                       if (++p == partitions.length) {
-                               p = 0;
-                       }
+               if (parallelInstanceId < 0 || parallelInstances <= 0 || 
partitions.length == 0) {
+                       throw new IllegalArgumentException();
                }
+               
+               this.targetPartition = partitions[parallelInstanceId % 
partitions.length];
        }
 
        @Override
        public int partition(T next, byte[] serializedKey, byte[] 
serializedValue, int numPartitions) {
-               if (targetPartition == -1) {
+               if (targetPartition >= 0) {
+                       return targetPartition;
+               } else {
                        throw new RuntimeException("The partitioner has not 
been initialized properly");
                }
-               return targetPartition;
        }
 }

Reply via email to