This is an automated email from the ASF dual-hosted git repository.

yingjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c044753b0f [FLINK-28373][network] Read a full buffer of data per file IO read request for sort-shuffle
1c044753b0f is described below

commit 1c044753b0f0e4c23f954d71f355d1368c41061f
Author: Yuxin Tan <[email protected]>
AuthorDate: Tue Aug 2 21:59:34 2022 +0800

    [FLINK-28373][network] Read a full buffer of data per file IO read request for sort-shuffle
    
    This closes #20335.
---
 .../runtime/io/network/buffer/CompositeBuffer.java | 233 +++++++++++++++++++++
 .../buffer/ReadOnlySlicedNetworkBuffer.java        |  10 +-
 .../runtime/io/network/netty/NettyMessage.java     |   7 +-
 .../network/partition/PartitionedFileReader.java   | 130 ++++++++++++
 .../partition/SortMergeSubpartitionReader.java     |  17 +-
 .../partition/consumer/LocalInputChannel.java      |   7 +-
 .../network/buffer/ReadOnlySlicedBufferTest.java   |  20 +-
 .../SortMergeResultPartitionReadSchedulerTest.java |  73 +++----
 .../partition/SortMergeResultPartitionTest.java    |  34 ++-
 .../partition/SortMergeSubpartitionReaderTest.java | 103 ++++-----
 10 files changed, 509 insertions(+), 125 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/CompositeBuffer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/CompositeBuffer.java
