This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 819ba92994a5dcc4ced054a3530d423123a8de23 Author: Wencong Liu <[email protected]> AuthorDate: Mon Aug 14 10:45:06 2023 +0800 [FLINK-32770][network] Fix the inaccurate backlog number of Hybrid Shuffle --- .../network/partition/ResultSubpartitionView.java | 7 ++ .../tiered/netty/NettyConnectionWriterImpl.java | 15 ++-- .../hybrid/tiered/netty/NettyPayloadManager.java | 79 ++++++++++++++++++++ .../netty/TieredStorageNettyServiceImpl.java | 12 ++- .../netty/TieredStorageResultSubpartitionView.java | 86 +++++++++++++--------- .../tiered/netty/NettyConnectionWriterTest.java | 16 ++-- .../TieredStorageResultSubpartitionViewTest.java | 48 ++++++------ 7 files changed, 179 insertions(+), 84 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java index 9a4078e0799..305e50271fa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java @@ -65,6 +65,13 @@ public interface ResultSubpartitionView { */ Throwable getFailureCause(); + /** + * Get the availability and backlog of the view. The availability represents if the view is + * ready to get buffer from it. The backlog represents the number of available data buffers. + * + * @param numCreditsAvailable the available credits for this {@link ResultSubpartitionView}. + * @return availability and backlog. + */ AvailabilityWithBacklog getAvailabilityAndBacklog(int numCreditsAvailable); int unsynchronizedGetNumberOfQueuedBuffers(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriterImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriterImpl.java index d5ec2710616..24f61e4329b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriterImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriterImpl.java @@ -23,20 +23,19 @@ import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener; import javax.annotation.Nullable; -import java.util.Queue; - /** The default implementation of {@link NettyConnectionWriter}. */ public class NettyConnectionWriterImpl implements NettyConnectionWriter { - private final Queue<NettyPayload> bufferQueue; + private final NettyPayloadManager nettyPayloadManager; private final NettyConnectionId connectionId; private final BufferAvailabilityListener availabilityListener; public NettyConnectionWriterImpl( - Queue<NettyPayload> bufferQueue, BufferAvailabilityListener availabilityListener) { - this.bufferQueue = bufferQueue; + NettyPayloadManager nettyPayloadManager, + BufferAvailabilityListener availabilityListener) { + this.nettyPayloadManager = nettyPayloadManager; this.connectionId = NettyConnectionId.newId(); this.availabilityListener = availabilityListener; } @@ -53,18 +52,18 @@ public class NettyConnectionWriterImpl implements NettyConnectionWriter { @Override public int numQueuedBuffers() { - return bufferQueue.size(); + return nettyPayloadManager.getSize(); } @Override public void writeBuffer(NettyPayload nettyPayload) { - bufferQueue.add(nettyPayload); + nettyPayloadManager.add(nettyPayload); } @Override public void close(@Nullable Throwable error) { NettyPayload nettyPayload; - while ((nettyPayload = bufferQueue.poll()) != null) { + while ((nettyPayload = nettyPayloadManager.poll()) != null) { nettyPayload.getBuffer().ifPresent(Buffer::recycleBuffer); } if (error != null) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyPayloadManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyPayloadManager.java new file mode 100644 index 00000000000..95e7bc77bc0 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyPayloadManager.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty; + +import org.apache.flink.runtime.io.network.buffer.Buffer; + +import javax.annotation.concurrent.GuardedBy; + +import java.util.LinkedList; +import java.util.Optional; +import java.util.Queue; + +/** {@link NettyPayloadManager} is used to contain all netty payloads from a storage tier. */ +public class NettyPayloadManager { + + private final Object lock = new Object(); + + private final Queue<NettyPayload> queue = new LinkedList<>(); + + /** Number of buffers whose {@link Buffer.DataType} is buffer in the queue. */ + @GuardedBy("lock") + private int backlog = 0; + + public void add(NettyPayload nettyPayload) { + synchronized (lock) { + queue.add(nettyPayload); + Optional<Buffer> buffer = nettyPayload.getBuffer(); + if (buffer.isPresent() && buffer.get().isBuffer()) { + backlog++; + } + } + } + + public NettyPayload peek() { + synchronized (lock) { + return queue.peek(); + } + } + + public NettyPayload poll() { + synchronized (lock) { + NettyPayload nettyPayload = queue.poll(); + if (nettyPayload != null + && nettyPayload.getBuffer().isPresent() + && nettyPayload.getBuffer().get().isBuffer()) { + backlog--; + } + return nettyPayload; + } + } + + public int getBacklog() { + synchronized (lock) { + return backlog; + } + } + + public int getSize() { + synchronized (lock) { + return queue.size(); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageNettyServiceImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageNettyServiceImpl.java index ab85e05beb6..8255964e6ea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageNettyServiceImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageNettyServiceImpl.java @@ -35,10 +35,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.LinkedBlockingQueue; import java.util.function.Supplier; import static org.apache.flink.util.Preconditions.checkArgument; @@ -122,19 +120,19 @@ public class TieredStorageNettyServiceImpl implements TieredStorageNettyService return new TieredStorageResultSubpartitionView( availabilityListener, new ArrayList<>(), new ArrayList<>(), new ArrayList<>()); } - List<Queue<NettyPayload>> queues = new ArrayList<>(); + List<NettyPayloadManager> nettyPayloadManagers = new ArrayList<>(); List<NettyConnectionId> nettyConnectionIds = new ArrayList<>(); for (NettyServiceProducer serviceProducer : serviceProducers) { - LinkedBlockingQueue<NettyPayload> queue = new LinkedBlockingQueue<>(); + NettyPayloadManager nettyPayloadManager = new NettyPayloadManager(); NettyConnectionWriterImpl writer = - new NettyConnectionWriterImpl(queue, availabilityListener); + new NettyConnectionWriterImpl(nettyPayloadManager, availabilityListener); serviceProducer.connectionEstablished(subpartitionId, writer); nettyConnectionIds.add(writer.getNettyConnectionId()); - queues.add(queue); + nettyPayloadManagers.add(nettyPayloadManager); } return new TieredStorageResultSubpartitionView( availabilityListener, - queues, + nettyPayloadManagers, nettyConnectionIds, registeredServiceProducers.get(partitionId)); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageResultSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageResultSubpartitionView.java index 56c43735a91..adb07cd9934 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageResultSubpartitionView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageResultSubpartitionView.java @@ -29,7 +29,6 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.List; import java.util.Optional; -import java.util.Queue; import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.END_OF_SEGMENT; @@ -41,7 +40,7 @@ public class TieredStorageResultSubpartitionView implements ResultSubpartitionVi private final BufferAvailabilityListener availabilityListener; - private final List<Queue<NettyPayload>> nettyPayloadQueues; + private final List<NettyPayloadManager> nettyPayloadManagers; private final List<NettyServiceProducer> serviceProducers; @@ -53,17 +52,17 @@ public class TieredStorageResultSubpartitionView implements ResultSubpartitionVi private boolean stopSendingData = false; - private int queueIndexContainsCurrentSegment = -1; + private int managerIndexContainsCurrentSegment = -1; private int currentSequenceNumber = -1; public TieredStorageResultSubpartitionView( BufferAvailabilityListener availabilityListener, - List<Queue<NettyPayload>> nettyPayloadQueues, + List<NettyPayloadManager> nettyPayloadManagers, List<NettyConnectionId> nettyConnectionIds, List<NettyServiceProducer> serviceProducers) { this.availabilityListener = availabilityListener; - this.nettyPayloadQueues = nettyPayloadQueues; + this.nettyPayloadManagers = nettyPayloadManagers; this.nettyConnectionIds = nettyConnectionIds; this.serviceProducers = serviceProducers; } @@ -74,18 +73,19 @@ public class TieredStorageResultSubpartitionView implements ResultSubpartitionVi if (stopSendingData || !findCurrentNettyPayloadQueue()) { return null; } - Queue<NettyPayload> currentQueue = nettyPayloadQueues.get(queueIndexContainsCurrentSegment); - Optional<Buffer> nextBuffer = readNettyPayload(currentQueue); + NettyPayloadManager nettyPayloadManager = + nettyPayloadManagers.get(managerIndexContainsCurrentSegment); + Optional<Buffer> nextBuffer = readNettyPayload(nettyPayloadManager); if (nextBuffer.isPresent()) { stopSendingData = nextBuffer.get().getDataType() == END_OF_SEGMENT; if (stopSendingData) { - queueIndexContainsCurrentSegment = -1; + managerIndexContainsCurrentSegment = -1; } currentSequenceNumber++; return BufferAndBacklog.fromBufferAndLookahead( nextBuffer.get(), - getNettyPayloadNextDataType(currentQueue), - currentQueue.size(), + getDataType(nettyPayloadManager.peek()), + getBacklog(), currentSequenceNumber); } return null; @@ -94,14 +94,13 @@ public class TieredStorageResultSubpartitionView implements ResultSubpartitionVi @Override public AvailabilityWithBacklog getAvailabilityAndBacklog(int numCreditsAvailable) { if (findCurrentNettyPayloadQueue()) { - Queue<NettyPayload> currentQueue = - nettyPayloadQueues.get(queueIndexContainsCurrentSegment); + NettyPayloadManager currentQueue = + nettyPayloadManagers.get(managerIndexContainsCurrentSegment); boolean availability = numCreditsAvailable > 0; - if (numCreditsAvailable <= 0 - && getNettyPayloadNextDataType(currentQueue) == Buffer.DataType.EVENT_BUFFER) { + if (numCreditsAvailable == 0 && isEventOrError(currentQueue)) { availability = true; } - return new AvailabilityWithBacklog(availability, currentQueue.size()); + return new AvailabilityWithBacklog(availability, getBacklog()); } return new AvailabilityWithBacklog(false, 0); } @@ -121,9 +120,9 @@ public class TieredStorageResultSubpartitionView implements ResultSubpartitionVi return; } isReleased = true; - for (int index = 0; index < nettyPayloadQueues.size(); ++index) { + for (int index = 0; index < nettyPayloadManagers.size(); ++index) { releaseQueue( - nettyPayloadQueues.get(index), + nettyPayloadManagers.get(index), serviceProducers.get(index), nettyConnectionIds.get(index)); } @@ -142,14 +141,18 @@ public class TieredStorageResultSubpartitionView implements ResultSubpartitionVi @Override public int unsynchronizedGetNumberOfQueuedBuffers() { - findCurrentNettyPayloadQueue(); - return nettyPayloadQueues.get(queueIndexContainsCurrentSegment).size(); + if (findCurrentNettyPayloadQueue()) { + return getBacklog(); + } + return 0; } @Override public int getNumberOfQueuedBuffers() { - findCurrentNettyPayloadQueue(); - return nettyPayloadQueues.get(queueIndexContainsCurrentSegment).size(); + if (findCurrentNettyPayloadQueue()) { + return getBacklog(); + } + return 0; } @Override @@ -178,14 +181,14 @@ public class TieredStorageResultSubpartitionView implements ResultSubpartitionVi // Internal Methods // ------------------------------- - private Optional<Buffer> readNettyPayload(Queue<NettyPayload> nettyPayloadQueue) + private Optional<Buffer> readNettyPayload(NettyPayloadManager nettyPayloadManager) throws IOException { - NettyPayload nettyPayload = nettyPayloadQueue.poll(); + NettyPayload nettyPayload = nettyPayloadManager.poll(); if (nettyPayload == null) { return Optional.empty(); } else { if (nettyPayload.getSegmentId() != -1) { - return readNettyPayload(nettyPayloadQueue); + return readNettyPayload(nettyPayloadManager); } Optional<Throwable> error = nettyPayload.getError(); if (error.isPresent()) { @@ -197,37 +200,52 @@ public class TieredStorageResultSubpartitionView implements ResultSubpartitionVi } } - private Buffer.DataType getNettyPayloadNextDataType(Queue<NettyPayload> nettyPayload) { - NettyPayload nextBuffer = nettyPayload.peek(); - if (nextBuffer == null || !nextBuffer.getBuffer().isPresent()) { + private int getBacklog() { + int backlog = 0; + for (NettyPayloadManager nettyPayloadManager : nettyPayloadManagers) { + backlog += nettyPayloadManager.getBacklog(); + } + return backlog; + } + + private boolean isEventOrError(NettyPayloadManager nettyPayloadManager) { + NettyPayload nettyPayload = nettyPayloadManager.peek(); + return nettyPayload != null + && (nettyPayload.getError().isPresent() + || (nettyPayload.getBuffer().isPresent() + && !nettyPayload.getBuffer().get().isBuffer())); + } + + private Buffer.DataType getDataType(NettyPayload nettyPayload) { + if (nettyPayload == null || !nettyPayload.getBuffer().isPresent()) { return Buffer.DataType.NONE; } else { - return nextBuffer.getBuffer().get().getDataType(); + return nettyPayload.getBuffer().get().getDataType(); } } private void releaseQueue( - Queue<NettyPayload> nettyPayloadQueue, + NettyPayloadManager nettyPayloadManager, NettyServiceProducer serviceProducer, NettyConnectionId id) { NettyPayload nettyPayload; - while ((nettyPayload = nettyPayloadQueue.poll()) != null) { + while ((nettyPayload = nettyPayloadManager.poll()) != null) { nettyPayload.getBuffer().ifPresent(Buffer::recycleBuffer); } serviceProducer.connectionBroken(id); } private boolean findCurrentNettyPayloadQueue() { - if (queueIndexContainsCurrentSegment != -1 && !stopSendingData) { + if (managerIndexContainsCurrentSegment != -1 && !stopSendingData) { return true; } - for (int queueIndex = 0; queueIndex < nettyPayloadQueues.size(); queueIndex++) { - NettyPayload firstNettyPayload = nettyPayloadQueues.get(queueIndex).peek(); + for (int managerIndex = 0; managerIndex < nettyPayloadManagers.size(); managerIndex++) { + NettyPayload firstNettyPayload = nettyPayloadManagers.get(managerIndex).peek(); if (firstNettyPayload == null || firstNettyPayload.getSegmentId() != requiredSegmentId) { continue; } - queueIndexContainsCurrentSegment = queueIndex; + managerIndexContainsCurrentSegment = managerIndex; return true; } return false; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriterTest.java index 1d68ee3066b..fe89bd141bf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/NettyConnectionWriterTest.java @@ -23,7 +23,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils; import org.junit.jupiter.api.Test; import java.io.IOException; -import java.util.ArrayDeque; import java.util.concurrent.CompletableFuture; import static org.assertj.core.api.Assertions.assertThat; @@ -36,29 +35,27 @@ public class NettyConnectionWriterTest { @Test void testWriteBuffer() { int bufferNumber = 10; - ArrayDeque<NettyPayload> nettyPayloadQueue = new ArrayDeque<>(); + NettyPayloadManager nettyPayloadManager = new NettyPayloadManager(); NettyConnectionWriter nettyConnectionWriter = - new NettyConnectionWriterImpl(nettyPayloadQueue, () -> {}); + new NettyConnectionWriterImpl(nettyPayloadManager, () -> {}); writeBufferToWriter(bufferNumber, nettyConnectionWriter); - assertThat(nettyPayloadQueue).hasSize(bufferNumber); + assertThat(nettyPayloadManager.getBacklog()).isEqualTo(bufferNumber); assertThat(nettyConnectionWriter.numQueuedBuffers()).isEqualTo(bufferNumber); } @Test void testGetNettyConnectionId() { - ArrayDeque<NettyPayload> nettyPayloadQueue = new ArrayDeque<>(); NettyConnectionWriter nettyConnectionWriter = - new NettyConnectionWriterImpl(nettyPayloadQueue, () -> {}); + new NettyConnectionWriterImpl(new NettyPayloadManager(), () -> {}); assertThat(nettyConnectionWriter.getNettyConnectionId()).isNotNull(); } @Test void testNotifyAvailable() { CompletableFuture<Void> notifier = new CompletableFuture<>(); - ArrayDeque<NettyPayload> nettyPayloadQueue = new ArrayDeque<>(); NettyConnectionWriter nettyConnectionWriter = new NettyConnectionWriterImpl( - nettyPayloadQueue, + new NettyPayloadManager(), () -> { notifier.complete(null); }); @@ -69,9 +66,8 @@ public class NettyConnectionWriterTest { @Test void testClose() { int bufferNumber = 10; - ArrayDeque<NettyPayload> nettyPayloadQueue = new ArrayDeque<>(); NettyConnectionWriter nettyConnectionWriter = - new NettyConnectionWriterImpl(nettyPayloadQueue, () -> {}); + new NettyConnectionWriterImpl(new NettyPayloadManager(), () -> {}); writeBufferToWriter(bufferNumber, nettyConnectionWriter); nettyConnectionWriter.close(null); assertThat(nettyConnectionWriter.numQueuedBuffers()).isEqualTo(0); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageResultSubpartitionViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageResultSubpartitionViewTest.java index 922d56d8aa2..afd84848cef 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageResultSubpartitionViewTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageResultSubpartitionViewTest.java @@ -30,11 +30,9 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.io.IOException; -import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Queue; import java.util.concurrent.CompletableFuture; import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.END_OF_SEGMENT; @@ -48,7 +46,7 @@ public class TieredStorageResultSubpartitionViewTest { private CompletableFuture<Void> availabilityListener; - private List<Queue<NettyPayload>> nettyPayloadQueues; + private List<NettyPayloadManager> nettyPayloadManagers; private List<CompletableFuture<NettyConnectionId>> connectionBrokenConsumers; @@ -57,13 +55,13 @@ public class TieredStorageResultSubpartitionViewTest { @BeforeEach void before() { availabilityListener = new CompletableFuture<>(); - nettyPayloadQueues = createNettyPayloadQueues(); + nettyPayloadManagers = createNettyPayloadQueues(); connectionBrokenConsumers = Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>()); tieredStorageResultSubpartitionView = new TieredStorageResultSubpartitionView( createBufferAvailabilityListener(availabilityListener), - nettyPayloadQueues, + nettyPayloadManagers, createNettyConnectionIds(), createNettyServiceProducers(connectionBrokenConsumers)); } @@ -71,10 +69,10 @@ public class TieredStorageResultSubpartitionViewTest { @Test void testGetNextBuffer() throws IOException { checkBufferAndBacklog(tieredStorageResultSubpartitionView.getNextBuffer(), 1); - checkBufferAndBacklog(tieredStorageResultSubpartitionView.getNextBuffer(), 0); + checkBufferAndBacklog(tieredStorageResultSubpartitionView.getNextBuffer(), 1); tieredStorageResultSubpartitionView.notifyRequiredSegmentId(1); assertThat(availabilityListener).isDone(); - checkBufferAndBacklog(tieredStorageResultSubpartitionView.getNextBuffer(), 1); + checkBufferAndBacklog(tieredStorageResultSubpartitionView.getNextBuffer(), 0); checkBufferAndBacklog(tieredStorageResultSubpartitionView.getNextBuffer(), 0); assertThat(tieredStorageResultSubpartitionView.getNextBuffer()).isNull(); } @@ -82,11 +80,11 @@ public class TieredStorageResultSubpartitionViewTest { @Test void testGetNextBufferFailed() { Throwable expectedError = new IOException(); - nettyPayloadQueues = createNettyPayloadQueuesWithError(expectedError); + nettyPayloadManagers = createNettyPayloadQueuesWithError(expectedError); tieredStorageResultSubpartitionView = new TieredStorageResultSubpartitionView( createBufferAvailabilityListener(availabilityListener), - nettyPayloadQueues, + nettyPayloadManagers, createNettyConnectionIds(), createNettyServiceProducers(connectionBrokenConsumers)); assertThatThrownBy(tieredStorageResultSubpartitionView::getNextBuffer) @@ -98,11 +96,11 @@ public class TieredStorageResultSubpartitionViewTest { void testGetAvailabilityAndBacklog() { ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog1 = tieredStorageResultSubpartitionView.getAvailabilityAndBacklog(0); - assertThat(availabilityAndBacklog1.getBacklog()).isEqualTo(3); + assertThat(availabilityAndBacklog1.getBacklog()).isEqualTo(2); assertThat(availabilityAndBacklog1.isAvailable()).isEqualTo(false); ResultSubpartitionView.AvailabilityWithBacklog availabilityAndBacklog2 = tieredStorageResultSubpartitionView.getAvailabilityAndBacklog(2); - assertThat(availabilityAndBacklog2.getBacklog()).isEqualTo(3); + assertThat(availabilityAndBacklog2.getBacklog()).isEqualTo(2); assertThat(availabilityAndBacklog2.isAvailable()).isEqualTo(true); } @@ -115,8 +113,8 @@ public class TieredStorageResultSubpartitionViewTest { @Test void testReleaseAllResources() throws IOException { tieredStorageResultSubpartitionView.releaseAllResources(); - assertThat(nettyPayloadQueues.get(0)).hasSize(0); - assertThat(nettyPayloadQueues.get(1)).hasSize(0); + assertThat(nettyPayloadManagers.get(0).getBacklog()).isZero(); + assertThat(nettyPayloadManagers.get(1).getBacklog()).isZero(); assertThat(connectionBrokenConsumers.get(0)).isDone(); assertThat(connectionBrokenConsumers.get(1)).isDone(); assertThat(tieredStorageResultSubpartitionView.isReleased()).isTrue(); @@ -124,9 +122,9 @@ public class TieredStorageResultSubpartitionViewTest { @Test void testGetNumberOfQueuedBuffers() { - assertThat(tieredStorageResultSubpartitionView.getNumberOfQueuedBuffers()).isEqualTo(3); + assertThat(tieredStorageResultSubpartitionView.getNumberOfQueuedBuffers()).isEqualTo(2); assertThat(tieredStorageResultSubpartitionView.unsynchronizedGetNumberOfQueuedBuffers()) - .isEqualTo(3); + .isEqualTo(2); } private static void checkBufferAndBacklog(BufferAndBacklog bufferAndBacklog, int backlog) { @@ -140,10 +138,10 @@ public class TieredStorageResultSubpartitionViewTest { return () -> notifier.complete(null); } - private static List<Queue<NettyPayload>> createNettyPayloadQueues() { - List<Queue<NettyPayload>> nettyPayloadQueues = new ArrayList<>(); + private static List<NettyPayloadManager> createNettyPayloadQueues() { + List<NettyPayloadManager> nettyPayloadManagers = new ArrayList<>(); for (int index = 0; index < TIER_NUMBER; ++index) { - Queue<NettyPayload> queue = new ArrayDeque<>(); + NettyPayloadManager queue = new NettyPayloadManager(); queue.add(NettyPayload.newSegment(index)); queue.add(NettyPayload.newBuffer(BufferBuilderTestUtils.buildSomeBuffer(0), 0, index)); queue.add( @@ -154,20 +152,20 @@ public class TieredStorageResultSubpartitionViewTest { END_OF_SEGMENT), 1, index)); - nettyPayloadQueues.add(queue); + nettyPayloadManagers.add(queue); } - return nettyPayloadQueues; + return nettyPayloadManagers; } - private static List<Queue<NettyPayload>> createNettyPayloadQueuesWithError(Throwable error) { - List<Queue<NettyPayload>> nettyPayloadQueues = new ArrayList<>(); + private static List<NettyPayloadManager> createNettyPayloadQueuesWithError(Throwable error) { + List<NettyPayloadManager> nettyPayloadManagers = new ArrayList<>(); for (int index = 0; index < TIER_NUMBER; ++index) { - Queue<NettyPayload> queue = new ArrayDeque<>(); + NettyPayloadManager queue = new NettyPayloadManager(); queue.add(NettyPayload.newSegment(index)); queue.add(NettyPayload.newError(error)); - nettyPayloadQueues.add(queue); + nettyPayloadManagers.add(queue); } - return nettyPayloadQueues; + return nettyPayloadManagers; } private static List<NettyConnectionId> createNettyConnectionIds() {
