This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5494fe68d867694a1ce14eb613a087b99fcd6509
Author: Weijie Guo <[email protected]>
AuthorDate: Mon Jul 18 18:08:35 2022 +0800

    [hotfix] Introduce BufferIndexAndChannel and make HsSpillingStrategy using it instead of BufferWithIdentity.
---
 .../partition/hybrid/BufferIndexAndChannel.java    | 39 ++++++++++++++++++++++
 .../partition/hybrid/HsFullSpillingStrategy.java   | 14 ++++----
 .../hybrid/HsSelectiveSpillingStrategy.java        |  6 ++--
 .../partition/hybrid/HsSpillingInfoProvider.java   |  2 +-
 .../partition/hybrid/HsSpillingStrategy.java       | 25 +++++++-------
 .../partition/hybrid/HsSpillingStrategyUtils.java  | 28 ++++++++--------
 .../hybrid/HsFullSpillingStrategyTest.java         | 23 ++++++-------
 .../hybrid/HsSelectiveSpillingStrategyTest.java    | 23 ++++++-------
 .../hybrid/HsSpillingStrategyTestUtils.java        | 23 +++++--------
 .../hybrid/HsSpillingStrategyUtilsTest.java        | 24 ++++++-------
 .../hybrid/TestingSpillingInfoProvider.java        | 14 ++++----
 11 files changed, 127 insertions(+), 94 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/BufferIndexAndChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/BufferIndexAndChannel.java
