Repository: flink Updated Branches: refs/heads/release-1.4 5f523e6ab -> 9d2861946
[FLINK-8117] [runtime] Eliminate modulo operation from round-robin partitioners This closes #5041 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f26edb88 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f26edb88 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f26edb88 Branch: refs/heads/release-1.4 Commit: f26edb88d19623d93e563be417e50df969177c6a Parents: 5f523e6 Author: Gabor Gevay <[email protected]> Authored: Mon Nov 20 15:12:17 2017 +0100 Committer: Stephan Ewen <[email protected]> Committed: Wed Nov 22 10:09:36 2017 +0100 ---------------------------------------------------------------------- .../io/network/api/writer/RoundRobinChannelSelector.java | 5 ++++- .../runtime/partitioner/CustomPartitionerWrapper.java | 2 +- .../streaming/runtime/partitioner/ForwardPartitioner.java | 2 +- .../streaming/runtime/partitioner/GlobalPartitioner.java | 2 +- .../streaming/runtime/partitioner/RebalancePartitioner.java | 7 +++++-- .../streaming/runtime/partitioner/RescalePartitioner.java | 7 +++++-- .../streaming/runtime/partitioner/ShufflePartitioner.java | 2 +- 7 files changed, 18 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f26edb88/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java index 46af5a7..c7d25e5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RoundRobinChannelSelector.java @@ -46,7 +46,10 @@ public class RoundRobinChannelSelector<T extends IOReadableWritable> implements @Override public int[] selectChannels(final T record, final int numberOfOutputChannels) { - this.nextChannelToSendTo[0] = (this.nextChannelToSendTo[0] + 1) % numberOfOutputChannels; + int newChannel = ++this.nextChannelToSendTo[0]; + if (newChannel >= numberOfOutputChannels) { + this.nextChannelToSendTo[0] = 0; + } return this.nextChannelToSendTo; } http://git-wip-us.apache.org/repos/asf/flink/blob/f26edb88/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java index a51cede..f19c87d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/CustomPartitionerWrapper.java @@ -35,7 +35,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; public class CustomPartitionerWrapper<K, T> extends StreamPartitioner<T> { private static final long serialVersionUID = 1L; - private int[] returnArray = new int[1]; + private final int[] returnArray = new int[1]; Partitioner<K> partitioner; KeySelector<T, K> keySelector; http://git-wip-us.apache.org/repos/asf/flink/blob/f26edb88/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java index 0ae737c..c952282 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitioner.java @@ -30,7 +30,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; public class ForwardPartitioner<T> extends StreamPartitioner<T> { private static final long serialVersionUID = 1L; - private int[] returnArray = new int[] {0}; + private final int[] returnArray = new int[] {0}; @Override public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record, int numberOfOutputChannels) { http://git-wip-us.apache.org/repos/asf/flink/blob/f26edb88/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java index 67eaa73..69c8d00 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitioner.java @@ -30,7 +30,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; public class GlobalPartitioner<T> extends StreamPartitioner<T> { private static final long serialVersionUID = 1L; - private int[] returnArray = new int[] { 0 }; + private final int[] returnArray = new int[] { 0 }; @Override public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record, http://git-wip-us.apache.org/repos/asf/flink/blob/f26edb88/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java index a81f973..bb88d17 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitioner.java @@ -31,12 +31,15 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; public class RebalancePartitioner<T> extends StreamPartitioner<T> { private static final long serialVersionUID = 1L; - private int[] returnArray = new int[] {-1}; + private final int[] returnArray = new int[] {-1}; @Override public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record, int numberOfOutputChannels) { - this.returnArray[0] = (this.returnArray[0] + 1) % numberOfOutputChannels; + int newChannel = ++this.returnArray[0]; + if (newChannel >= numberOfOutputChannels) { + this.returnArray[0] = 0; + } return this.returnArray; } http://git-wip-us.apache.org/repos/asf/flink/blob/f26edb88/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitioner.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitioner.java index 9061523..b9af629 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitioner.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitioner.java @@ -48,11 +48,14 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; public class RescalePartitioner<T> extends StreamPartitioner<T> { private static final long serialVersionUID = 1L; - private int[] returnArray = new int[] {-1}; + private final int[] returnArray = new int[] {-1}; @Override public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record, int numberOfOutputChannels) { - this.returnArray[0] = (this.returnArray[0] + 1) % numberOfOutputChannels; + int newChannel = ++this.returnArray[0]; + if (newChannel >= numberOfOutputChannels) { + this.returnArray[0] = 0; + } return this.returnArray; } http://git-wip-us.apache.org/repos/asf/flink/blob/f26edb88/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java index 60c3fbc..ddcbec7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitioner.java @@ -36,7 +36,7 @@ public class ShufflePartitioner<T> extends StreamPartitioner<T> { private Random random = new Random(); - private int[] returnArray = new int[1]; + private final int[] returnArray = new int[1]; @Override public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
