Repository: flink
Updated Branches:
  refs/heads/master c4107d4c3 -> 98241d513


[FLINK-8117] [runtime] Eliminate modulo operation from round-robin partitioners

This closes #5041


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/98241d51
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/98241d51
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/98241d51

Branch: refs/heads/master
Commit: 98241d513befcee460a3f7af805a96c794c33ada
Parents: c4107d4
Author: Gabor Gevay <[email protected]>
Authored: Mon Nov 20 15:12:17 2017 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Wed Nov 22 10:01:57 2017 +0100

----------------------------------------------------------------------
 .../io/network/api/writer/RoundRobinChannelSelector.java      | 5 ++++-
 .../runtime/partitioner/CustomPartitionerWrapper.java         | 2 +-
 .../streaming/runtime/partitioner/ForwardPartitioner.java     | 2 +-
 .../streaming/runtime/partitioner/GlobalPartitioner.java      | 2 +-
 .../streaming/runtime/partitioner/RebalancePartitioner.java   | 7 +++++--
 .../streaming/runtime/partitioner/RescalePartitioner.java     | 7 +++++--
 .../streaming/runtime/partitioner/ShufflePartitioner.java     | 2 +-
 7 files changed, 18 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/98241d51/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java
index 46af5a7..c7d25e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java
@@ -46,7 +46,10 @@ public class RoundRobinChannelSelector<T extends IOReadableWritable> implements
        @Override
        public int[] selectChannels(final T record, final int numberOfOutputChannels) {
 
-               this.nextChannelToSendTo[0] = (this.nextChannelToSendTo[0] + 1) % numberOfOutputChannels;
+               int newChannel = ++this.nextChannelToSendTo[0];
+               if (newChannel >= numberOfOutputChannels) {
+                       this.nextChannelToSendTo[0] = 0;
+               }
 
                return this.nextChannelToSendTo;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/98241d51/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
index a51cede..f19c87d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java
@@ -35,7 +35,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 public class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> {
        private static final long serialVersionUID = 1L;
 
-       private int[] returnArray = new int[1];
+       private final int[] returnArray = new int[1];
        Partitioner<K> partitioner;
        KeySelector<T, K> keySelector;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/98241d51/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java
index 0ae737c..c952282 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java
@@ -30,7 +30,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 public class ForwardPartitioner<T> extends StreamPartitioner<T> {
        private static final long serialVersionUID = 1L;
 
-       private int[] returnArray = new int[] {0};
+       private final int[] returnArray = new int[] {0};
 
        @Override
        public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record, int numberOfOutputChannels) {

http://git-wip-us.apache.org/repos/asf/flink/blob/98241d51/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java
index 67eaa73..69c8d00 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java
@@ -30,7 +30,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 public class GlobalPartitioner<T> extends StreamPartitioner<T> {
        private static final long serialVersionUID = 1L;
 
-       private int[] returnArray = new int[] { 0 };
+       private final int[] returnArray = new int[] { 0 };
 
        @Override
        public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,

http://git-wip-us.apache.org/repos/asf/flink/blob/98241d51/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
index a81f973..bb88d17 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java
@@ -31,12 +31,15 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 public class RebalancePartitioner<T> extends StreamPartitioner<T> {
        private static final long serialVersionUID = 1L;
 
-       private int[] returnArray = new int[] {-1};
+       private final int[] returnArray = new int[] {-1};
 
        @Override
        public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
                        int numberOfOutputChannels) {
-               this.returnArray[0] = (this.returnArray[0] + 1) % numberOfOutputChannels;
+               int newChannel = ++this.returnArray[0];
+               if (newChannel >= numberOfOutputChannels) {
+                       this.returnArray[0] = 0;
+               }
                return this.returnArray;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/98241d51/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitioner.java
index 9061523..b9af629 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitioner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitioner.java
@@ -48,11 +48,14 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 public class RescalePartitioner<T> extends StreamPartitioner<T> {
        private static final long serialVersionUID = 1L;
 
-       private int[] returnArray = new int[] {-1};
+       private final int[] returnArray = new int[] {-1};
 
        @Override
        public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record, int numberOfOutputChannels) {
-               this.returnArray[0] = (this.returnArray[0] + 1) % numberOfOutputChannels;
+               int newChannel = ++this.returnArray[0];
+               if (newChannel >= numberOfOutputChannels) {
+                       this.returnArray[0] = 0;
+               }
                return this.returnArray;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/98241d51/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java
index 60c3fbc..ddcbec7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java
@@ -36,7 +36,7 @@ public class ShufflePartitioner<T> extends StreamPartitioner<T> {
 
        private Random random = new Random();
 
-       private int[] returnArray = new int[1];
+       private final int[] returnArray = new int[1];
 
        @Override
        public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,

Reply via email to