This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5494fe68d867694a1ce14eb613a087b99fcd6509 Author: Weijie Guo <[email protected]> AuthorDate: Mon Jul 18 18:08:35 2022 +0800 [hotfix] Introduce BufferIndexAndChannel and make HsSpillingStrategy using it instead of BufferWithIdentity. --- .../partition/hybrid/BufferIndexAndChannel.java | 39 ++++++++++++++++++++++ .../partition/hybrid/HsFullSpillingStrategy.java | 14 ++++---- .../hybrid/HsSelectiveSpillingStrategy.java | 6 ++-- .../partition/hybrid/HsSpillingInfoProvider.java | 2 +- .../partition/hybrid/HsSpillingStrategy.java | 25 +++++++------- .../partition/hybrid/HsSpillingStrategyUtils.java | 28 ++++++++-------- .../hybrid/HsFullSpillingStrategyTest.java | 23 ++++++------- .../hybrid/HsSelectiveSpillingStrategyTest.java | 23 ++++++------- .../hybrid/HsSpillingStrategyTestUtils.java | 23 +++++-------- .../hybrid/HsSpillingStrategyUtilsTest.java | 24 ++++++------- .../hybrid/TestingSpillingInfoProvider.java | 14 ++++---- 11 files changed, 127 insertions(+), 94 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/BufferIndexAndChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/BufferIndexAndChannel.java new file mode 100644 index 00000000000..3d76244f0cd --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/BufferIndexAndChannel.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid; + +/** Integrate the buffer index and the channel id which it belongs. */ +public class BufferIndexAndChannel { + private final int bufferIndex; + + private final int channel; + + public BufferIndexAndChannel(int bufferIndex, int channel) { + this.bufferIndex = bufferIndex; + this.channel = channel; + } + + public int getBufferIndex() { + return bufferIndex; + } + + public int getChannel() { + return channel; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java index bfddc73dc45..3c4d58369e0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java @@ -57,7 +57,7 @@ public class HsFullSpillingStrategy implements HsSpillingStrategy { // For the case of buffer consumed, there is no need to take action for HsFullSpillingStrategy. @Override - public Optional<Decision> onBufferConsumed(BufferWithIdentity consumedBuffer) { + public Optional<Decision> onBufferConsumed(BufferIndexAndChannel consumedBuffer) { return Optional.of(Decision.NO_ACTION); } @@ -85,7 +85,7 @@ public class HsFullSpillingStrategy implements HsSpillingStrategy { return; } // Spill all not spill buffers. - List<BufferWithIdentity> unSpillBuffers = new ArrayList<>(); + List<BufferIndexAndChannel> unSpillBuffers = new ArrayList<>(); for (int i = 0; i < spillingInfoProvider.getNumSubpartitions(); i++) { unSpillBuffers.addAll( spillingInfoProvider.getBuffersInOrder( @@ -105,13 +105,13 @@ public class HsFullSpillingStrategy implements HsSpillingStrategy { int releaseNum = (int) (spillingInfoProvider.getPoolSize() * releaseBufferRatio); // first, release all consumed buffers - TreeMap<Integer, Deque<BufferWithIdentity>> consumedBuffersToRelease = new TreeMap<>(); + TreeMap<Integer, Deque<BufferIndexAndChannel>> consumedBuffersToRelease = new TreeMap<>(); int numConsumedBuffers = 0; for (int subpartitionId = 0; subpartitionId < spillingInfoProvider.getNumSubpartitions(); subpartitionId++) { - Deque<BufferWithIdentity> consumedSpillSubpartitionBuffers = + Deque<BufferIndexAndChannel> consumedSpillSubpartitionBuffers = spillingInfoProvider.getBuffersInOrder( subpartitionId, SpillStatus.SPILL, ConsumeStatus.CONSUMED); numConsumedBuffers += consumedSpillSubpartitionBuffers.size(); @@ -119,9 +119,9 @@ public class HsFullSpillingStrategy implements HsSpillingStrategy { } // make up the releaseNum with unconsumed buffers, if needed, w.r.t. the consuming priority - TreeMap<Integer, List<BufferWithIdentity>> unconsumedBufferToRelease = new TreeMap<>(); + TreeMap<Integer, List<BufferIndexAndChannel>> unconsumedBufferToRelease = new TreeMap<>(); if (releaseNum > numConsumedBuffers) { - TreeMap<Integer, Deque<BufferWithIdentity>> unconsumedBuffers = new TreeMap<>(); + TreeMap<Integer, Deque<BufferIndexAndChannel>> unconsumedBuffers = new TreeMap<>(); for (int subpartitionId = 0; subpartitionId < spillingInfoProvider.getNumSubpartitions(); subpartitionId++) { @@ -138,7 +138,7 @@ public class HsFullSpillingStrategy implements HsSpillingStrategy { } // collect results in order - List<BufferWithIdentity> toRelease = new ArrayList<>(); + List<BufferIndexAndChannel> toRelease = new ArrayList<>(); for (int i = 0; i < spillingInfoProvider.getNumSubpartitions(); i++) { toRelease.addAll(consumedBuffersToRelease.getOrDefault(i, new ArrayDeque<>())); toRelease.addAll(unconsumedBufferToRelease.getOrDefault(i, new ArrayList<>())); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java index 6b8f0b2a1d5..cd8c6edb8ae 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java @@ -52,7 +52,7 @@ public class HsSelectiveSpillingStrategy implements HsSpillingStrategy { // For the case of buffer consumed, this buffer need release. The control of the buffer is taken // over by the downstream task. @Override - public Optional<Decision> onBufferConsumed(BufferWithIdentity consumedBuffer) { + public Optional<Decision> onBufferConsumed(BufferIndexAndChannel consumedBuffer) { return Optional.of(Decision.builder().addBufferToRelease(consumedBuffer).build()); } @@ -80,7 +80,7 @@ public class HsSelectiveSpillingStrategy implements HsSpillingStrategy { int spillNum = (int) (spillingInfoProvider.getPoolSize() * spillBufferRatio); - TreeMap<Integer, Deque<BufferWithIdentity>> subpartitionToBuffers = new TreeMap<>(); + TreeMap<Integer, Deque<BufferIndexAndChannel>> subpartitionToBuffers = new TreeMap<>(); for (int channel = 0; channel < spillingInfoProvider.getNumSubpartitions(); channel++) { subpartitionToBuffers.put( channel, @@ -88,7 +88,7 @@ public class HsSelectiveSpillingStrategy implements HsSpillingStrategy { channel, SpillStatus.NOT_SPILL, ConsumeStatus.NOT_CONSUMED)); } - TreeMap<Integer, List<BufferWithIdentity>> subpartitionToHighPriorityBuffers = + TreeMap<Integer, List<BufferIndexAndChannel>> subpartitionToHighPriorityBuffers = getBuffersByConsumptionPriorityInOrder( spillingInfoProvider.getNextBufferIndexToConsume(), subpartitionToBuffers, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingInfoProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingInfoProvider.java index 01d2dfcee04..6168c637de4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingInfoProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingInfoProvider.java @@ -48,7 +48,7 @@ public interface HsSpillingInfoProvider { * according to bufferIndex from small to large, in other words, head is the buffer with the * minimum bufferIndex in the current subpartition. */ - Deque<BufferWithIdentity> getBuffersInOrder( + Deque<BufferIndexAndChannel> getBuffersInOrder( int subpartitionId, SpillStatus spillStatus, ConsumeStatus consumeStatus); /** Get total number of not decided to spill buffers. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java index 16f75d0cd0b..01041aaed26 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java @@ -57,7 +57,7 @@ public interface HsSpillingStrategy { * @return A {@link Decision} based on the provided information, or {@link Optional#empty()} if * the decision cannot be made, which indicates global information is needed. */ - Optional<Decision> onBufferConsumed(BufferWithIdentity consumedBuffer); + Optional<Decision> onBufferConsumed(BufferIndexAndChannel consumedBuffer); /** * Make a decision based on global information. Because this method will directly touch the @@ -74,25 +74,26 @@ public interface HsSpillingStrategy { */ class Decision { /** A collection of buffer that needs to be spilled to disk. */ - private final List<BufferWithIdentity> bufferToSpill; + private final List<BufferIndexAndChannel> bufferToSpill; /** A collection of buffer that needs to be released. */ - private final List<BufferWithIdentity> bufferToRelease; + private final List<BufferIndexAndChannel> bufferToRelease; public static final Decision NO_ACTION = new Decision(Collections.emptyList(), Collections.emptyList()); private Decision( - List<BufferWithIdentity> bufferToSpill, List<BufferWithIdentity> bufferToRelease) { + List<BufferIndexAndChannel> bufferToSpill, + List<BufferIndexAndChannel> bufferToRelease) { this.bufferToSpill = bufferToSpill; this.bufferToRelease = bufferToRelease; } - public List<BufferWithIdentity> getBufferToSpill() { + public List<BufferIndexAndChannel> getBufferToSpill() { return bufferToSpill; } - public List<BufferWithIdentity> getBufferToRelease() { + public List<BufferIndexAndChannel> getBufferToRelease() { return bufferToRelease; } @@ -103,29 +104,29 @@ public interface HsSpillingStrategy { /** Builder for {@link Decision}. */ static class Builder { /** A collection of buffer that needs to be spilled to disk. */ - private final List<BufferWithIdentity> bufferToSpill = new ArrayList<>(); + private final List<BufferIndexAndChannel> bufferToSpill = new ArrayList<>(); /** A collection of buffer that needs to be released. */ - private final List<BufferWithIdentity> bufferToRelease = new ArrayList<>(); + private final List<BufferIndexAndChannel> bufferToRelease = new ArrayList<>(); private Builder() {} - public Builder addBufferToSpill(BufferWithIdentity buffer) { + public Builder addBufferToSpill(BufferIndexAndChannel buffer) { bufferToSpill.add(buffer); return this; } - public Builder addBufferToSpill(List<BufferWithIdentity> buffers) { + public Builder addBufferToSpill(List<BufferIndexAndChannel> buffers) { bufferToSpill.addAll(buffers); return this; } - public Builder addBufferToRelease(BufferWithIdentity buffer) { + public Builder addBufferToRelease(BufferIndexAndChannel buffer) { bufferToRelease.add(buffer); return this; } - public Builder addBufferToRelease(List<BufferWithIdentity> buffers) { + public Builder addBufferToRelease(List<BufferIndexAndChannel> buffers) { bufferToRelease.addAll(buffers); return this; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyUtils.java index b5283aeeb79..5dd11719f57 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyUtils.java @@ -45,10 +45,11 @@ public class HsSpillingStrategyUtils { * @return mapping for subpartitionId to buffers, the value of map entry must be order by * bufferIndex ascending. */ - public static TreeMap<Integer, List<BufferWithIdentity>> getBuffersByConsumptionPriorityInOrder( - List<Integer> nextBufferIndexToConsume, - TreeMap<Integer, Deque<BufferWithIdentity>> subpartitionToAllBuffers, - int expectedSize) { + public static TreeMap<Integer, List<BufferIndexAndChannel>> + getBuffersByConsumptionPriorityInOrder( + List<Integer> nextBufferIndexToConsume, + TreeMap<Integer, Deque<BufferIndexAndChannel>> subpartitionToAllBuffers, + int expectedSize) { if (expectedSize <= 0) { return new TreeMap<>(); } @@ -63,17 +64,17 @@ public class HsSpillingStrategyUtils { } }); - TreeMap<Integer, List<BufferWithIdentity>> subpartitionToHighPriorityBuffers = + TreeMap<Integer, List<BufferIndexAndChannel>> subpartitionToHighPriorityBuffers = new TreeMap<>(); for (int i = 0; i < expectedSize; i++) { if (heap.isEmpty()) { break; } BufferConsumptionPriorityIterator bufferConsumptionPriorityIterator = heap.poll(); - BufferWithIdentity bufferWithIdentity = bufferConsumptionPriorityIterator.next(); + BufferIndexAndChannel bufferIndexAndChannel = bufferConsumptionPriorityIterator.next(); subpartitionToHighPriorityBuffers - .computeIfAbsent(bufferWithIdentity.getChannelIndex(), ArrayList::new) - .add(bufferWithIdentity); + .computeIfAbsent(bufferIndexAndChannel.getChannel(), ArrayList::new) + .add(bufferIndexAndChannel); // if this iterator has next, re-added it. if (bufferConsumptionPriorityIterator.hasNext()) { heap.add(bufferConsumptionPriorityIterator); @@ -89,24 +90,25 @@ public class HsSpillingStrategyUtils { /** * Special {@link Iterator} for hybrid shuffle mode that wrapped a deque of {@link - * BufferWithIdentity}. Tow iterator can compare by compute consumption priority of peek + * BufferIndexAndChannel}. Tow iterator can compare by compute consumption priority of peek * element. */ private static class BufferConsumptionPriorityIterator - implements Comparable<BufferConsumptionPriorityIterator>, Iterator<BufferWithIdentity> { + implements Comparable<BufferConsumptionPriorityIterator>, + Iterator<BufferIndexAndChannel> { private final int consumptionProgress; - private final PeekingIterator<BufferWithIdentity> bufferIterator; + private final PeekingIterator<BufferIndexAndChannel> bufferIterator; public BufferConsumptionPriorityIterator( - Deque<BufferWithIdentity> bufferQueue, int consumptionProgress) { + Deque<BufferIndexAndChannel> bufferQueue, int consumptionProgress) { this.consumptionProgress = consumptionProgress; this.bufferIterator = Iterators.peekingIterator(bufferQueue.descendingIterator()); } // move the iterator to next item. - public BufferWithIdentity next() { + public BufferIndexAndChannel next() { return bufferIterator.next(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java index d39ad315a05..eee3ecb3165 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java @@ -27,8 +27,7 @@ import java.util.Arrays; import java.util.List; import java.util.Optional; -import static org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyTestUtils.createBuffer; -import static org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyTestUtils.createBufferWithIdentitiesList; +import static org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyTestUtils.createBufferIndexAndChannelsList; import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link HsFullSpillingStrategy}. */ @@ -67,9 +66,9 @@ class HsFullSpillingStrategyTest { @Test void testOnBufferConsumed() { - BufferWithIdentity bufferWithIdentity = new BufferWithIdentity(createBuffer(), 0, 0); + BufferIndexAndChannel bufferIndexAndChannel = new BufferIndexAndChannel(0, 0); Optional<Decision> bufferConsumedDecision = - spillStrategy.onBufferConsumed(bufferWithIdentity); + spillStrategy.onBufferConsumed(bufferIndexAndChannel); assertThat(bufferConsumedDecision).hasValue(Decision.NO_ACTION); } @@ -96,16 +95,16 @@ class HsFullSpillingStrategyTest { final int progress1 = 10; final int progress2 = 20; - List<BufferWithIdentity> subpartitionBuffers1 = - createBufferWithIdentitiesList( + List<BufferIndexAndChannel> subpartitionBuffers1 = + createBufferIndexAndChannelsList( subpartition1, progress1, progress1 + 2, progress1 + 4, progress1 + 6, progress1 + 8); - List<BufferWithIdentity> subpartitionBuffers2 = - createBufferWithIdentitiesList( + List<BufferIndexAndChannel> subpartitionBuffers2 = + createBufferIndexAndChannelsList( subpartition2, progress2 + 1, progress2 + 3, @@ -132,13 +131,13 @@ class HsFullSpillingStrategyTest { Decision decision = spillStrategy.decideActionWithGlobalInfo(spillInfoProvider); // all not spilled buffers need to spill. - ArrayList<BufferWithIdentity> expectedSpillBuffers = + ArrayList<BufferIndexAndChannel> expectedSpillBuffers = new ArrayList<>(subpartitionBuffers1.subList(4, 5)); expectedSpillBuffers.add(subpartitionBuffers2.get(0)); expectedSpillBuffers.addAll(subpartitionBuffers2.subList(4, 5)); assertThat(decision.getBufferToSpill()).isEqualTo(expectedSpillBuffers); - ArrayList<BufferWithIdentity> expectedReleaseBuffers = new ArrayList<>(); + ArrayList<BufferIndexAndChannel> expectedReleaseBuffers = new ArrayList<>(); // all consumed spill buffers should release. expectedReleaseBuffers.addAll(subpartitionBuffers1.subList(0, 2)); // priority higher buffers should release. @@ -154,8 +153,8 @@ class HsFullSpillingStrategyTest { @Test void testDecideActionWithGlobalInfoAllConsumedSpillBufferShouldRelease() { final int subpartitionId = 0; - List<BufferWithIdentity> subpartitionBuffers = - createBufferWithIdentitiesList(subpartitionId, 0, 1, 2, 3, 4); + List<BufferIndexAndChannel> subpartitionBuffers = + createBufferIndexAndChannelsList(subpartitionId, 0, 1, 2, 3, 4); final int poolSize = 5; TestingSpillingInfoProvider spillInfoProvider = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java index fd5548d641a..eee7be9f409 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java @@ -28,8 +28,7 @@ import java.util.Collections; import java.util.List; import java.util.Optional; -import static org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyTestUtils.createBuffer; -import static org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyTestUtils.createBufferWithIdentitiesList; +import static org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyTestUtils.createBufferIndexAndChannelsList; import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link HsSelectiveSpillingStrategy}. */ @@ -55,15 +54,15 @@ class HsSelectiveSpillingStrategyTest { @Test void testOnBufferConsumed() { - BufferWithIdentity bufferWithIdentity = new BufferWithIdentity(createBuffer(), 0, 0); - Optional<Decision> consumedDecision = spillStrategy.onBufferConsumed(bufferWithIdentity); + BufferIndexAndChannel bufferIndexAndChannel = new BufferIndexAndChannel(0, 0); + Optional<Decision> consumedDecision = spillStrategy.onBufferConsumed(bufferIndexAndChannel); assertThat(consumedDecision) .hasValueSatisfying( (decision -> { assertThat(decision.getBufferToRelease()) .hasSize(1) .element(0) - .isEqualTo(bufferWithIdentity); + .isEqualTo(bufferIndexAndChannel); assertThat(decision.getBufferToSpill()).isEmpty(); })); } @@ -87,14 +86,14 @@ class HsSelectiveSpillingStrategyTest { final int progress2 = 20; final int progress3 = 30; - List<BufferWithIdentity> subpartitionBuffer1 = - createBufferWithIdentitiesList( + List<BufferIndexAndChannel> subpartitionBuffer1 = + createBufferIndexAndChannelsList( subpartition1, progress1 + 0, progress1 + 3, progress1 + 6, progress1 + 9); - List<BufferWithIdentity> subpartitionBuffer2 = - createBufferWithIdentitiesList( + List<BufferIndexAndChannel> subpartitionBuffer2 = + createBufferIndexAndChannelsList( subpartition2, progress2 + 1, progress2 + 4, progress2 + 7); - List<BufferWithIdentity> subpartitionBuffer3 = - createBufferWithIdentitiesList( + List<BufferIndexAndChannel> subpartitionBuffer3 = + createBufferIndexAndChannelsList( subpartition3, progress3 + 2, progress3 + 5, progress3 + 8); final int bufferPoolSize = 10; @@ -121,7 +120,7 @@ class HsSelectiveSpillingStrategyTest { // progress1 + 9 has the highest priority, but it cannot be decided to spill, as its // spillStatus is SPILL. expected buffer's index : progress1 + 6, progress2 + 7, progress3 + // 8 - List<BufferWithIdentity> expectedBuffers = new ArrayList<>(); + List<BufferIndexAndChannel> expectedBuffers = new ArrayList<>(); expectedBuffers.addAll(subpartitionBuffer1.subList(2, 3)); expectedBuffers.addAll(subpartitionBuffer2.subList(2, 3)); expectedBuffers.addAll(subpartitionBuffer3.subList(2, 3)); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyTestUtils.java index c95646ac10c..e959c150f63 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyTestUtils.java @@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.partition.hybrid; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; -import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; import java.util.ArrayDeque; @@ -32,30 +31,24 @@ import java.util.List; public class HsSpillingStrategyTestUtils { public static final int MEMORY_SEGMENT_SIZE = 128; - public static List<BufferWithIdentity> createBufferWithIdentitiesList( + public static List<BufferIndexAndChannel> createBufferIndexAndChannelsList( int subpartitionId, int... bufferIndexes) { - List<BufferWithIdentity> bufferWithIdentityList = new ArrayList<>(); + List<BufferIndexAndChannel> bufferIndexAndChannels = new ArrayList<>(); for (int bufferIndex : bufferIndexes) { MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(MEMORY_SEGMENT_SIZE); NetworkBuffer buffer = new NetworkBuffer(segment, (ignore) -> {}); - bufferWithIdentityList.add(new BufferWithIdentity(buffer, bufferIndex, subpartitionId)); + bufferIndexAndChannels.add(new BufferIndexAndChannel(bufferIndex, subpartitionId)); } - return bufferWithIdentityList; + return bufferIndexAndChannels; } - public static Deque<BufferWithIdentity> createBufferWithIdentitiesDeque( + public static Deque<BufferIndexAndChannel> createBufferIndexAndChannelsDeque( int subpartitionId, int... bufferIndexes) { - Deque<BufferWithIdentity> bufferWithIdentityList = new ArrayDeque<>(); + Deque<BufferIndexAndChannel> bufferIndexAndChannels = new ArrayDeque<>(); for (int bufferIndex : bufferIndexes) { - Buffer buffer = createBuffer(); - bufferWithIdentityList.add(new BufferWithIdentity(buffer, bufferIndex, subpartitionId)); + bufferIndexAndChannels.add(new BufferIndexAndChannel(bufferIndex, subpartitionId)); } - return bufferWithIdentityList; - } - - public static Buffer createBuffer() { - MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(MEMORY_SEGMENT_SIZE); - return new NetworkBuffer(segment, (ignore) -> {}); + return bufferIndexAndChannels; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyUtilsTest.java index 721699ff8a4..98d3ab7907f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyUtilsTest.java @@ -26,18 +26,18 @@ import java.util.Deque; import java.util.List; import java.util.TreeMap; -import static org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyTestUtils.createBufferWithIdentitiesDeque; -import static org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyTestUtils.createBufferWithIdentitiesList; +import static org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyTestUtils.createBufferIndexAndChannelsDeque; +import static org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyTestUtils.createBufferIndexAndChannelsList; import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link HsSpillingStrategyUtils}. */ class HsSpillingStrategyUtilsTest { @Test void testGetBuffersByConsumptionPriorityInOrderEmptyExpectedSize() { - TreeMap<Integer, Deque<BufferWithIdentity>> subpartitionToAllBuffers = new TreeMap<>(); - subpartitionToAllBuffers.put(0, createBufferWithIdentitiesDeque(0, 0, 1)); - subpartitionToAllBuffers.put(1, createBufferWithIdentitiesDeque(1, 2, 4)); - TreeMap<Integer, List<BufferWithIdentity>> buffersByConsumptionPriorityInOrder = + TreeMap<Integer, Deque<BufferIndexAndChannel>> subpartitionToAllBuffers = new TreeMap<>(); + subpartitionToAllBuffers.put(0, createBufferIndexAndChannelsDeque(0, 0, 1)); + subpartitionToAllBuffers.put(1, createBufferIndexAndChannelsDeque(1, 2, 4)); + TreeMap<Integer, List<BufferIndexAndChannel>> buffersByConsumptionPriorityInOrder = HsSpillingStrategyUtils.getBuffersByConsumptionPriorityInOrder( Arrays.asList(0, 1), subpartitionToAllBuffers, 0); assertThat(buffersByConsumptionPriorityInOrder).isEmpty(); @@ -51,17 +51,17 @@ class HsSpillingStrategyUtilsTest { final int progress1 = 10; final int progress2 = 20; - TreeMap<Integer, Deque<BufferWithIdentity>> subpartitionBuffers = new TreeMap<>(); - List<BufferWithIdentity> subpartitionBuffers1 = - createBufferWithIdentitiesList( + TreeMap<Integer, Deque<BufferIndexAndChannel>> subpartitionBuffers = new TreeMap<>(); + List<BufferIndexAndChannel> subpartitionBuffers1 = + createBufferIndexAndChannelsList( subpartition1, progress1, progress1 + 2, progress1 + 6); - List<BufferWithIdentity> subpartitionBuffers2 = - createBufferWithIdentitiesList( + List<BufferIndexAndChannel> subpartitionBuffers2 = + createBufferIndexAndChannelsList( subpartition2, progress2 + 1, progress2 + 2, progress2 + 5); subpartitionBuffers.put(subpartition1, new ArrayDeque<>(subpartitionBuffers1)); subpartitionBuffers.put(subpartition2, new ArrayDeque<>(subpartitionBuffers2)); - TreeMap<Integer, List<BufferWithIdentity>> buffersByConsumptionPriorityInOrder = + TreeMap<Integer, List<BufferIndexAndChannel>> buffersByConsumptionPriorityInOrder = HsSpillingStrategyUtils.getBuffersByConsumptionPriorityInOrder( Arrays.asList(progress1, progress2), subpartitionBuffers, 5); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingInfoProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingInfoProvider.java index 21beb5f1ebe..f8b303479f0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingInfoProvider.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingInfoProvider.java @@ -42,7 +42,7 @@ public class TestingSpillingInfoProvider implements HsSpillingInfoProvider { private final Supplier<Integer> getNumSubpartitionsSupplier; - private final Map<Integer, List<BufferWithIdentity>> allBuffers; + private final Map<Integer, List<BufferIndexAndChannel>> allBuffers; private final Map<Integer, Set<Integer>> spillBufferIndexes; @@ -54,7 +54,7 @@ public class TestingSpillingInfoProvider implements HsSpillingInfoProvider { Supplier<Integer> getNumTotalRequestedBuffersSupplier, Supplier<Integer> getPoolSizeSupplier, Supplier<Integer> getNumSubpartitionsSupplier, - Map<Integer, List<BufferWithIdentity>> allBuffers, + Map<Integer, List<BufferIndexAndChannel>> allBuffers, Map<Integer, Set<Integer>> spillBufferIndexes, Map<Integer, Set<Integer>> consumedBufferIndexes) { this.getNextBufferIndexToConsumeSupplier = getNextBufferIndexToConsumeSupplier; @@ -78,11 +78,11 @@ public class TestingSpillingInfoProvider implements HsSpillingInfoProvider { } @Override - public Deque<BufferWithIdentity> getBuffersInOrder( + public Deque<BufferIndexAndChannel> getBuffersInOrder( int subpartitionId, SpillStatus spillStatus, ConsumeStatus consumeStatus) { - Deque<BufferWithIdentity> buffersInOrder = new ArrayDeque<>(); + Deque<BufferIndexAndChannel> buffersInOrder = new ArrayDeque<>(); - List<BufferWithIdentity> subpartitionBuffers = allBuffers.get(subpartitionId); + List<BufferIndexAndChannel> subpartitionBuffers = allBuffers.get(subpartitionId); if (subpartitionBuffers == null) { return buffersInOrder; } @@ -159,7 +159,7 @@ public class TestingSpillingInfoProvider implements HsSpillingInfoProvider { private Supplier<Integer> getNumSubpartitionsSupplier = () -> 0; - private final Map<Integer, List<BufferWithIdentity>> allBuffers = new HashMap<>(); + private final Map<Integer, List<BufferIndexAndChannel>> allBuffers = new HashMap<>(); private final Map<Integer, Set<Integer>> spillBufferIndexes = new HashMap<>(); @@ -197,7 +197,7 @@ public class TestingSpillingInfoProvider implements HsSpillingInfoProvider { } public Builder addSubpartitionBuffers( - int subpartitionId, List<BufferWithIdentity> subpartitionBuffers) { + int subpartitionId, List<BufferIndexAndChannel> subpartitionBuffers) { allBuffers.computeIfAbsent(subpartitionId, ArrayList::new).addAll(subpartitionBuffers); return this; }