new file mode 100644
index 00000000000..3d76244f0cd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/BufferIndexAndChannel.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+/** Integrate the buffer index and the channel id which it belongs. */
+public class BufferIndexAndChannel {
+    private final int bufferIndex;
+
+    private final int channel;
+
+    public BufferIndexAndChannel(int bufferIndex, int channel) {
+        this.bufferIndex = bufferIndex;
+        this.channel = channel;
+    }
+
+    public int getBufferIndex() {
+        return bufferIndex;
+    }
+
+    public int getChannel() {
+        return channel;
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java
index bfddc73dc45..3c4d58369e0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java
@@ -57,7 +57,7 @@ public class HsFullSpillingStrategy implements HsSpillingStrategy {
 
     // For the case of buffer consumed, there is no need to take action for 
HsFullSpillingStrategy.
     @Override
-    public Optional<Decision> onBufferConsumed(BufferWithIdentity 
consumedBuffer) {
+    public Optional<Decision> onBufferConsumed(BufferIndexAndChannel 
consumedBuffer) {
         return Optional.of(Decision.NO_ACTION);
     }
 
@@ -85,7 +85,7 @@ public class HsFullSpillingStrategy implements HsSpillingStrategy {
             return;
         }
         // Spill all not spill buffers.
-        List<BufferWithIdentity> unSpillBuffers = new ArrayList<>();
+        List<BufferIndexAndChannel> unSpillBuffers = new ArrayList<>();
         for (int i = 0; i < spillingInfoProvider.getNumSubpartitions(); i++) {
             unSpillBuffers.addAll(
                     spillingInfoProvider.getBuffersInOrder(
@@ -105,13 +105,13 @@ public class HsFullSpillingStrategy implements HsSpillingStrategy {
         int releaseNum = (int) (spillingInfoProvider.getPoolSize() * 
releaseBufferRatio);
 
         // first, release all consumed buffers
-        TreeMap<Integer, Deque<BufferWithIdentity>> consumedBuffersToRelease = 
new TreeMap<>();
+        TreeMap<Integer, Deque<BufferIndexAndChannel>> 
consumedBuffersToRelease = new TreeMap<>();
         int numConsumedBuffers = 0;
         for (int subpartitionId = 0;
                 subpartitionId < spillingInfoProvider.getNumSubpartitions();
                 subpartitionId++) {
 
-            Deque<BufferWithIdentity> consumedSpillSubpartitionBuffers =
+            Deque<BufferIndexAndChannel> consumedSpillSubpartitionBuffers =
                     spillingInfoProvider.getBuffersInOrder(
                             subpartitionId, SpillStatus.SPILL, 
ConsumeStatus.CONSUMED);
             numConsumedBuffers += consumedSpillSubpartitionBuffers.size();
@@ -119,9 +119,9 @@ public class HsFullSpillingStrategy implements HsSpillingStrategy {
         }
 
         // make up the releaseNum with unconsumed buffers, if needed, w.r.t. 
the consuming priority
-        TreeMap<Integer, List<BufferWithIdentity>> unconsumedBufferToRelease = 
new TreeMap<>();
+        TreeMap<Integer, List<BufferIndexAndChannel>> 
unconsumedBufferToRelease = new TreeMap<>();
         if (releaseNum > numConsumedBuffers) {
-            TreeMap<Integer, Deque<BufferWithIdentity>> unconsumedBuffers = 
new TreeMap<>();
+            TreeMap<Integer, Deque<BufferIndexAndChannel>> unconsumedBuffers = 
new TreeMap<>();
             for (int subpartitionId = 0;
                     subpartitionId < 
spillingInfoProvider.getNumSubpartitions();
                     subpartitionId++) {
@@ -138,7 +138,7 @@ public class HsFullSpillingStrategy implements HsSpillingStrategy {
         }
 
         // collect results in order
-        List<BufferWithIdentity> toRelease = new ArrayList<>();
+        List<BufferIndexAndChannel> toRelease = new ArrayList<>();
         for (int i = 0; i < spillingInfoProvider.getNumSubpartitions(); i++) {
             toRelease.addAll(consumedBuffersToRelease.getOrDefault(i, new 
ArrayDeque<>()));
             toRelease.addAll(unconsumedBufferToRelease.getOrDefault(i, new 
ArrayList<>()));
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java
index 6b8f0b2a1d5..cd8c6edb8ae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategy.java
@@ -52,7 +52,7 @@ public class HsSelectiveSpillingStrategy implements HsSpillingStrategy {
     // For the case of buffer consumed, this buffer need release. The control 
of the buffer is taken
     // over by the downstream task.
     @Override
-    public Optional<Decision> onBufferConsumed(BufferWithIdentity 
consumedBuffer) {
+    public Optional<Decision> onBufferConsumed(BufferIndexAndChannel 
consumedBuffer) {
         return 
Optional.of(Decision.builder().addBufferToRelease(consumedBuffer).build());
     }
 
@@ -80,7 +80,7 @@ public class HsSelectiveSpillingStrategy implements HsSpillingStrategy {
 
         int spillNum = (int) (spillingInfoProvider.getPoolSize() * 
spillBufferRatio);
 
-        TreeMap<Integer, Deque<BufferWithIdentity>> subpartitionToBuffers = 
new TreeMap<>();
+        TreeMap<Integer, Deque<BufferIndexAndChannel>> subpartitionToBuffers = 
new TreeMap<>();
         for (int channel = 0; channel < 
spillingInfoProvider.getNumSubpartitions(); channel++) {
             subpartitionToBuffers.put(
                     channel,
@@ -88,7 +88,7 @@ public class HsSelectiveSpillingStrategy implements HsSpillingStrategy {
                             channel, SpillStatus.NOT_SPILL, 
ConsumeStatus.NOT_CONSUMED));
         }
 
-        TreeMap<Integer, List<BufferWithIdentity>> 
subpartitionToHighPriorityBuffers =
+        TreeMap<Integer, List<BufferIndexAndChannel>> 
subpartitionToHighPriorityBuffers =
                 getBuffersByConsumptionPriorityInOrder(
                         spillingInfoProvider.getNextBufferIndexToConsume(),
                         subpartitionToBuffers,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingInfoProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingInfoProvider.java
index 01d2dfcee04..6168c637de4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingInfoProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingInfoProvider.java
@@ -48,7 +48,7 @@ public interface HsSpillingInfoProvider {
      *     according to bufferIndex from small to large, in other words, head 
is the buffer with the
      *     minimum bufferIndex in the current subpartition.
      */
-    Deque<BufferWithIdentity> getBuffersInOrder(
+    Deque<BufferIndexAndChannel> getBuffersInOrder(
             int subpartitionId, SpillStatus spillStatus, ConsumeStatus 
consumeStatus);
 
     /** Get total number of not decided to spill buffers. */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java
index 16f75d0cd0b..01041aaed26 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategy.java
@@ -57,7 +57,7 @@ public interface HsSpillingStrategy {
      * @return A {@link Decision} based on the provided information, or {@link 
Optional#empty()} if
      *     the decision cannot be made, which indicates global information is 
needed.
      */
-    Optional<Decision> onBufferConsumed(BufferWithIdentity consumedBuffer);
+    Optional<Decision> onBufferConsumed(BufferIndexAndChannel consumedBuffer);
 
     /**
      * Make a decision based on global information. Because this method will 
directly touch the
@@ -74,25 +74,26 @@ public interface HsSpillingStrategy {
      */
     class Decision {
         /** A collection of buffer that needs to be spilled to disk. */
-        private final List<BufferWithIdentity> bufferToSpill;
+        private final List<BufferIndexAndChannel> bufferToSpill;
 
         /** A collection of buffer that needs to be released. */
-        private final List<BufferWithIdentity> bufferToRelease;
+        private final List<BufferIndexAndChannel> bufferToRelease;
 
         public static final Decision NO_ACTION =
                 new Decision(Collections.emptyList(), Collections.emptyList());
 
         private Decision(
-                List<BufferWithIdentity> bufferToSpill, 
List<BufferWithIdentity> bufferToRelease) {
+                List<BufferIndexAndChannel> bufferToSpill,
+                List<BufferIndexAndChannel> bufferToRelease) {
             this.bufferToSpill = bufferToSpill;
             this.bufferToRelease = bufferToRelease;
         }
 
-        public List<BufferWithIdentity> getBufferToSpill() {
+        public List<BufferIndexAndChannel> getBufferToSpill() {
             return bufferToSpill;
         }
 
-        public List<BufferWithIdentity> getBufferToRelease() {
+        public List<BufferIndexAndChannel> getBufferToRelease() {
             return bufferToRelease;
         }
 
@@ -103,29 +104,29 @@ public interface HsSpillingStrategy {
         /** Builder for {@link Decision}. */
         static class Builder {
             /** A collection of buffer that needs to be spilled to disk. */
-            private final List<BufferWithIdentity> bufferToSpill = new 
ArrayList<>();
+            private final List<BufferIndexAndChannel> bufferToSpill = new 
ArrayList<>();
 
             /** A collection of buffer that needs to be released. */
-            private final List<BufferWithIdentity> bufferToRelease = new 
ArrayList<>();
+            private final List<BufferIndexAndChannel> bufferToRelease = new 
ArrayList<>();
 
             private Builder() {}
 
-            public Builder addBufferToSpill(BufferWithIdentity buffer) {
+            public Builder addBufferToSpill(BufferIndexAndChannel buffer) {
                 bufferToSpill.add(buffer);
                 return this;
             }
 
-            public Builder addBufferToSpill(List<BufferWithIdentity> buffers) {
+            public Builder addBufferToSpill(List<BufferIndexAndChannel> 
buffers) {
                 bufferToSpill.addAll(buffers);
                 return this;
             }
 
-            public Builder addBufferToRelease(BufferWithIdentity buffer) {
+            public Builder addBufferToRelease(BufferIndexAndChannel buffer) {
                 bufferToRelease.add(buffer);
                 return this;
             }
 
-            public Builder addBufferToRelease(List<BufferWithIdentity> 
buffers) {
+            public Builder addBufferToRelease(List<BufferIndexAndChannel> 
buffers) {
                 bufferToRelease.addAll(buffers);
                 return this;
             }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyUtils.java
index b5283aeeb79..5dd11719f57 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyUtils.java
@@ -45,10 +45,11 @@ public class HsSpillingStrategyUtils {
      * @return mapping for subpartitionId to buffers, the value of map entry 
must be order by
      *     bufferIndex ascending.
      */
-    public static TreeMap<Integer, List<BufferWithIdentity>> 
getBuffersByConsumptionPriorityInOrder(
-            List<Integer> nextBufferIndexToConsume,
-            TreeMap<Integer, Deque<BufferWithIdentity>> 
subpartitionToAllBuffers,
-            int expectedSize) {
+    public static TreeMap<Integer, List<BufferIndexAndChannel>>
+            getBuffersByConsumptionPriorityInOrder(
+                    List<Integer> nextBufferIndexToConsume,
+                    TreeMap<Integer, Deque<BufferIndexAndChannel>> 
subpartitionToAllBuffers,
+                    int expectedSize) {
         if (expectedSize <= 0) {
             return new TreeMap<>();
         }
@@ -63,17 +64,17 @@ public class HsSpillingStrategyUtils {
                     }
                 });
 
-        TreeMap<Integer, List<BufferWithIdentity>> 
subpartitionToHighPriorityBuffers =
+        TreeMap<Integer, List<BufferIndexAndChannel>> 
subpartitionToHighPriorityBuffers =
                 new TreeMap<>();
         for (int i = 0; i < expectedSize; i++) {
             if (heap.isEmpty()) {
                 break;
             }
             BufferConsumptionPriorityIterator 
bufferConsumptionPriorityIterator = heap.poll();
-            BufferWithIdentity bufferWithIdentity = 
bufferConsumptionPriorityIterator.next();
+            BufferIndexAndChannel bufferIndexAndChannel = 
bufferConsumptionPriorityIterator.next();
             subpartitionToHighPriorityBuffers
-                    .computeIfAbsent(bufferWithIdentity.getChannelIndex(), 
ArrayList::new)
-                    .add(bufferWithIdentity);
+                    .computeIfAbsent(bufferIndexAndChannel.getChannel(), 
ArrayList::new)
+                    .add(bufferIndexAndChannel);
             // if this iterator has next, re-added it.
             if (bufferConsumptionPriorityIterator.hasNext()) {
                 heap.add(bufferConsumptionPriorityIterator);
@@ -89,24 +90,25 @@ public class HsSpillingStrategyUtils {
 
     /**
      * Special {@link Iterator} for hybrid shuffle mode that wrapped a deque 
of {@link
-     * BufferWithIdentity}. Tow iterator can compare by compute consumption 
priority of peek
+     * BufferIndexAndChannel}. Tow iterator can compare by compute consumption 
priority of peek
      * element.
      */
     private static class BufferConsumptionPriorityIterator
-            implements Comparable<BufferConsumptionPriorityIterator>, 
Iterator<BufferWithIdentity> {
+            implements Comparable<BufferConsumptionPriorityIterator>,
+                    Iterator<BufferIndexAndChannel> {
 
         private final int consumptionProgress;
 
-        private final PeekingIterator<BufferWithIdentity> bufferIterator;
+        private final PeekingIterator<BufferIndexAndChannel> bufferIterator;
 
         public BufferConsumptionPriorityIterator(
-                Deque<BufferWithIdentity> bufferQueue, int 
consumptionProgress) {
+                Deque<BufferIndexAndChannel> bufferQueue, int 
consumptionProgress) {
             this.consumptionProgress = consumptionProgress;
             this.bufferIterator = 
Iterators.peekingIterator(bufferQueue.descendingIterator());
         }
 
         // move the iterator to next item.
-        public BufferWithIdentity next() {
+        public BufferIndexAndChannel next() {
             return bufferIterator.next();
         }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java
index d39ad315a05..eee3ecb3165 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java
@@ -27,8 +27,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 
-import static 
org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyTestUtils.createBuffer;
-import static 
org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyTestUtils.createBufferWithIdentitiesList;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyTestUtils.createBufferIndexAndChannelsList;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link HsFullSpillingStrategy}. */
@@ -67,9 +66,9 @@ class HsFullSpillingStrategyTest {
 
     @Test
     void testOnBufferConsumed() {
-        BufferWithIdentity bufferWithIdentity = new 
BufferWithIdentity(createBuffer(), 0, 0);
+        BufferIndexAndChannel bufferIndexAndChannel = new 
BufferIndexAndChannel(0, 0);
         Optional<Decision> bufferConsumedDecision =
-                spillStrategy.onBufferConsumed(bufferWithIdentity);
+                spillStrategy.onBufferConsumed(bufferIndexAndChannel);
         assertThat(bufferConsumedDecision).hasValue(Decision.NO_ACTION);
     }
 
@@ -96,16 +95,16 @@ class HsFullSpillingStrategyTest {
         final int progress1 = 10;
         final int progress2 = 20;
 
-        List<BufferWithIdentity> subpartitionBuffers1 =
-                createBufferWithIdentitiesList(
+        List<BufferIndexAndChannel> subpartitionBuffers1 =
+                createBufferIndexAndChannelsList(
                         subpartition1,
                         progress1,
                         progress1 + 2,
                         progress1 + 4,
                         progress1 + 6,
                         progress1 + 8);
-        List<BufferWithIdentity> subpartitionBuffers2 =
-                createBufferWithIdentitiesList(
+        List<BufferIndexAndChannel> subpartitionBuffers2 =
+                createBufferIndexAndChannelsList(
                         subpartition2,
                         progress2 + 1,
                         progress2 + 3,
@@ -132,13 +131,13 @@ class HsFullSpillingStrategyTest {
         Decision decision = 
spillStrategy.decideActionWithGlobalInfo(spillInfoProvider);
 
         // all not spilled buffers need to spill.
-        ArrayList<BufferWithIdentity> expectedSpillBuffers =
+        ArrayList<BufferIndexAndChannel> expectedSpillBuffers =
                 new ArrayList<>(subpartitionBuffers1.subList(4, 5));
         expectedSpillBuffers.add(subpartitionBuffers2.get(0));
         expectedSpillBuffers.addAll(subpartitionBuffers2.subList(4, 5));
         
assertThat(decision.getBufferToSpill()).isEqualTo(expectedSpillBuffers);
 
-        ArrayList<BufferWithIdentity> expectedReleaseBuffers = new 
ArrayList<>();
+        ArrayList<BufferIndexAndChannel> expectedReleaseBuffers = new 
ArrayList<>();
         // all consumed spill buffers should release.
         expectedReleaseBuffers.addAll(subpartitionBuffers1.subList(0, 2));
         // priority higher buffers should release.
@@ -154,8 +153,8 @@ class HsFullSpillingStrategyTest {
     @Test
     void testDecideActionWithGlobalInfoAllConsumedSpillBufferShouldRelease() {
         final int subpartitionId = 0;
-        List<BufferWithIdentity> subpartitionBuffers =
-                createBufferWithIdentitiesList(subpartitionId, 0, 1, 2, 3, 4);
+        List<BufferIndexAndChannel> subpartitionBuffers =
+                createBufferIndexAndChannelsList(subpartitionId, 0, 1, 2, 3, 
4);
 
         final int poolSize = 5;
         TestingSpillingInfoProvider spillInfoProvider =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java
index fd5548d641a..eee7be9f409 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSelectiveSpillingStrategyTest.java
@@ -28,8 +28,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 
-import static 
org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyTestUtils.createBuffer;
-import static 
org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyTestUtils.createBufferWithIdentitiesList;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyTestUtils.createBufferIndexAndChannelsList;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link HsSelectiveSpillingStrategy}. */
@@ -55,15 +54,15 @@ class HsSelectiveSpillingStrategyTest {
 
     @Test
     void testOnBufferConsumed() {
-        BufferWithIdentity bufferWithIdentity = new 
BufferWithIdentity(createBuffer(), 0, 0);
-        Optional<Decision> consumedDecision = 
spillStrategy.onBufferConsumed(bufferWithIdentity);
+        BufferIndexAndChannel bufferIndexAndChannel = new 
BufferIndexAndChannel(0, 0);
+        Optional<Decision> consumedDecision = 
spillStrategy.onBufferConsumed(bufferIndexAndChannel);
         assertThat(consumedDecision)
                 .hasValueSatisfying(
                         (decision -> {
                             assertThat(decision.getBufferToRelease())
                                     .hasSize(1)
                                     .element(0)
-                                    .isEqualTo(bufferWithIdentity);
+                                    .isEqualTo(bufferIndexAndChannel);
                             assertThat(decision.getBufferToSpill()).isEmpty();
                         }));
     }
@@ -87,14 +86,14 @@ class HsSelectiveSpillingStrategyTest {
         final int progress2 = 20;
         final int progress3 = 30;
 
-        List<BufferWithIdentity> subpartitionBuffer1 =
-                createBufferWithIdentitiesList(
+        List<BufferIndexAndChannel> subpartitionBuffer1 =
+                createBufferIndexAndChannelsList(
                         subpartition1, progress1 + 0, progress1 + 3, progress1 
+ 6, progress1 + 9);
-        List<BufferWithIdentity> subpartitionBuffer2 =
-                createBufferWithIdentitiesList(
+        List<BufferIndexAndChannel> subpartitionBuffer2 =
+                createBufferIndexAndChannelsList(
                         subpartition2, progress2 + 1, progress2 + 4, progress2 
+ 7);
-        List<BufferWithIdentity> subpartitionBuffer3 =
-                createBufferWithIdentitiesList(
+        List<BufferIndexAndChannel> subpartitionBuffer3 =
+                createBufferIndexAndChannelsList(
                         subpartition3, progress3 + 2, progress3 + 5, progress3 
+ 8);
 
         final int bufferPoolSize = 10;
@@ -121,7 +120,7 @@ class HsSelectiveSpillingStrategyTest {
         // progress1 + 9 has the highest priority, but it cannot be decided to 
spill, as its
         // spillStatus is SPILL. expected buffer's index : progress1 + 6, 
progress2 + 7, progress3 +
         // 8
-        List<BufferWithIdentity> expectedBuffers = new ArrayList<>();
+        List<BufferIndexAndChannel> expectedBuffers = new ArrayList<>();
         expectedBuffers.addAll(subpartitionBuffer1.subList(2, 3));
         expectedBuffers.addAll(subpartitionBuffer2.subList(2, 3));
         expectedBuffers.addAll(subpartitionBuffer3.subList(2, 3));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyTestUtils.java
index c95646ac10c..e959c150f63 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyTestUtils.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.io.network.partition.hybrid;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 
 import java.util.ArrayDeque;
@@ -32,30 +31,24 @@ import java.util.List;
 public class HsSpillingStrategyTestUtils {
     public static final int MEMORY_SEGMENT_SIZE = 128;
 
-    public static List<BufferWithIdentity> createBufferWithIdentitiesList(
+    public static List<BufferIndexAndChannel> createBufferIndexAndChannelsList(
             int subpartitionId, int... bufferIndexes) {
-        List<BufferWithIdentity> bufferWithIdentityList = new ArrayList<>();
+        List<BufferIndexAndChannel> bufferIndexAndChannels = new ArrayList<>();
         for (int bufferIndex : bufferIndexes) {
             MemorySegment segment =
                     
MemorySegmentFactory.allocateUnpooledSegment(MEMORY_SEGMENT_SIZE);
             NetworkBuffer buffer = new NetworkBuffer(segment, (ignore) -> {});
-            bufferWithIdentityList.add(new BufferWithIdentity(buffer, 
bufferIndex, subpartitionId));
+            bufferIndexAndChannels.add(new BufferIndexAndChannel(bufferIndex, 
subpartitionId));
         }
-        return bufferWithIdentityList;
+        return bufferIndexAndChannels;
     }
 
-    public static Deque<BufferWithIdentity> createBufferWithIdentitiesDeque(
+    public static Deque<BufferIndexAndChannel> 
createBufferIndexAndChannelsDeque(
             int subpartitionId, int... bufferIndexes) {
-        Deque<BufferWithIdentity> bufferWithIdentityList = new ArrayDeque<>();
+        Deque<BufferIndexAndChannel> bufferIndexAndChannels = new 
ArrayDeque<>();
         for (int bufferIndex : bufferIndexes) {
-            Buffer buffer = createBuffer();
-            bufferWithIdentityList.add(new BufferWithIdentity(buffer, 
bufferIndex, subpartitionId));
+            bufferIndexAndChannels.add(new BufferIndexAndChannel(bufferIndex, 
subpartitionId));
         }
-        return bufferWithIdentityList;
-    }
-
-    public static Buffer createBuffer() {
-        MemorySegment segment = 
MemorySegmentFactory.allocateUnpooledSegment(MEMORY_SEGMENT_SIZE);
-        return new NetworkBuffer(segment, (ignore) -> {});
+        return bufferIndexAndChannels;
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyUtilsTest.java
index 721699ff8a4..98d3ab7907f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSpillingStrategyUtilsTest.java
@@ -26,18 +26,18 @@ import java.util.Deque;
 import java.util.List;
 import java.util.TreeMap;
 
-import static 
org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyTestUtils.createBufferWithIdentitiesDeque;
-import static 
org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyTestUtils.createBufferWithIdentitiesList;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyTestUtils.createBufferIndexAndChannelsDeque;
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategyTestUtils.createBufferIndexAndChannelsList;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link HsSpillingStrategyUtils}. */
 class HsSpillingStrategyUtilsTest {
     @Test
     void testGetBuffersByConsumptionPriorityInOrderEmptyExpectedSize() {
-        TreeMap<Integer, Deque<BufferWithIdentity>> subpartitionToAllBuffers = 
new TreeMap<>();
-        subpartitionToAllBuffers.put(0, createBufferWithIdentitiesDeque(0, 0, 
1));
-        subpartitionToAllBuffers.put(1, createBufferWithIdentitiesDeque(1, 2, 
4));
-        TreeMap<Integer, List<BufferWithIdentity>> 
buffersByConsumptionPriorityInOrder =
+        TreeMap<Integer, Deque<BufferIndexAndChannel>> 
subpartitionToAllBuffers = new TreeMap<>();
+        subpartitionToAllBuffers.put(0, createBufferIndexAndChannelsDeque(0, 
0, 1));
+        subpartitionToAllBuffers.put(1, createBufferIndexAndChannelsDeque(1, 
2, 4));
+        TreeMap<Integer, List<BufferIndexAndChannel>> 
buffersByConsumptionPriorityInOrder =
                 HsSpillingStrategyUtils.getBuffersByConsumptionPriorityInOrder(
                         Arrays.asList(0, 1), subpartitionToAllBuffers, 0);
         assertThat(buffersByConsumptionPriorityInOrder).isEmpty();
@@ -51,17 +51,17 @@ class HsSpillingStrategyUtilsTest {
         final int progress1 = 10;
         final int progress2 = 20;
 
-        TreeMap<Integer, Deque<BufferWithIdentity>> subpartitionBuffers = new TreeMap<>();
-        List<BufferWithIdentity> subpartitionBuffers1 =
-                createBufferWithIdentitiesList(
+        TreeMap<Integer, Deque<BufferIndexAndChannel>> subpartitionBuffers = new TreeMap<>();
+        List<BufferIndexAndChannel> subpartitionBuffers1 =
+                createBufferIndexAndChannelsList(
                         subpartition1, progress1, progress1 + 2, progress1 + 6);
-        List<BufferWithIdentity> subpartitionBuffers2 =
-                createBufferWithIdentitiesList(
+        List<BufferIndexAndChannel> subpartitionBuffers2 =
+                createBufferIndexAndChannelsList(
                         subpartition2, progress2 + 1, progress2 + 2, progress2 + 5);
         subpartitionBuffers.put(subpartition1, new ArrayDeque<>(subpartitionBuffers1));
         subpartitionBuffers.put(subpartition2, new ArrayDeque<>(subpartitionBuffers2));
 
-        TreeMap<Integer, List<BufferWithIdentity>> buffersByConsumptionPriorityInOrder =
+        TreeMap<Integer, List<BufferIndexAndChannel>> buffersByConsumptionPriorityInOrder =
                 HsSpillingStrategyUtils.getBuffersByConsumptionPriorityInOrder(
                         Arrays.asList(progress1, progress2), subpartitionBuffers, 5);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingInfoProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingInfoProvider.java
index 21beb5f1ebe..f8b303479f0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingInfoProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSpillingInfoProvider.java
@@ -42,7 +42,7 @@ public class TestingSpillingInfoProvider implements HsSpillingInfoProvider {
 
     private final Supplier<Integer> getNumSubpartitionsSupplier;
 
-    private final Map<Integer, List<BufferWithIdentity>> allBuffers;
+    private final Map<Integer, List<BufferIndexAndChannel>> allBuffers;
 
     private final Map<Integer, Set<Integer>> spillBufferIndexes;
 
@@ -54,7 +54,7 @@ public class TestingSpillingInfoProvider implements HsSpillingInfoProvider {
             Supplier<Integer> getNumTotalRequestedBuffersSupplier,
             Supplier<Integer> getPoolSizeSupplier,
             Supplier<Integer> getNumSubpartitionsSupplier,
-            Map<Integer, List<BufferWithIdentity>> allBuffers,
+            Map<Integer, List<BufferIndexAndChannel>> allBuffers,
             Map<Integer, Set<Integer>> spillBufferIndexes,
             Map<Integer, Set<Integer>> consumedBufferIndexes) {
         this.getNextBufferIndexToConsumeSupplier = getNextBufferIndexToConsumeSupplier;
@@ -78,11 +78,11 @@ public class TestingSpillingInfoProvider implements HsSpillingInfoProvider {
     }
 
     @Override
-    public Deque<BufferWithIdentity> getBuffersInOrder(
+    public Deque<BufferIndexAndChannel> getBuffersInOrder(
             int subpartitionId, SpillStatus spillStatus, ConsumeStatus consumeStatus) {
-        Deque<BufferWithIdentity> buffersInOrder = new ArrayDeque<>();
+        Deque<BufferIndexAndChannel> buffersInOrder = new ArrayDeque<>();
 
-        List<BufferWithIdentity> subpartitionBuffers = allBuffers.get(subpartitionId);
+        List<BufferIndexAndChannel> subpartitionBuffers = allBuffers.get(subpartitionId);
         if (subpartitionBuffers == null) {
             return buffersInOrder;
         }
@@ -159,7 +159,7 @@ public class TestingSpillingInfoProvider implements HsSpillingInfoProvider {
 
         private Supplier<Integer> getNumSubpartitionsSupplier = () -> 0;
 
-        private final Map<Integer, List<BufferWithIdentity>> allBuffers = new HashMap<>();
+        private final Map<Integer, List<BufferIndexAndChannel>> allBuffers = new HashMap<>();
 
         private final Map<Integer, Set<Integer>> spillBufferIndexes = new HashMap<>();
 
@@ -197,7 +197,7 @@ public class TestingSpillingInfoProvider implements HsSpillingInfoProvider {
         }
 
         public Builder addSubpartitionBuffers(
-                int subpartitionId, List<BufferWithIdentity> subpartitionBuffers) {
+                int subpartitionId, List<BufferIndexAndChannel> subpartitionBuffers) {
             allBuffers.computeIfAbsent(subpartitionId, ArrayList::new).addAll(subpartitionBuffers);
             return this;
         }

Reply via email to