new file mode 100644
index 00000000000..7444704b0ad
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/CompositeBuffer.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+import org.apache.flink.shaded.netty4.io.netty.buffer.CompositeByteBuf;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An implementation of {@link Buffer} which contains multiple partial buffers for network data
+ * communication.
+ */
+public class CompositeBuffer implements Buffer {
+
+    private final DataType dataType;
+
+    private final int length;
+
+    private final boolean isCompressed;
+
+    private final List<Buffer> partialBuffers = new ArrayList<>();
+
+    private int currentLength;
+
+    private ByteBufAllocator allocator;
+
+    public CompositeBuffer(DataType dataType, int length, boolean 
isCompressed) {
+        this.dataType = checkNotNull(dataType);
+        this.length = length;
+        this.isCompressed = isCompressed;
+    }
+
+    public CompositeBuffer(BufferHeader header) {
+        this(header.getDataType(), header.getLength(), header.isCompressed());
+    }
+
+    @Override
+    public boolean isBuffer() {
+        return dataType.isBuffer();
+    }
+
+    @Override
+    public void recycleBuffer() {
+        for (Buffer partialBuffer : partialBuffers) {
+            partialBuffer.recycleBuffer();
+        }
+    }
+
+    @Override
+    public Buffer retainBuffer() {
+        for (Buffer partialBuffer : partialBuffers) {
+            partialBuffer.retainBuffer();
+        }
+        return this;
+    }
+
+    @Override
+    public int getSize() {
+        return currentLength;
+    }
+
+    @Override
+    public int readableBytes() {
+        return currentLength;
+    }
+
+    @Override
+    public void setAllocator(ByteBufAllocator allocator) {
+        this.allocator = allocator;
+    }
+
+    @Override
+    public ByteBuf asByteBuf() {
+        CompositeByteBuf compositeByteBuf = 
checkNotNull(allocator).compositeDirectBuffer();
+        for (Buffer buffer : partialBuffers) {
+            compositeByteBuf.addComponent(buffer.asByteBuf());
+        }
+        compositeByteBuf.writerIndex(currentLength);
+        return compositeByteBuf;
+    }
+
+    @Override
+    public boolean isCompressed() {
+        return isCompressed;
+    }
+
+    @Override
+    public DataType getDataType() {
+        return dataType;
+    }
+
+    public int numPartialBuffers() {
+        return partialBuffers.size();
+    }
+
+    /**
+     * Returns the full buffer data in one piece of {@link MemorySegment}. If there are multiple
+     * partial buffers, the partial data will be copied to the given target {@link MemorySegment}.
+     */
+    public Buffer getFullBufferData(MemorySegment segment) {
+        checkState(!partialBuffers.isEmpty());
+        checkState(currentLength <= segment.size());
+
+        if (partialBuffers.size() == 1) {
+            return partialBuffers.get(0);
+        }
+
+        int offset = 0;
+        for (Buffer buffer : partialBuffers) {
+            segment.put(offset, buffer.getNioBufferReadable(), 
buffer.readableBytes());
+            offset += buffer.readableBytes();
+        }
+        recycleBuffer();
+        return new NetworkBuffer(
+                segment,
+                BufferRecycler.DummyBufferRecycler.INSTANCE,
+                dataType,
+                isCompressed,
+                currentLength);
+    }
+
+    public void addPartialBuffer(Buffer buffer) {
+        buffer.setDataType(dataType);
+        buffer.setCompressed(isCompressed);
+        partialBuffers.add(buffer);
+        currentLength += buffer.readableBytes();
+        checkState(currentLength <= length);
+    }
+
+    public int missingLength() {
+        return length - currentLength;
+    }
+
+    @Override
+    public MemorySegment getMemorySegment() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int getMemorySegmentOffset() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public BufferRecycler getRecycler() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean isRecycled() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Buffer readOnlySlice() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Buffer readOnlySlice(int index, int length) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int getMaxCapacity() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int getReaderIndex() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setReaderIndex(int readerIndex) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setSize(int writerIndex) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ByteBuffer getNioBufferReadable() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public ByteBuffer getNioBuffer(int index, int length) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setCompressed(boolean isCompressed) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setDataType(DataType dataType) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int refCnt() {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
index df9dccb7314..4fccec09bdc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
@@ -44,6 +44,8 @@ public final class ReadOnlySlicedNetworkBuffer extends 
ReadOnlyByteBuf implement
 
     private boolean isCompressed = false;
 
+    private DataType dataType;
+
     /**
      * Creates a buffer which shares the memory segment of the given buffer and exposes the given
      * sub-region only.
@@ -58,6 +60,7 @@ public final class ReadOnlySlicedNetworkBuffer extends 
ReadOnlyByteBuf implement
     ReadOnlySlicedNetworkBuffer(NetworkBuffer buffer, int index, int length) {
         super(new SlicedByteBuf(buffer, index, length));
         this.memorySegmentOffset = buffer.getMemorySegmentOffset() + index;
+        this.dataType = buffer.getDataType();
     }
 
     /**
@@ -79,6 +82,7 @@ public final class ReadOnlySlicedNetworkBuffer extends 
ReadOnlyByteBuf implement
         super(new SlicedByteBuf(buffer, index, length));
         this.memorySegmentOffset = memorySegmentOffset + index;
         this.isCompressed = isCompressed;
+        this.dataType = getBuffer().getDataType();
     }
 
     @Override
@@ -88,7 +92,7 @@ public final class ReadOnlySlicedNetworkBuffer extends 
ReadOnlyByteBuf implement
 
     @Override
     public boolean isBuffer() {
-        return getBuffer().isBuffer();
+        return dataType.isBuffer();
     }
 
     /**
@@ -223,12 +227,12 @@ public final class ReadOnlySlicedNetworkBuffer extends 
ReadOnlyByteBuf implement
 
     @Override
     public DataType getDataType() {
-        return getBuffer().getDataType();
+        return dataType;
     }
 
     @Override
     public void setDataType(DataType dataType) {
-        throw new ReadOnlyBufferException();
+        this.dataType = dataType;
     }
 
     private Buffer getBuffer() {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index 81984569320..702e3e32e48 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FileRegionBuffer;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
@@ -345,7 +346,11 @@ public abstract class NettyMessage {
 
                 headerBuf = fillHeader(allocator);
                 out.write(headerBuf);
-                out.write(buffer, promise);
+                if (buffer instanceof FileRegionBuffer) {
+                    out.write(buffer, promise);
+                } else {
+                    out.write(buffer.asByteBuf(), promise);
+                }
             } catch (Throwable t) {
                 handleException(headerBuf, buffer, t);
             }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileReader.java
index a84fe5fc706..64570067b80 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileReader.java
@@ -20,13 +20,18 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferHeader;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.Queue;
+import java.util.function.Consumer;
 
 import static 
org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.HEADER_LENGTH;
 import static 
org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.readFromByteChannel;
@@ -120,6 +125,60 @@ class PartitionedFileReader {
         return buffer;
     }
 
+    boolean readCurrentRegion(
+            Queue<MemorySegment> freeSegments, BufferRecycler recycler, 
Consumer<Buffer> consumer)
+            throws IOException {
+        if (currentRegionRemainingBytes == 0) {
+            return false;
+        }
+
+        checkArgument(!freeSegments.isEmpty(), "No buffer available for data 
reading.");
+        dataFileChannel.position(nextOffsetToRead);
+
+        BufferAndHeader partialBuffer = new BufferAndHeader(null, null);
+        try {
+            while (!freeSegments.isEmpty() && currentRegionRemainingBytes > 0) 
{
+                MemorySegment segment = freeSegments.poll();
+                int numBytes = (int) Math.min(segment.size(), 
currentRegionRemainingBytes);
+                ByteBuffer byteBuffer = segment.wrap(0, numBytes);
+
+                try {
+                    
BufferReaderWriterUtil.readByteBufferFully(dataFileChannel, byteBuffer);
+                    byteBuffer.flip();
+                    currentRegionRemainingBytes -= byteBuffer.remaining();
+                    nextOffsetToRead += byteBuffer.remaining();
+                } catch (IOException exception) {
+                    freeSegments.add(segment);
+                    throw exception;
+                }
+
+                NetworkBuffer buffer = new NetworkBuffer(segment, recycler);
+                buffer.setSize(byteBuffer.remaining());
+                try {
+                    partialBuffer = processBuffer(byteBuffer, buffer, 
partialBuffer, consumer);
+                } finally {
+                    buffer.recycleBuffer();
+                }
+            }
+        } finally {
+            if (headerBuf.position() > 0) {
+                nextOffsetToRead -= headerBuf.position();
+                currentRegionRemainingBytes += headerBuf.position();
+                headerBuf.clear();
+            }
+            if (partialBuffer.header != null) {
+                nextOffsetToRead -= HEADER_LENGTH;
+                currentRegionRemainingBytes += HEADER_LENGTH;
+            }
+            if (partialBuffer.buffer != null) {
+                nextOffsetToRead -= partialBuffer.buffer.readableBytes();
+                currentRegionRemainingBytes += 
partialBuffer.buffer.readableBytes();
+                partialBuffer.buffer.recycleBuffer();
+            }
+        }
+        return hasRemaining();
+    }
+
     boolean hasRemaining() throws IOException {
         moveToNextReadableRegion();
         return currentRegionRemainingBytes > 0;
@@ -129,4 +188,75 @@ class PartitionedFileReader {
     long getPriority() {
         return nextOffsetToRead;
     }
+
+    private BufferAndHeader processBuffer(
+            ByteBuffer byteBuffer,
+            Buffer buffer,
+            BufferAndHeader partialBuffer,
+            Consumer<Buffer> consumer) {
+        BufferHeader header = partialBuffer.header;
+        CompositeBuffer targetBuffer = partialBuffer.buffer;
+        while (byteBuffer.hasRemaining()) {
+            if (header == null && (header = parseBufferHeader(byteBuffer)) == 
null) {
+                break;
+            }
+
+            if (targetBuffer != null) {
+                buffer.retainBuffer();
+                int position = byteBuffer.position() + 
targetBuffer.missingLength();
+                targetBuffer.addPartialBuffer(
+                        buffer.readOnlySlice(byteBuffer.position(), 
targetBuffer.missingLength()));
+                byteBuffer.position(position);
+            } else if (byteBuffer.remaining() < header.getLength()) {
+                if (byteBuffer.hasRemaining()) {
+                    buffer.retainBuffer();
+                    targetBuffer = new CompositeBuffer(header);
+                    targetBuffer.addPartialBuffer(
+                            buffer.readOnlySlice(byteBuffer.position(), 
byteBuffer.remaining()));
+                }
+                break;
+            } else {
+                buffer.retainBuffer();
+                targetBuffer = new CompositeBuffer(header);
+                targetBuffer.addPartialBuffer(
+                        buffer.readOnlySlice(byteBuffer.position(), 
header.getLength()));
+                byteBuffer.position(byteBuffer.position() + 
header.getLength());
+            }
+
+            header = null;
+            consumer.accept(targetBuffer);
+            targetBuffer = null;
+        }
+        return new BufferAndHeader(targetBuffer, header);
+    }
+
+    private BufferHeader parseBufferHeader(ByteBuffer buffer) {
+        BufferHeader header = null;
+        if (headerBuf.position() > 0) {
+            while (headerBuf.hasRemaining()) {
+                headerBuf.put(buffer.get());
+            }
+            headerBuf.flip();
+            header = BufferReaderWriterUtil.parseBufferHeader(headerBuf);
+            headerBuf.clear();
+        }
+
+        if (header == null && buffer.remaining() < HEADER_LENGTH) {
+            headerBuf.put(buffer);
+        } else if (header == null) {
+            header = BufferReaderWriterUtil.parseBufferHeader(buffer);
+        }
+        return header;
+    }
+
+    private static class BufferAndHeader {
+
+        private final CompositeBuffer buffer;
+        private final BufferHeader header;
+
+        BufferAndHeader(CompositeBuffer buffer, BufferHeader header) {
+            this.buffer = buffer;
+            this.header = header;
+        }
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
index 4644a98d65a..db9121cd6c6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
@@ -127,22 +127,7 @@ class SortMergeSubpartitionReader
 
     /** This method is called by the IO thread of {@link 
SortMergeResultPartitionReadScheduler}. */
     boolean readBuffers(Queue<MemorySegment> buffers, BufferRecycler recycler) 
throws IOException {
-        while (!buffers.isEmpty()) {
-            MemorySegment segment = buffers.poll();
-
-            Buffer buffer;
-            try {
-                if ((buffer = fileReader.readCurrentRegion(segment, recycler)) 
== null) {
-                    buffers.add(segment);
-                    break;
-                }
-            } catch (Throwable throwable) {
-                buffers.add(segment);
-                throw throwable;
-            }
-            addBuffer(buffer);
-        }
-        return fileReader.hasRemaining();
+        return fileReader.readCurrentRegion(buffers, recycler, 
this::addBuffer);
     }
 
     CompletableFuture<?> getReleaseFuture() {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index f409a68524f..c1a440864ed 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.TaskEventPublisher;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
 import org.apache.flink.runtime.io.network.buffer.FileRegionBuffer;
 import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
 import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
@@ -241,7 +242,11 @@ public class LocalInputChannel extends InputChannel 
implements BufferAvailabilit
             buffer = ((FileRegionBuffer) 
buffer).readInto(inputGate.getUnpooledSegment());
         }
 
-        numBytesIn.inc(buffer.getSize());
+        if (buffer instanceof CompositeBuffer) {
+            buffer = ((CompositeBuffer) 
buffer).getFullBufferData(inputGate.getUnpooledSegment());
+        }
+
+        numBytesIn.inc(buffer.readableBytes());
         numBuffersIn.inc();
         channelStatePersister.checkForBarrier(buffer);
         channelStatePersister.maybePersist(buffer);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java
index ff66ca7dfbd..3b92d177618 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java
@@ -26,12 +26,12 @@ import 
org.apache.flink.runtime.io.network.netty.NettyBufferPool;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 
+import org.assertj.core.api.Assertions;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.ReadOnlyBufferException;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -66,14 +66,20 @@ public class ReadOnlySlicedBufferTest {
         assertEquals(eventBuffer.isBuffer(), eventBuffer.readOnlySlice(1, 
2).isBuffer());
     }
 
-    @Test(expected = ReadOnlyBufferException.class)
-    public void testSetDataTypeThrows1() {
-        buffer.readOnlySlice().setDataType(Buffer.DataType.EVENT_BUFFER);
+    @Test
+    public void testSetDataType1() {
+        ReadOnlySlicedNetworkBuffer readOnlyBuffer = buffer.readOnlySlice();
+        readOnlyBuffer.setDataType(Buffer.DataType.EVENT_BUFFER);
+        
Assertions.assertThat(readOnlyBuffer.getDataType()).isEqualTo(Buffer.DataType.EVENT_BUFFER);
     }
 
-    @Test(expected = ReadOnlyBufferException.class)
-    public void testSetDataTypeThrows2() {
-        buffer.readOnlySlice(1, 2).setDataType(Buffer.DataType.EVENT_BUFFER);
+    @Test
+    public void testSetDataType2() {
+        ReadOnlySlicedNetworkBuffer readOnlyBuffer = buffer.readOnlySlice(1, 
2);
+        readOnlyBuffer.setDataType(Buffer.DataType.EVENT_BUFFER);
+        
Assertions.assertThat(readOnlyBuffer.getDataType()).isEqualTo(Buffer.DataType.EVENT_BUFFER);
+        Assertions.assertThat(buffer.readOnlySlice(1, 2).getDataType())
+                .isNotEqualTo(Buffer.DataType.EVENT_BUFFER);
     }
 
     @Test
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
index ee442bcdc44..223f298e800 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
 import org.apache.flink.util.TestLogger;
 
 import org.assertj.core.api.Assertions;
@@ -49,11 +49,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link SortMergeResultPartitionReadScheduler}. */
 public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
@@ -62,7 +58,7 @@ public class SortMergeResultPartitionReadSchedulerTest 
extends TestLogger {
 
     private static final byte[] dataBytes = new byte[bufferSize];
 
-    private static final int totalBytes = bufferSize;
+    private static final int totalBytes = bufferSize * 2;
 
     private static final int numThreads = 4;
 
@@ -107,7 +103,7 @@ public class SortMergeResultPartitionReadSchedulerTest 
extends TestLogger {
         executor = Executors.newFixedThreadPool(numThreads);
         readScheduler =
                 new SortMergeResultPartitionReadScheduler(
-                        numSubpartitions, bufferPool, executor, this);
+                        numSubpartitions, bufferPool, executor, new Object());
     }
 
     @After
@@ -125,18 +121,21 @@ public class SortMergeResultPartitionReadSchedulerTest 
extends TestLogger {
                 readScheduler.createSubpartitionReader(
                         new NoOpBufferAvailablityListener(), 0, 
partitionedFile);
 
-        assertTrue(readScheduler.isRunning());
-        assertTrue(readScheduler.getDataFileChannel().isOpen());
-        assertTrue(readScheduler.getIndexFileChannel().isOpen());
+        assertThat(readScheduler.isRunning()).isTrue();
+        assertThat(readScheduler.getDataFileChannel().isOpen()).isTrue();
+        assertThat(readScheduler.getIndexFileChannel().isOpen()).isTrue();
 
         int numBuffersRead = 0;
         while (numBuffersRead < numBuffersPerSubpartition) {
             ResultSubpartition.BufferAndBacklog bufferAndBacklog =
                     subpartitionReader.getNextBuffer();
             if (bufferAndBacklog != null) {
-                Buffer buffer = bufferAndBacklog.buffer();
-                assertEquals(ByteBuffer.wrap(dataBytes), 
buffer.getNioBufferReadable());
-                buffer.recycleBuffer();
+                int numBytes = bufferAndBacklog.buffer().readableBytes();
+                MemorySegment segment = 
MemorySegmentFactory.allocateUnpooledSegment(numBytes);
+                Buffer fullBuffer =
+                        ((CompositeBuffer) 
bufferAndBacklog.buffer()).getFullBufferData(segment);
+                
assertThat(ByteBuffer.wrap(dataBytes)).isEqualTo(fullBuffer.getNioBufferReadable());
+                fullBuffer.recycleBuffer();
                 ++numBuffersRead;
             }
         }
@@ -162,10 +161,10 @@ public class SortMergeResultPartitionReadSchedulerTest 
extends TestLogger {
         Thread.sleep(1000);
         readScheduler.release();
 
-        assertNotNull(subpartitionReader.getFailureCause());
-        assertTrue(subpartitionReader.isReleased());
-        assertEquals(0, 
subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
-        
assertTrue(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable());
+        assertThat(subpartitionReader.getFailureCause()).isNotNull();
+        assertThat(subpartitionReader.isReleased()).isTrue();
+        
assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(0);
+        
assertThat(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable()).isTrue();
 
         readScheduler.getReleaseFuture().get();
         assertAllResourcesReleased();
@@ -201,8 +200,8 @@ public class SortMergeResultPartitionReadSchedulerTest 
extends TestLogger {
         }
 
         waitUntilReadFinish();
-        assertNotNull(subpartitionReader.getFailureCause());
-        
assertTrue(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable());
+        assertThat(subpartitionReader.getFailureCause()).isNotNull();
+        
assertThat(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable()).isTrue();
         assertAllResourcesReleased();
     }
 
@@ -215,14 +214,15 @@ public class SortMergeResultPartitionReadSchedulerTest 
extends TestLogger {
         bufferPool.destroy();
         waitUntilReadFinish();
 
-        assertTrue(subpartitionReader.isReleased());
-        assertNotNull(subpartitionReader.getFailureCause());
-        
assertTrue(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable());
+        assertThat(subpartitionReader.isReleased()).isTrue();
+        assertThat(subpartitionReader.getFailureCause()).isNotNull();
+        
assertThat(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable()).isTrue();
         assertAllResourcesReleased();
     }
 
     @Test(timeout = 60000)
     public void testNoDeadlockWhenReadAndReleaseBuffers() throws Exception {
+        bufferPool.initialize();
         SortMergeSubpartitionReader subpartitionReader =
                 new SortMergeSubpartitionReader(new 
NoOpBufferAvailablityListener(), fileReader);
         Thread readAndReleaseThread =
@@ -231,7 +231,7 @@ public class SortMergeResultPartitionReadSchedulerTest 
extends TestLogger {
                             Queue<MemorySegment> segments = new ArrayDeque<>();
                             
segments.add(MemorySegmentFactory.allocateUnpooledSegment(bufferSize));
                             try {
-                                assertTrue(fileReader.hasRemaining());
+                                assertThat(fileReader.hasRemaining()).isTrue();
                                 subpartitionReader.readBuffers(segments, 
readScheduler);
                                 subpartitionReader.releaseAllResources();
                                 subpartitionReader.readBuffers(segments, 
readScheduler);
@@ -260,7 +260,7 @@ public class SortMergeResultPartitionReadSchedulerTest 
extends TestLogger {
         Assertions.assertThatThrownBy(readScheduler::allocateBuffers)
                 .isInstanceOf(TimeoutException.class);
         long requestDuration = System.nanoTime() - startTimestamp;
-        Assertions.assertThat(requestDuration > 
bufferRequestTimeout.toNanos()).isTrue();
+        assertThat(requestDuration > bufferRequestTimeout.toNanos()).isTrue();
 
         bufferPool.recycle(buffers);
         readScheduler.release();
@@ -281,9 +281,9 @@ public class SortMergeResultPartitionReadSchedulerTest 
extends TestLogger {
         Queue<MemorySegment> allocatedBuffers = 
readScheduler.allocateBuffers();
         long requestDuration = System.nanoTime() - startTimestamp;
 
-        assertEquals(3, allocatedBuffers.size());
-        assertTrue(requestDuration > bufferRequestTimeout.toNanos() * 2);
-        assertNull(subpartitionReader.getFailureCause());
+        assertThat(allocatedBuffers.size()).isEqualTo(3);
+        assertThat(requestDuration > bufferRequestTimeout.toNanos() * 
2).isTrue();
+        assertThat(subpartitionReader.getFailureCause()).isNull();
 
         bufferPool.recycle(allocatedBuffers);
         bufferPool.destroy();
@@ -316,21 +316,14 @@ public class SortMergeResultPartitionReadSchedulerTest 
extends TestLogger {
         return FileChannel.open(path, StandardOpenOption.READ);
     }
 
-    private static void assertExpectedTimeoutException(Throwable throwable) {
-        assertNotNull(throwable);
-        assertTrue(
-                ExceptionUtils.findThrowableWithMessage(throwable, "Buffer 
request timeout")
-                        .isPresent());
-    }
-
     private void assertAllResourcesReleased() {
-        assertNull(readScheduler.getDataFileChannel());
-        assertNull(readScheduler.getIndexFileChannel());
-        assertFalse(readScheduler.isRunning());
-        assertEquals(0, readScheduler.getNumPendingReaders());
+        assertThat(readScheduler.getDataFileChannel()).isNull();
+        assertThat(readScheduler.getIndexFileChannel()).isNull();
+        assertThat(readScheduler.isRunning()).isFalse();
+        assertThat(readScheduler.getNumPendingReaders()).isEqualTo(0);
 
         if (!bufferPool.isDestroyed()) {
-            assertEquals(bufferPool.getNumTotalBuffers(), 
bufferPool.getAvailableBuffers());
+            
assertThat(bufferPool.getNumTotalBuffers()).isEqualTo(bufferPool.getAvailableBuffers());
         }
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest.java
index 646b66de791..f4650e75df1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.util.TestLogger;
@@ -178,10 +179,19 @@ public class SortMergeResultPartitionTest extends 
TestLogger {
                     numBytesRead[subpartition] += numBytes;
 
                     MemorySegment segment = 
MemorySegmentFactory.allocateUnpooledSegment(numBytes);
-                    segment.put(0, buffer.getNioBufferReadable(), numBytes);
+                    Buffer fullBuffer =
+                            ((CompositeBuffer) buffer)
+                                    .getFullBufferData(
+                                            
MemorySegmentFactory.allocateUnpooledSegment(numBytes));
+                    segment.put(0, fullBuffer.getNioBufferReadable(), 
fullBuffer.readableBytes());
                     buffersRead[subpartition].add(
                             new NetworkBuffer(
-                                    segment, (buf) -> {}, 
buffer.getDataType(), numBytes));
+                                    segment,
+                                    ignore -> {},
+                                    fullBuffer.getDataType(),
+                                    fullBuffer.isCompressed(),
+                                    fullBuffer.readableBytes()));
+                    fullBuffer.recycleBuffer();
                 });
         DataBufferTest.checkWriteReadResult(
                 numSubpartitions, numBytesWritten, numBytesRead, dataWritten, 
buffersRead);
@@ -219,7 +229,6 @@ public class SortMergeResultPartitionTest extends 
TestLogger {
                     Buffer buffer = bufferAndBacklog.buffer();
                     bufferProcessor.accept(new BufferWithChannel(buffer, 
subpartition));
                     dataSize += buffer.readableBytes();
-                    buffer.recycleBuffer();
 
                     if (!buffer.isBuffer()) {
                         ++numEndOfPartitionEvents;
@@ -263,9 +272,17 @@ public class SortMergeResultPartitionTest extends 
TestLogger {
                 new ResultSubpartitionView[] {view},
                 bufferWithChannel -> {
                     Buffer buffer = bufferWithChannel.getBuffer();
-                    if (buffer.isBuffer()) {
-                        recordRead.put(buffer.getNioBufferReadable());
+                    int numBytes = buffer.readableBytes();
+
+                    MemorySegment segment = 
MemorySegmentFactory.allocateUnpooledSegment(numBytes);
+                    Buffer fullBuffer = ((CompositeBuffer) 
buffer).getFullBufferData(segment);
+                    if (fullBuffer.isBuffer()) {
+                        ByteBuffer byteBuffer =
+                                ByteBuffer.allocate(fullBuffer.readableBytes())
+                                        
.put(fullBuffer.getNioBufferReadable());
+                        recordRead.put((ByteBuffer) byteBuffer.flip());
                     }
+                    fullBuffer.recycleBuffer();
                 });
         recordWritten.rewind();
         recordRead.flip();
@@ -300,7 +317,12 @@ public class SortMergeResultPartitionTest extends 
TestLogger {
         }
 
         ResultSubpartitionView[] views = createSubpartitionViews(partition, 
numSubpartitions);
-        long dataRead = readData(views, (ignored) -> {});
+        long dataRead =
+                readData(
+                        views,
+                        bufferWithChannel -> {
+                            bufferWithChannel.getBuffer().recycleBuffer();
+                        });
         assertEquals(dataSize, dataRead);
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReaderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReaderTest.java
index ac7acc80a87..cf28333de1f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReaderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReaderTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.partition;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.TestLogger;
@@ -43,11 +44,7 @@ import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link SortMergeSubpartitionReader}. */
 public class SortMergeSubpartitionReaderTest extends TestLogger {
@@ -97,22 +94,22 @@ public class SortMergeSubpartitionReaderTest extends 
TestLogger {
         SortMergeSubpartitionReader subpartitionReader =
                 createSortMergeSubpartitionReader(listener);
 
-        assertEquals(0, listener.numNotifications);
-        assertEquals(0, 
subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
+        assertThat(listener.numNotifications).isEqualTo(0);
+        
assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(0);
 
         Queue<MemorySegment> segments = createsMemorySegments(2);
         subpartitionReader.readBuffers(segments, 
FreeingBufferRecycler.INSTANCE);
 
-        assertEquals(1, listener.numNotifications);
-        assertEquals(2, 
subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
-        assertEquals(0, segments.size());
+        assertThat(listener.numNotifications).isEqualTo(1);
+        
assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(1);
+        assertThat(segments.size()).isEqualTo(0);
 
         segments = createsMemorySegments(2);
         subpartitionReader.readBuffers(segments, 
FreeingBufferRecycler.INSTANCE);
 
-        assertEquals(1, listener.numNotifications);
-        assertEquals(4, 
subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
-        assertEquals(0, segments.size());
+        assertThat(listener.numNotifications).isEqualTo(1);
+        
assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(2);
+        assertThat(segments.size()).isEqualTo(0);
 
         while (subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers() > 
0) {
             
checkNotNull(subpartitionReader.getNextBuffer()).buffer().recycleBuffer();
@@ -121,11 +118,10 @@ public class SortMergeSubpartitionReaderTest extends 
TestLogger {
         segments = createsMemorySegments(numBuffersPerSubpartition);
         subpartitionReader.readBuffers(segments, 
FreeingBufferRecycler.INSTANCE);
 
-        assertEquals(2, listener.numNotifications);
-        assertEquals(
-                numBuffersPerSubpartition - 4,
-                subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
-        assertEquals(4, segments.size());
+        assertThat(listener.numNotifications).isEqualTo(2);
+        assertThat(numBuffersPerSubpartition - 2)
+                
.isEqualTo(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
+        assertThat(segments.size()).isEqualTo(1);
     }
 
     @Test
@@ -133,24 +129,28 @@ public class SortMergeSubpartitionReaderTest extends 
TestLogger {
         SortMergeSubpartitionReader subpartitionReader =
                 createSortMergeSubpartitionReader(new 
CountingAvailabilityListener());
 
-        assertNull(subpartitionReader.getNextBuffer());
-        
assertFalse(subpartitionReader.getAvailabilityAndBacklog(Integer.MAX_VALUE).isAvailable());
+        assertThat(subpartitionReader.getNextBuffer()).isNull();
+        
assertThat(subpartitionReader.getAvailabilityAndBacklog(Integer.MAX_VALUE).isAvailable())
+                .isFalse();
 
         Queue<MemorySegment> segments = 
createsMemorySegments(numBuffersPerSubpartition);
         subpartitionReader.readBuffers(segments, 
FreeingBufferRecycler.INSTANCE);
 
         for (int i = numBuffersPerSubpartition - 1; i >= 0; --i) {
-            
assertTrue(subpartitionReader.getAvailabilityAndBacklog(i).isAvailable());
+            if 
(!subpartitionReader.getAvailabilityAndBacklog(i).isAvailable()) {
+                continue;
+            }
             ResultSubpartition.BufferAndBacklog bufferAndBacklog =
                     checkNotNull(subpartitionReader.getNextBuffer());
-            assertEquals(
-                    ByteBuffer.wrap(dataBytes), 
bufferAndBacklog.buffer().getNioBufferReadable());
-            assertEquals(bufferAndBacklog.buffersInBacklog(), i == 0 ? 0 : i - 
1);
-            Buffer.DataType dataType =
-                    i == 0
-                            ? Buffer.DataType.NONE
-                            : (i > 1 ? Buffer.DataType.DATA_BUFFER : 
Buffer.DataType.EVENT_BUFFER);
-            assertEquals(dataType, bufferAndBacklog.getNextDataType());
+            int numBytes = bufferAndBacklog.buffer().readableBytes();
+            MemorySegment segment = 
MemorySegmentFactory.allocateUnpooledSegment(numBytes);
+            Buffer fullBuffer =
+                    ((CompositeBuffer) 
bufferAndBacklog.buffer()).getFullBufferData(segment);
+            
assertThat(ByteBuffer.wrap(dataBytes)).isEqualTo(fullBuffer.getNioBufferReadable());
+            assertThat(bufferAndBacklog.buffersInBacklog()).isEqualTo(i == 0 ? 
0 : i - 1);
+            Buffer.DataType dataType = i <= 1 ? Buffer.DataType.NONE : 
Buffer.DataType.DATA_BUFFER;
+            assertThat(dataType).isEqualTo(bufferAndBacklog.getNextDataType());
+            fullBuffer.recycleBuffer();
         }
     }
 
@@ -165,19 +165,19 @@ public class SortMergeSubpartitionReaderTest extends 
TestLogger {
                     createSortMergeSubpartitionReader(listener);
 
             subpartitionReader.readBuffers(segments, segments::add);
-            assertEquals(1, listener.numNotifications);
-            assertEquals(5, 
subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
+            assertThat(listener.numNotifications).isEqualTo(1);
+            
assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(4);
 
             subpartitionReader.fail(new RuntimeException("Test exception."));
-            assertTrue(subpartitionReader.getReleaseFuture().isDone());
-            assertEquals(0, 
subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
-            
assertTrue(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable());
-            assertTrue(subpartitionReader.isReleased());
+            
assertThat(subpartitionReader.getReleaseFuture().isDone()).isTrue();
+            
assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(0);
+            
assertThat(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable()).isTrue();
+            assertThat(subpartitionReader.isReleased()).isTrue();
 
-            assertEquals(2, listener.numNotifications);
-            assertNotNull(subpartitionReader.getFailureCause());
+            assertThat(listener.numNotifications).isEqualTo(2);
+            assertThat(subpartitionReader.getFailureCause()).isNotNull();
         } finally {
-            assertEquals(numSegments, segments.size());
+            assertThat(numSegments).isEqualTo(segments.size());
         }
     }
 
@@ -192,19 +192,19 @@ public class SortMergeSubpartitionReaderTest extends 
TestLogger {
                     createSortMergeSubpartitionReader(listener);
 
             subpartitionReader.readBuffers(segments, segments::add);
-            assertEquals(1, listener.numNotifications);
-            assertEquals(5, 
subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
+            assertThat(listener.numNotifications).isEqualTo(1);
+            
assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(4);
 
             subpartitionReader.releaseAllResources();
-            assertTrue(subpartitionReader.getReleaseFuture().isDone());
-            assertEquals(0, 
subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
-            
assertTrue(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable());
-            assertTrue(subpartitionReader.isReleased());
+            
assertThat(subpartitionReader.getReleaseFuture().isDone()).isTrue();
+            
assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(0);
+            
assertThat(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable()).isTrue();
+            assertThat(subpartitionReader.isReleased()).isTrue();
 
-            assertEquals(1, listener.numNotifications);
-            assertNull(subpartitionReader.getFailureCause());
+            assertThat(listener.numNotifications).isEqualTo(1);
+            assertThat(subpartitionReader.getFailureCause()).isNull();
         } finally {
-            assertEquals(numSegments, segments.size());
+            assertThat(numSegments).isEqualTo(segments.size());
         }
     }
 
@@ -221,7 +221,7 @@ public class SortMergeSubpartitionReaderTest extends 
TestLogger {
             subpartitionReader.releaseAllResources();
             subpartitionReader.readBuffers(segments, segments::add);
         } finally {
-            assertEquals(numSegments, segments.size());
+            assertThat(numSegments).isEqualTo(segments.size());
         }
     }
 
@@ -233,16 +233,17 @@ public class SortMergeSubpartitionReaderTest extends 
TestLogger {
         Queue<MemorySegment> segments = 
createsMemorySegments(numBuffersPerSubpartition);
         subpartitionReader.readBuffers(segments, 
FreeingBufferRecycler.INSTANCE);
 
-        
assertTrue(subpartitionReader.getAvailabilityAndBacklog(Integer.MAX_VALUE).isAvailable());
+        
assertThat(subpartitionReader.getAvailabilityAndBacklog(Integer.MAX_VALUE).isAvailable())
+                .isTrue();
         subpartitionReader.releaseAllResources();
-        assertNull(subpartitionReader.getNextBuffer());
+        assertThat(subpartitionReader.getNextBuffer()).isNull();
     }
 
     private SortMergeSubpartitionReader createSortMergeSubpartitionReader(
             BufferAvailabilityListener listener) throws Exception {
         PartitionedFileReader fileReader =
                 new PartitionedFileReader(partitionedFile, 0, dataFileChannel, 
indexFileChannel);
-        assertTrue(fileReader.hasRemaining());
+        assertThat(fileReader.hasRemaining()).isTrue();
         return new SortMergeSubpartitionReader(listener, fileReader);
     }
 

Reply via email to