[hotfix][network] initialize SingleInputGate#enqueuedInputChannelsWithData with the right size
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6597e674
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6597e674
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6597e674

Branch: refs/heads/master
Commit: 6597e6747804b9c8cb9029bab28e2514917c64ff
Parents: d30df34
Author: Nico Kruber <[email protected]>
Authored: Wed Feb 21 16:30:53 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Tue Feb 27 09:07:15 2018 +0100

----------------------------------------------------------------------
 .../runtime/io/network/partition/consumer/SingleInputGate.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/6597e674/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 04b8ee6..a1f3cdc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -144,7 +144,7 @@ public class SingleInputGate implements InputGate {
 	 * Field guaranteeing uniqueness for inputChannelsWithData queue. Both of those fields should be unified
 	 * onto one.
 	 */
-	private final BitSet enqueuedInputChannelsWithData = new BitSet();
+	private final BitSet enqueuedInputChannelsWithData;
 
 	private final BitSet channelsWithEndOfPartitionEvents;
 
@@ -205,6 +205,7 @@ public class SingleInputGate implements InputGate {
 		this.inputChannels = new HashMap<>(numberOfInputChannels);
 
 		this.channelsWithEndOfPartitionEvents = new BitSet(numberOfInputChannels);
+		this.enqueuedInputChannelsWithData = new BitSet(numberOfInputChannels);
 
 		this.taskActions = checkNotNull(taskActions);
 	}
