This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 819ba92994a5dcc4ced054a3530d423123a8de23
Author: Wencong Liu <[email protected]>
AuthorDate: Mon Aug 14 10:45:06 2023 +0800

    [FLINK-32770][network] Fix the inaccurate backlog number of Hybrid Shuffle
---
 .../network/partition/ResultSubpartitionView.java  |  7 ++
 .../tiered/netty/NettyConnectionWriterImpl.java    | 15 ++--
 .../hybrid/tiered/netty/NettyPayloadManager.java   | 79 ++++++++++++++++++++
 .../netty/TieredStorageNettyServiceImpl.java       | 12 ++-
 .../netty/TieredStorageResultSubpartitionView.java | 86 +++++++++++++---------
 .../tiered/netty/NettyConnectionWriterTest.java    | 16 ++--
 .../TieredStorageResultSubpartitionViewTest.java   | 48 ++++++------
 7 files changed, 179 insertions(+), 84 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
index 9a4078e0799..305e50271fa 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
@@ -65,6 +65,13 @@ public interface ResultSubpartitionView {
      */
     Throwable getFailureCause();
 
+    /**
+     * Get the availability and backlog of the view. The availability 
represents if the view is
+     * ready to get buffer from it. The backlog represents the number of 
available data buffers.
+     *
+     * @param numCreditsAvailable the available credits for this {@link 
ResultSubpartitionView}.
+     * @return availability and backlog.
+     */
     AvailabilityWithBacklog getAvailabilityAndBacklog(int numCreditsAvailable);
 
     int unsynchronizedGetNumberOfQueuedBuffers();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriterImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriterImpl.java
index d5ec2710616..24f61e4329b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriterImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriterImpl.java
@@ -23,20 +23,19 @@ import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 
 import javax.annotation.Nullable;
 
-import java.util.Queue;
-
 /** The default implementation of {@link NettyConnectionWriter}. */
 public class NettyConnectionWriterImpl implements NettyConnectionWriter {
 
-    private final Queue<NettyPayload> bufferQueue;
+    private final NettyPayloadManager nettyPayloadManager;
 
     private final NettyConnectionId connectionId;
 
     private final BufferAvailabilityListener availabilityListener;
 
     public NettyConnectionWriterImpl(
-            Queue<NettyPayload> bufferQueue, BufferAvailabilityListener 
availabilityListener) {
-        this.bufferQueue = bufferQueue;
+            NettyPayloadManager nettyPayloadManager,
+            BufferAvailabilityListener availabilityListener) {
+        this.nettyPayloadManager = nettyPayloadManager;
         this.connectionId = NettyConnectionId.newId();
         this.availabilityListener = availabilityListener;
     }
@@ -53,18 +52,18 @@ public class NettyConnectionWriterImpl implements 
NettyConnectionWriter {
 
     @Override
     public int numQueuedBuffers() {
-        return bufferQueue.size();
+        return nettyPayloadManager.getSize();
     }
 
     @Override
     public void writeBuffer(NettyPayload nettyPayload) {
-        bufferQueue.add(nettyPayload);
+        nettyPayloadManager.add(nettyPayload);
     }
 
     @Override
     public void close(@Nullable Throwable error) {
         NettyPayload nettyPayload;
-        while ((nettyPayload = bufferQueue.poll()) != null) {
+        while ((nettyPayload = nettyPayloadManager.poll()) != null) {
             nettyPayload.getBuffer().ifPresent(Buffer::recycleBuffer);
         }
         if (error != null) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyPayloadManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyPayloadManager.java
new file mode 100644
index 00000000000..95e7bc77bc0
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyPayloadManager.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.LinkedList;
+import java.util.Optional;
+import java.util.Queue;
+
+/** {@link NettyPayloadManager} is used to contain all netty payloads from a 
storage tier. */
+public class NettyPayloadManager {
+
+    private final Object lock = new Object();
+
+    private final Queue<NettyPayload> queue = new LinkedList<>();
+
+    /** Number of buffers whose {@link Buffer.DataType} is buffer in the 
queue. */
+    @GuardedBy("lock")
+    private int backlog = 0;
+
+    public void add(NettyPayload nettyPayload) {
+        synchronized (lock) {
+            queue.add(nettyPayload);
+            Optional<Buffer> buffer = nettyPayload.getBuffer();
+            if (buffer.isPresent() && buffer.get().isBuffer()) {
+                backlog++;
+            }
+        }
+    }
+
+    public NettyPayload peek() {
+        synchronized (lock) {
+            return queue.peek();
+        }
+    }
+
+    public NettyPayload poll() {
+        synchronized (lock) {
+            NettyPayload nettyPayload = queue.poll();
+            if (nettyPayload != null
+                    && nettyPayload.getBuffer().isPresent()
+                    && nettyPayload.getBuffer().get().isBuffer()) {
+                backlog--;
+            }
+            return nettyPayload;
+        }
+    }
+
+    public int getBacklog() {
+        synchronized (lock) {
+            return backlog;
+        }
+    }
+
+    public int getSize() {
+        synchronized (lock) {
+            return queue.size();
+        }
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageNettyServiceImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageNettyServiceImpl.java
index ab85e05beb6..8255964e6ea 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageNettyServiceImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageNettyServiceImpl.java
@@ -35,10 +35,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.Supplier;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -122,19 +120,19 @@ public class TieredStorageNettyServiceImpl implements 
TieredStorageNettyService
             return new TieredStorageResultSubpartitionView(
                     availabilityListener, new ArrayList<>(), new 
ArrayList<>(), new ArrayList<>());
         }
-        List<Queue<NettyPayload>> queues = new ArrayList<>();
+        List<NettyPayloadManager> nettyPayloadManagers = new ArrayList<>();
         List<NettyConnectionId> nettyConnectionIds = new ArrayList<>();
         for (NettyServiceProducer serviceProducer : serviceProducers) {
-            LinkedBlockingQueue<NettyPayload> queue = new 
LinkedBlockingQueue<>();
+            NettyPayloadManager nettyPayloadManager = new 
NettyPayloadManager();
             NettyConnectionWriterImpl writer =
-                    new NettyConnectionWriterImpl(queue, availabilityListener);
+                    new NettyConnectionWriterImpl(nettyPayloadManager, 
availabilityListener);
             serviceProducer.connectionEstablished(subpartitionId, writer);
             nettyConnectionIds.add(writer.getNettyConnectionId());
-            queues.add(queue);
+            nettyPayloadManagers.add(nettyPayloadManager);
         }
         return new TieredStorageResultSubpartitionView(
                 availabilityListener,
-                queues,
+                nettyPayloadManagers,
                 nettyConnectionIds,
                 registeredServiceProducers.get(partitionId));
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageResultSubpartitionView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageResultSubpartitionView.java
index 56c43735a91..adb07cd9934 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageResultSubpartitionView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageResultSubpartitionView.java
@@ -29,7 +29,6 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
-import java.util.Queue;
 
 import static 
org.apache.flink.runtime.io.network.buffer.Buffer.DataType.END_OF_SEGMENT;
 
@@ -41,7 +40,7 @@ public class TieredStorageResultSubpartitionView implements 
ResultSubpartitionVi
 
     private final BufferAvailabilityListener availabilityListener;
 
-    private final List<Queue<NettyPayload>> nettyPayloadQueues;
+    private final List<NettyPayloadManager> nettyPayloadManagers;
 
     private final List<NettyServiceProducer> serviceProducers;
 
@@ -53,17 +52,17 @@ public class TieredStorageResultSubpartitionView implements 
ResultSubpartitionVi
 
     private boolean stopSendingData = false;
 
-    private int queueIndexContainsCurrentSegment = -1;
+    private int managerIndexContainsCurrentSegment = -1;
 
     private int currentSequenceNumber = -1;
 
     public TieredStorageResultSubpartitionView(
             BufferAvailabilityListener availabilityListener,
-            List<Queue<NettyPayload>> nettyPayloadQueues,
+            List<NettyPayloadManager> nettyPayloadManagers,
             List<NettyConnectionId> nettyConnectionIds,
             List<NettyServiceProducer> serviceProducers) {
         this.availabilityListener = availabilityListener;
-        this.nettyPayloadQueues = nettyPayloadQueues;
+        this.nettyPayloadManagers = nettyPayloadManagers;
         this.nettyConnectionIds = nettyConnectionIds;
         this.serviceProducers = serviceProducers;
     }
@@ -74,18 +73,19 @@ public class TieredStorageResultSubpartitionView implements 
ResultSubpartitionVi
         if (stopSendingData || !findCurrentNettyPayloadQueue()) {
             return null;
         }
-        Queue<NettyPayload> currentQueue = 
nettyPayloadQueues.get(queueIndexContainsCurrentSegment);
-        Optional<Buffer> nextBuffer = readNettyPayload(currentQueue);
+        NettyPayloadManager nettyPayloadManager =
+                nettyPayloadManagers.get(managerIndexContainsCurrentSegment);
+        Optional<Buffer> nextBuffer = readNettyPayload(nettyPayloadManager);
         if (nextBuffer.isPresent()) {
             stopSendingData = nextBuffer.get().getDataType() == END_OF_SEGMENT;
             if (stopSendingData) {
-                queueIndexContainsCurrentSegment = -1;
+                managerIndexContainsCurrentSegment = -1;
             }
             currentSequenceNumber++;
             return BufferAndBacklog.fromBufferAndLookahead(
                     nextBuffer.get(),
-                    getNettyPayloadNextDataType(currentQueue),
-                    currentQueue.size(),
+                    getDataType(nettyPayloadManager.peek()),
+                    getBacklog(),
                     currentSequenceNumber);
         }
         return null;
@@ -94,14 +94,13 @@ public class TieredStorageResultSubpartitionView implements 
ResultSubpartitionVi
     @Override
     public AvailabilityWithBacklog getAvailabilityAndBacklog(int 
numCreditsAvailable) {
         if (findCurrentNettyPayloadQueue()) {
-            Queue<NettyPayload> currentQueue =
-                    nettyPayloadQueues.get(queueIndexContainsCurrentSegment);
+            NettyPayloadManager currentQueue =
+                    
nettyPayloadManagers.get(managerIndexContainsCurrentSegment);
             boolean availability = numCreditsAvailable > 0;
-            if (numCreditsAvailable <= 0
-                    && getNettyPayloadNextDataType(currentQueue) == 
Buffer.DataType.EVENT_BUFFER) {
+            if (numCreditsAvailable == 0 && isEventOrError(currentQueue)) {
                 availability = true;
             }
-            return new AvailabilityWithBacklog(availability, 
currentQueue.size());
+            return new AvailabilityWithBacklog(availability, getBacklog());
         }
         return new AvailabilityWithBacklog(false, 0);
     }
@@ -121,9 +120,9 @@ public class TieredStorageResultSubpartitionView implements 
ResultSubpartitionVi
             return;
         }
         isReleased = true;
-        for (int index = 0; index < nettyPayloadQueues.size(); ++index) {
+        for (int index = 0; index < nettyPayloadManagers.size(); ++index) {
             releaseQueue(
-                    nettyPayloadQueues.get(index),
+                    nettyPayloadManagers.get(index),
                     serviceProducers.get(index),
                     nettyConnectionIds.get(index));
         }
@@ -142,14 +141,18 @@ public class TieredStorageResultSubpartitionView 
implements ResultSubpartitionVi
 
     @Override
     public int unsynchronizedGetNumberOfQueuedBuffers() {
-        findCurrentNettyPayloadQueue();
-        return nettyPayloadQueues.get(queueIndexContainsCurrentSegment).size();
+        if (findCurrentNettyPayloadQueue()) {
+            return getBacklog();
+        }
+        return 0;
     }
 
     @Override
     public int getNumberOfQueuedBuffers() {
-        findCurrentNettyPayloadQueue();
-        return nettyPayloadQueues.get(queueIndexContainsCurrentSegment).size();
+        if (findCurrentNettyPayloadQueue()) {
+            return getBacklog();
+        }
+        return 0;
     }
 
     @Override
@@ -178,14 +181,14 @@ public class TieredStorageResultSubpartitionView 
implements ResultSubpartitionVi
     //       Internal Methods
     // -------------------------------
 
-    private Optional<Buffer> readNettyPayload(Queue<NettyPayload> 
nettyPayloadQueue)
+    private Optional<Buffer> readNettyPayload(NettyPayloadManager 
nettyPayloadManager)
             throws IOException {
-        NettyPayload nettyPayload = nettyPayloadQueue.poll();
+        NettyPayload nettyPayload = nettyPayloadManager.poll();
         if (nettyPayload == null) {
             return Optional.empty();
         } else {
             if (nettyPayload.getSegmentId() != -1) {
-                return readNettyPayload(nettyPayloadQueue);
+                return readNettyPayload(nettyPayloadManager);
             }
             Optional<Throwable> error = nettyPayload.getError();
             if (error.isPresent()) {
@@ -197,37 +200,52 @@ public class TieredStorageResultSubpartitionView 
implements ResultSubpartitionVi
         }
     }
 
-    private Buffer.DataType getNettyPayloadNextDataType(Queue<NettyPayload> 
nettyPayload) {
-        NettyPayload nextBuffer = nettyPayload.peek();
-        if (nextBuffer == null || !nextBuffer.getBuffer().isPresent()) {
+    private int getBacklog() {
+        int backlog = 0;
+        for (NettyPayloadManager nettyPayloadManager : nettyPayloadManagers) {
+            backlog += nettyPayloadManager.getBacklog();
+        }
+        return backlog;
+    }
+
+    private boolean isEventOrError(NettyPayloadManager nettyPayloadManager) {
+        NettyPayload nettyPayload = nettyPayloadManager.peek();
+        return nettyPayload != null
+                && (nettyPayload.getError().isPresent()
+                        || (nettyPayload.getBuffer().isPresent()
+                                && 
!nettyPayload.getBuffer().get().isBuffer()));
+    }
+
+    private Buffer.DataType getDataType(NettyPayload nettyPayload) {
+        if (nettyPayload == null || !nettyPayload.getBuffer().isPresent()) {
             return Buffer.DataType.NONE;
         } else {
-            return nextBuffer.getBuffer().get().getDataType();
+            return nettyPayload.getBuffer().get().getDataType();
         }
     }
 
     private void releaseQueue(
-            Queue<NettyPayload> nettyPayloadQueue,
+            NettyPayloadManager nettyPayloadManager,
             NettyServiceProducer serviceProducer,
             NettyConnectionId id) {
         NettyPayload nettyPayload;
-        while ((nettyPayload = nettyPayloadQueue.poll()) != null) {
+        while ((nettyPayload = nettyPayloadManager.poll()) != null) {
             nettyPayload.getBuffer().ifPresent(Buffer::recycleBuffer);
         }
         serviceProducer.connectionBroken(id);
     }
 
     private boolean findCurrentNettyPayloadQueue() {
-        if (queueIndexContainsCurrentSegment != -1 && !stopSendingData) {
+        if (managerIndexContainsCurrentSegment != -1 && !stopSendingData) {
             return true;
         }
-        for (int queueIndex = 0; queueIndex < nettyPayloadQueues.size(); 
queueIndex++) {
-            NettyPayload firstNettyPayload = 
nettyPayloadQueues.get(queueIndex).peek();
+        for (int managerIndex = 0; managerIndex < nettyPayloadManagers.size(); 
managerIndex++) {
+            NettyPayload firstNettyPayload = 
nettyPayloadManagers.get(managerIndex).peek();
             if (firstNettyPayload == null
                     || firstNettyPayload.getSegmentId() != requiredSegmentId) {
                 continue;
             }
-            queueIndexContainsCurrentSegment = queueIndex;
+            managerIndexContainsCurrentSegment = managerIndex;
             return true;
         }
         return false;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriterTest.java
index 1d68ee3066b..fe89bd141bf 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriterTest.java
@@ -23,7 +23,6 @@ import 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
-import java.util.ArrayDeque;
 import java.util.concurrent.CompletableFuture;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -36,29 +35,27 @@ public class NettyConnectionWriterTest {
     @Test
     void testWriteBuffer() {
         int bufferNumber = 10;
-        ArrayDeque<NettyPayload> nettyPayloadQueue = new ArrayDeque<>();
+        NettyPayloadManager nettyPayloadManager = new NettyPayloadManager();
         NettyConnectionWriter nettyConnectionWriter =
-                new NettyConnectionWriterImpl(nettyPayloadQueue, () -> {});
+                new NettyConnectionWriterImpl(nettyPayloadManager, () -> {});
         writeBufferToWriter(bufferNumber, nettyConnectionWriter);
-        assertThat(nettyPayloadQueue).hasSize(bufferNumber);
+        assertThat(nettyPayloadManager.getBacklog()).isEqualTo(bufferNumber);
         
assertThat(nettyConnectionWriter.numQueuedBuffers()).isEqualTo(bufferNumber);
     }
 
     @Test
     void testGetNettyConnectionId() {
-        ArrayDeque<NettyPayload> nettyPayloadQueue = new ArrayDeque<>();
         NettyConnectionWriter nettyConnectionWriter =
-                new NettyConnectionWriterImpl(nettyPayloadQueue, () -> {});
+                new NettyConnectionWriterImpl(new NettyPayloadManager(), () -> 
{});
         assertThat(nettyConnectionWriter.getNettyConnectionId()).isNotNull();
     }
 
     @Test
     void testNotifyAvailable() {
         CompletableFuture<Void> notifier = new CompletableFuture<>();
-        ArrayDeque<NettyPayload> nettyPayloadQueue = new ArrayDeque<>();
         NettyConnectionWriter nettyConnectionWriter =
                 new NettyConnectionWriterImpl(
-                        nettyPayloadQueue,
+                        new NettyPayloadManager(),
                         () -> {
                             notifier.complete(null);
                         });
@@ -69,9 +66,8 @@ public class NettyConnectionWriterTest {
     @Test
     void testClose() {
         int bufferNumber = 10;
-        ArrayDeque<NettyPayload> nettyPayloadQueue = new ArrayDeque<>();
         NettyConnectionWriter nettyConnectionWriter =
-                new NettyConnectionWriterImpl(nettyPayloadQueue, () -> {});
+                new NettyConnectionWriterImpl(new NettyPayloadManager(), () -> 
{});
         writeBufferToWriter(bufferNumber, nettyConnectionWriter);
         nettyConnectionWriter.close(null);
         assertThat(nettyConnectionWriter.numQueuedBuffers()).isEqualTo(0);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageResultSubpartitionViewTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageResultSubpartitionViewTest.java
index 922d56d8aa2..afd84848cef 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageResultSubpartitionViewTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageResultSubpartitionViewTest.java
@@ -30,11 +30,9 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
-import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 
 import static 
org.apache.flink.runtime.io.network.buffer.Buffer.DataType.END_OF_SEGMENT;
@@ -48,7 +46,7 @@ public class TieredStorageResultSubpartitionViewTest {
 
     private CompletableFuture<Void> availabilityListener;
 
-    private List<Queue<NettyPayload>> nettyPayloadQueues;
+    private List<NettyPayloadManager> nettyPayloadManagers;
 
     private List<CompletableFuture<NettyConnectionId>> 
connectionBrokenConsumers;
 
@@ -57,13 +55,13 @@ public class TieredStorageResultSubpartitionViewTest {
     @BeforeEach
     void before() {
         availabilityListener = new CompletableFuture<>();
-        nettyPayloadQueues = createNettyPayloadQueues();
+        nettyPayloadManagers = createNettyPayloadQueues();
         connectionBrokenConsumers =
                 Arrays.asList(new CompletableFuture<>(), new 
CompletableFuture<>());
         tieredStorageResultSubpartitionView =
                 new TieredStorageResultSubpartitionView(
                         createBufferAvailabilityListener(availabilityListener),
-                        nettyPayloadQueues,
+                        nettyPayloadManagers,
                         createNettyConnectionIds(),
                         
createNettyServiceProducers(connectionBrokenConsumers));
     }
@@ -71,10 +69,10 @@ public class TieredStorageResultSubpartitionViewTest {
     @Test
     void testGetNextBuffer() throws IOException {
         
checkBufferAndBacklog(tieredStorageResultSubpartitionView.getNextBuffer(), 1);
-        
checkBufferAndBacklog(tieredStorageResultSubpartitionView.getNextBuffer(), 0);
+        
checkBufferAndBacklog(tieredStorageResultSubpartitionView.getNextBuffer(), 1);
         tieredStorageResultSubpartitionView.notifyRequiredSegmentId(1);
         assertThat(availabilityListener).isDone();
-        
checkBufferAndBacklog(tieredStorageResultSubpartitionView.getNextBuffer(), 1);
+        
checkBufferAndBacklog(tieredStorageResultSubpartitionView.getNextBuffer(), 0);
         
checkBufferAndBacklog(tieredStorageResultSubpartitionView.getNextBuffer(), 0);
         
assertThat(tieredStorageResultSubpartitionView.getNextBuffer()).isNull();
     }
@@ -82,11 +80,11 @@ public class TieredStorageResultSubpartitionViewTest {
     @Test
     void testGetNextBufferFailed() {
         Throwable expectedError = new IOException();
-        nettyPayloadQueues = createNettyPayloadQueuesWithError(expectedError);
+        nettyPayloadManagers = 
createNettyPayloadQueuesWithError(expectedError);
         tieredStorageResultSubpartitionView =
                 new TieredStorageResultSubpartitionView(
                         createBufferAvailabilityListener(availabilityListener),
-                        nettyPayloadQueues,
+                        nettyPayloadManagers,
                         createNettyConnectionIds(),
                         
createNettyServiceProducers(connectionBrokenConsumers));
         assertThatThrownBy(tieredStorageResultSubpartitionView::getNextBuffer)
@@ -98,11 +96,11 @@ public class TieredStorageResultSubpartitionViewTest {
     void testGetAvailabilityAndBacklog() {
         ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog1 
=
                 
tieredStorageResultSubpartitionView.getAvailabilityAndBacklog(0);
-        assertThat(availabilityAndBacklog1.getBacklog()).isEqualTo(3);
+        assertThat(availabilityAndBacklog1.getBacklog()).isEqualTo(2);
         assertThat(availabilityAndBacklog1.isAvailable()).isEqualTo(false);
         ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog2 
=
                 
tieredStorageResultSubpartitionView.getAvailabilityAndBacklog(2);
-        assertThat(availabilityAndBacklog2.getBacklog()).isEqualTo(3);
+        assertThat(availabilityAndBacklog2.getBacklog()).isEqualTo(2);
         assertThat(availabilityAndBacklog2.isAvailable()).isEqualTo(true);
     }
 
@@ -115,8 +113,8 @@ public class TieredStorageResultSubpartitionViewTest {
     @Test
     void testReleaseAllResources() throws IOException {
         tieredStorageResultSubpartitionView.releaseAllResources();
-        assertThat(nettyPayloadQueues.get(0)).hasSize(0);
-        assertThat(nettyPayloadQueues.get(1)).hasSize(0);
+        assertThat(nettyPayloadManagers.get(0).getBacklog()).isZero();
+        assertThat(nettyPayloadManagers.get(1).getBacklog()).isZero();
         assertThat(connectionBrokenConsumers.get(0)).isDone();
         assertThat(connectionBrokenConsumers.get(1)).isDone();
         assertThat(tieredStorageResultSubpartitionView.isReleased()).isTrue();
@@ -124,9 +122,9 @@ public class TieredStorageResultSubpartitionViewTest {
 
     @Test
     void testGetNumberOfQueuedBuffers() {
-        
assertThat(tieredStorageResultSubpartitionView.getNumberOfQueuedBuffers()).isEqualTo(3);
+        
assertThat(tieredStorageResultSubpartitionView.getNumberOfQueuedBuffers()).isEqualTo(2);
         
assertThat(tieredStorageResultSubpartitionView.unsynchronizedGetNumberOfQueuedBuffers())
-                .isEqualTo(3);
+                .isEqualTo(2);
     }
 
     private static void checkBufferAndBacklog(BufferAndBacklog 
bufferAndBacklog, int backlog) {
@@ -140,10 +138,10 @@ public class TieredStorageResultSubpartitionViewTest {
         return () -> notifier.complete(null);
     }
 
-    private static List<Queue<NettyPayload>> createNettyPayloadQueues() {
-        List<Queue<NettyPayload>> nettyPayloadQueues = new ArrayList<>();
+    private static List<NettyPayloadManager> createNettyPayloadQueues() {
+        List<NettyPayloadManager> nettyPayloadManagers = new ArrayList<>();
         for (int index = 0; index < TIER_NUMBER; ++index) {
-            Queue<NettyPayload> queue = new ArrayDeque<>();
+            NettyPayloadManager queue = new NettyPayloadManager();
             queue.add(NettyPayload.newSegment(index));
             
queue.add(NettyPayload.newBuffer(BufferBuilderTestUtils.buildSomeBuffer(0), 0, 
index));
             queue.add(
@@ -154,20 +152,20 @@ public class TieredStorageResultSubpartitionViewTest {
                                     END_OF_SEGMENT),
                             1,
                             index));
-            nettyPayloadQueues.add(queue);
+            nettyPayloadManagers.add(queue);
         }
-        return nettyPayloadQueues;
+        return nettyPayloadManagers;
     }
 
-    private static List<Queue<NettyPayload>> 
createNettyPayloadQueuesWithError(Throwable error) {
-        List<Queue<NettyPayload>> nettyPayloadQueues = new ArrayList<>();
+    private static List<NettyPayloadManager> 
createNettyPayloadQueuesWithError(Throwable error) {
+        List<NettyPayloadManager> nettyPayloadManagers = new ArrayList<>();
         for (int index = 0; index < TIER_NUMBER; ++index) {
-            Queue<NettyPayload> queue = new ArrayDeque<>();
+            NettyPayloadManager queue = new NettyPayloadManager();
             queue.add(NettyPayload.newSegment(index));
             queue.add(NettyPayload.newError(error));
-            nettyPayloadQueues.add(queue);
+            nettyPayloadManagers.add(queue);
         }
-        return nettyPayloadQueues;
+        return nettyPayloadManagers;
     }
 
     private static List<NettyConnectionId> createNettyConnectionIds() {

Reply via email to