This is an automated email from the ASF dual-hosted git repository.
yingjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d6a47d897a9 [FLINK-28373][network] Read a full buffer of data per file
IO read request for sort-shuffle
d6a47d897a9 is described below
commit d6a47d897a9a4753c800b39adb17c06e154422cc
Author: Yuxin Tan <[email protected]>
AuthorDate: Tue Aug 9 14:48:39 2022 +0800
[FLINK-28373][network] Read a full buffer of data per file IO read request
for sort-shuffle
This closes #20457.
---
.../runtime/io/network/buffer/CompositeBuffer.java | 233 +++++++++++++++++++++
.../buffer/ReadOnlySlicedNetworkBuffer.java | 10 +-
.../runtime/io/network/netty/NettyMessage.java | 7 +-
.../network/partition/PartitionedFileReader.java | 143 +++++++++++--
.../partition/SortMergeSubpartitionReader.java | 17 +-
.../partition/consumer/LocalInputChannel.java | 7 +-
.../network/buffer/ReadOnlySlicedBufferTest.java | 20 +-
.../partition/PartitionedFileWriteReadTest.java | 80 +++++--
.../SortMergeResultPartitionReadSchedulerTest.java | 74 +++----
.../partition/SortMergeResultPartitionTest.java | 34 ++-
.../partition/SortMergeSubpartitionReaderTest.java | 103 ++++-----
11 files changed, 573 insertions(+), 155 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/CompositeBuffer.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/CompositeBuffer.java
new file mode 100644
index 00000000000..7444704b0ad
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/CompositeBuffer.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+import org.apache.flink.shaded.netty4.io.netty.buffer.CompositeByteBuf;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An implementation of {@link Buffer} which contains multiple partial buffers
for network data
+ * communication.
+ */
+public class CompositeBuffer implements Buffer {
+
+ private final DataType dataType;
+
+ private final int length;
+
+ private final boolean isCompressed;
+
+ private final List<Buffer> partialBuffers = new ArrayList<>();
+
+ private int currentLength;
+
+ private ByteBufAllocator allocator;
+
+ public CompositeBuffer(DataType dataType, int length, boolean
isCompressed) {
+ this.dataType = checkNotNull(dataType);
+ this.length = length;
+ this.isCompressed = isCompressed;
+ }
+
+ public CompositeBuffer(BufferHeader header) {
+ this(header.getDataType(), header.getLength(), header.isCompressed());
+ }
+
+ @Override
+ public boolean isBuffer() {
+ return dataType.isBuffer();
+ }
+
+ @Override
+ public void recycleBuffer() {
+ for (Buffer partialBuffer : partialBuffers) {
+ partialBuffer.recycleBuffer();
+ }
+ }
+
+ @Override
+ public Buffer retainBuffer() {
+ for (Buffer partialBuffer : partialBuffers) {
+ partialBuffer.retainBuffer();
+ }
+ return this;
+ }
+
+ @Override
+ public int getSize() {
+ return currentLength;
+ }
+
+ @Override
+ public int readableBytes() {
+ return currentLength;
+ }
+
+ @Override
+ public void setAllocator(ByteBufAllocator allocator) {
+ this.allocator = allocator;
+ }
+
+ @Override
+ public ByteBuf asByteBuf() {
+ CompositeByteBuf compositeByteBuf =
checkNotNull(allocator).compositeDirectBuffer();
+ for (Buffer buffer : partialBuffers) {
+ compositeByteBuf.addComponent(buffer.asByteBuf());
+ }
+ compositeByteBuf.writerIndex(currentLength);
+ return compositeByteBuf;
+ }
+
+ @Override
+ public boolean isCompressed() {
+ return isCompressed;
+ }
+
+ @Override
+ public DataType getDataType() {
+ return dataType;
+ }
+
+ public int numPartialBuffers() {
+ return partialBuffers.size();
+ }
+
+ /**
+ * Returns the full buffer data in one piece of {@link MemorySegment}. If
there is multiple
+ * partial buffers, the partial data will be copied to the given target
{@link MemorySegment}.
+ */
+ public Buffer getFullBufferData(MemorySegment segment) {
+ checkState(!partialBuffers.isEmpty());
+ checkState(currentLength <= segment.size());
+
+ if (partialBuffers.size() == 1) {
+ return partialBuffers.get(0);
+ }
+
+ int offset = 0;
+ for (Buffer buffer : partialBuffers) {
+ segment.put(offset, buffer.getNioBufferReadable(),
buffer.readableBytes());
+ offset += buffer.readableBytes();
+ }
+ recycleBuffer();
+ return new NetworkBuffer(
+ segment,
+ BufferRecycler.DummyBufferRecycler.INSTANCE,
+ dataType,
+ isCompressed,
+ currentLength);
+ }
+
+ public void addPartialBuffer(Buffer buffer) {
+ buffer.setDataType(dataType);
+ buffer.setCompressed(isCompressed);
+ partialBuffers.add(buffer);
+ currentLength += buffer.readableBytes();
+ checkState(currentLength <= length);
+ }
+
+ public int missingLength() {
+ return length - currentLength;
+ }
+
+ @Override
+ public MemorySegment getMemorySegment() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getMemorySegmentOffset() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public BufferRecycler getRecycler() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isRecycled() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Buffer readOnlySlice() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Buffer readOnlySlice(int index, int length) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getMaxCapacity() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getReaderIndex() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setReaderIndex(int readerIndex) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setSize(int writerIndex) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ByteBuffer getNioBufferReadable() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ByteBuffer getNioBuffer(int index, int length) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setCompressed(boolean isCompressed) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setDataType(DataType dataType) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int refCnt() {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
index df9dccb7314..4fccec09bdc 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
@@ -44,6 +44,8 @@ public final class ReadOnlySlicedNetworkBuffer extends
ReadOnlyByteBuf implement
private boolean isCompressed = false;
+ private DataType dataType;
+
/**
* Creates a buffer which shares the memory segment of the given buffer
and exposed the given
* sub-region only.
@@ -58,6 +60,7 @@ public final class ReadOnlySlicedNetworkBuffer extends
ReadOnlyByteBuf implement
ReadOnlySlicedNetworkBuffer(NetworkBuffer buffer, int index, int length) {
super(new SlicedByteBuf(buffer, index, length));
this.memorySegmentOffset = buffer.getMemorySegmentOffset() + index;
+ this.dataType = buffer.getDataType();
}
/**
@@ -79,6 +82,7 @@ public final class ReadOnlySlicedNetworkBuffer extends
ReadOnlyByteBuf implement
super(new SlicedByteBuf(buffer, index, length));
this.memorySegmentOffset = memorySegmentOffset + index;
this.isCompressed = isCompressed;
+ this.dataType = getBuffer().getDataType();
}
@Override
@@ -88,7 +92,7 @@ public final class ReadOnlySlicedNetworkBuffer extends
ReadOnlyByteBuf implement
@Override
public boolean isBuffer() {
- return getBuffer().isBuffer();
+ return dataType.isBuffer();
}
/**
@@ -223,12 +227,12 @@ public final class ReadOnlySlicedNetworkBuffer extends
ReadOnlyByteBuf implement
@Override
public DataType getDataType() {
- return getBuffer().getDataType();
+ return dataType;
}
@Override
public void setDataType(DataType dataType) {
- throw new ReadOnlyBufferException();
+ this.dataType = dataType;
}
private Buffer getBuffer() {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index 81984569320..702e3e32e48 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FileRegionBuffer;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
@@ -345,7 +346,11 @@ public abstract class NettyMessage {
headerBuf = fillHeader(allocator);
out.write(headerBuf);
- out.write(buffer, promise);
+ if (buffer instanceof FileRegionBuffer) {
+ out.write(buffer, promise);
+ } else {
+ out.write(buffer.asByteBuf(), promise);
+ }
} catch (Throwable t) {
handleException(headerBuf, buffer, t);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileReader.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileReader.java
index a84fe5fc706..b5a43bf6c42 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileReader.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileReader.java
@@ -20,16 +20,18 @@ package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferHeader;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
-
-import javax.annotation.Nullable;
+import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.Queue;
+import java.util.function.Consumer;
import static
org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.HEADER_LENGTH;
-import static
org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.readFromByteChannel;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -100,24 +102,66 @@ class PartitionedFileReader {
*
* <p>Note: The caller is responsible for recycling the target buffer if
any exception occurs.
*
- * @param target The target {@link MemorySegment} to read data to.
+ * @param freeSegments The free {@link MemorySegment}s to read data to.
* @param recycler The {@link BufferRecycler} which is responsible to
recycle the target buffer.
- * @return A {@link Buffer} containing the data read.
+ * @param consumer The target {@link Buffer} stores the data read from
file channel.
+ * @return Whether the file reader has remaining data to read.
*/
- @Nullable
- Buffer readCurrentRegion(MemorySegment target, BufferRecycler recycler)
throws IOException {
+ boolean readCurrentRegion(
+ Queue<MemorySegment> freeSegments, BufferRecycler recycler,
Consumer<Buffer> consumer)
+ throws IOException {
if (currentRegionRemainingBytes == 0) {
- return null;
+ return false;
}
+ checkArgument(!freeSegments.isEmpty(), "No buffer available for data
reading.");
dataFileChannel.position(nextOffsetToRead);
- Buffer buffer = readFromByteChannel(dataFileChannel, headerBuf,
target, recycler);
- if (buffer != null) {
- int length = buffer.readableBytes() + HEADER_LENGTH;
- nextOffsetToRead = nextOffsetToRead + length;
- currentRegionRemainingBytes -= length;
+
+ BufferAndHeader partialBuffer = new BufferAndHeader(null, null);
+ try {
+ while (!freeSegments.isEmpty() && currentRegionRemainingBytes > 0)
{
+ MemorySegment segment = freeSegments.poll();
+ int numBytes = (int) Math.min(segment.size(),
currentRegionRemainingBytes);
+ ByteBuffer byteBuffer = segment.wrap(0, numBytes);
+
+ try {
+
BufferReaderWriterUtil.readByteBufferFully(dataFileChannel, byteBuffer);
+ byteBuffer.flip();
+ currentRegionRemainingBytes -= byteBuffer.remaining();
+ nextOffsetToRead += byteBuffer.remaining();
+ } catch (Throwable throwable) {
+ freeSegments.add(segment);
+ throw throwable;
+ }
+
+ NetworkBuffer buffer = new NetworkBuffer(segment, recycler);
+ buffer.setSize(byteBuffer.remaining());
+ try {
+ partialBuffer = processBuffer(byteBuffer, buffer,
partialBuffer, consumer);
+ } catch (Throwable throwable) {
+ partialBuffer = new BufferAndHeader(null, null);
+ throw throwable;
+ } finally {
+ buffer.recycleBuffer();
+ }
+ }
+ } finally {
+ if (headerBuf.position() > 0) {
+ nextOffsetToRead -= headerBuf.position();
+ currentRegionRemainingBytes += headerBuf.position();
+ headerBuf.clear();
+ }
+ if (partialBuffer.header != null) {
+ nextOffsetToRead -= HEADER_LENGTH;
+ currentRegionRemainingBytes += HEADER_LENGTH;
+ }
+ if (partialBuffer.buffer != null) {
+ nextOffsetToRead -= partialBuffer.buffer.readableBytes();
+ currentRegionRemainingBytes +=
partialBuffer.buffer.readableBytes();
+ partialBuffer.buffer.recycleBuffer();
+ }
}
- return buffer;
+ return hasRemaining();
}
boolean hasRemaining() throws IOException {
@@ -129,4 +173,75 @@ class PartitionedFileReader {
long getPriority() {
return nextOffsetToRead;
}
+
+ private BufferAndHeader processBuffer(
+ ByteBuffer byteBuffer,
+ Buffer buffer,
+ BufferAndHeader partialBuffer,
+ Consumer<Buffer> consumer) {
+ BufferHeader header = partialBuffer.header;
+ CompositeBuffer targetBuffer = partialBuffer.buffer;
+ while (byteBuffer.hasRemaining()) {
+ if (header == null && (header = parseBufferHeader(byteBuffer)) ==
null) {
+ break;
+ }
+
+ if (targetBuffer != null) {
+ buffer.retainBuffer();
+ int position = byteBuffer.position() +
targetBuffer.missingLength();
+ targetBuffer.addPartialBuffer(
+ buffer.readOnlySlice(byteBuffer.position(),
targetBuffer.missingLength()));
+ byteBuffer.position(position);
+ } else if (byteBuffer.remaining() < header.getLength()) {
+ if (byteBuffer.hasRemaining()) {
+ buffer.retainBuffer();
+ targetBuffer = new CompositeBuffer(header);
+ targetBuffer.addPartialBuffer(
+ buffer.readOnlySlice(byteBuffer.position(),
byteBuffer.remaining()));
+ }
+ break;
+ } else {
+ buffer.retainBuffer();
+ targetBuffer = new CompositeBuffer(header);
+ targetBuffer.addPartialBuffer(
+ buffer.readOnlySlice(byteBuffer.position(),
header.getLength()));
+ byteBuffer.position(byteBuffer.position() +
header.getLength());
+ }
+
+ header = null;
+ consumer.accept(targetBuffer);
+ targetBuffer = null;
+ }
+ return new BufferAndHeader(targetBuffer, header);
+ }
+
+ private BufferHeader parseBufferHeader(ByteBuffer buffer) {
+ BufferHeader header = null;
+ if (headerBuf.position() > 0) {
+ while (headerBuf.hasRemaining()) {
+ headerBuf.put(buffer.get());
+ }
+ headerBuf.flip();
+ header = BufferReaderWriterUtil.parseBufferHeader(headerBuf);
+ headerBuf.clear();
+ }
+
+ if (header == null && buffer.remaining() < HEADER_LENGTH) {
+ headerBuf.put(buffer);
+ } else if (header == null) {
+ header = BufferReaderWriterUtil.parseBufferHeader(buffer);
+ }
+ return header;
+ }
+
+ private static class BufferAndHeader {
+
+ private final CompositeBuffer buffer;
+ private final BufferHeader header;
+
+ BufferAndHeader(CompositeBuffer buffer, BufferHeader header) {
+ this.buffer = buffer;
+ this.header = header;
+ }
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
index 589fd6962bd..3cf7a08b77f 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
@@ -127,22 +127,7 @@ class SortMergeSubpartitionReader
/** This method is called by the IO thread of {@link
SortMergeResultPartitionReadScheduler}. */
boolean readBuffers(Queue<MemorySegment> buffers, BufferRecycler recycler)
throws IOException {
- while (!buffers.isEmpty()) {
- MemorySegment segment = buffers.poll();
-
- Buffer buffer;
- try {
- if ((buffer = fileReader.readCurrentRegion(segment, recycler))
== null) {
- buffers.add(segment);
- break;
- }
- } catch (Throwable throwable) {
- buffers.add(segment);
- throw throwable;
- }
- addBuffer(buffer);
- }
- return fileReader.hasRemaining();
+ return fileReader.readCurrentRegion(buffers, recycler,
this::addBuffer);
}
CompletableFuture<?> getReleaseFuture() {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index f409a68524f..c1a440864ed 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
import org.apache.flink.runtime.io.network.buffer.FileRegionBuffer;
import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
import
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
@@ -241,7 +242,11 @@ public class LocalInputChannel extends InputChannel
implements BufferAvailabilit
buffer = ((FileRegionBuffer)
buffer).readInto(inputGate.getUnpooledSegment());
}
- numBytesIn.inc(buffer.getSize());
+ if (buffer instanceof CompositeBuffer) {
+ buffer = ((CompositeBuffer)
buffer).getFullBufferData(inputGate.getUnpooledSegment());
+ }
+
+ numBytesIn.inc(buffer.readableBytes());
numBuffersIn.inc();
channelStatePersister.checkForBarrier(buffer);
channelStatePersister.maybePersist(buffer);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java
index ff66ca7dfbd..3b92d177618 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java
@@ -26,12 +26,12 @@ import
org.apache.flink.runtime.io.network.netty.NettyBufferPool;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.ReadOnlyBufferException;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -66,14 +66,20 @@ public class ReadOnlySlicedBufferTest {
assertEquals(eventBuffer.isBuffer(), eventBuffer.readOnlySlice(1,
2).isBuffer());
}
- @Test(expected = ReadOnlyBufferException.class)
- public void testSetDataTypeThrows1() {
- buffer.readOnlySlice().setDataType(Buffer.DataType.EVENT_BUFFER);
+ @Test
+ public void testSetDataType1() {
+ ReadOnlySlicedNetworkBuffer readOnlyBuffer = buffer.readOnlySlice();
+ readOnlyBuffer.setDataType(Buffer.DataType.EVENT_BUFFER);
+
Assertions.assertThat(readOnlyBuffer.getDataType()).isEqualTo(Buffer.DataType.EVENT_BUFFER);
}
- @Test(expected = ReadOnlyBufferException.class)
- public void testSetDataTypeThrows2() {
- buffer.readOnlySlice(1, 2).setDataType(Buffer.DataType.EVENT_BUFFER);
+ @Test
+ public void testSetDataType2() {
+ ReadOnlySlicedNetworkBuffer readOnlyBuffer = buffer.readOnlySlice(1,
2);
+ readOnlyBuffer.setDataType(Buffer.DataType.EVENT_BUFFER);
+
Assertions.assertThat(readOnlyBuffer.getDataType()).isEqualTo(Buffer.DataType.EVENT_BUFFER);
+ Assertions.assertThat(buffer.readOnlySlice(1, 2).getDataType())
+ .isNotEqualTo(Buffer.DataType.EVENT_BUFFER);
}
@Test
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java
index 53c1f2c3524..adfe71792d5 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.util.IOUtils;
@@ -36,14 +37,13 @@ import java.nio.file.StandardOpenOption;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
/**
* Tests for writing and reading {@link PartitionedFile} with {@link
PartitionedFileWriter} and
@@ -112,15 +112,18 @@ public class PartitionedFileWriteReadTest {
new PartitionedFileReader(
partitionedFile, subpartition, dataFileChannel,
indexFileChannel);
while (fileReader.hasRemaining()) {
- MemorySegment readBuffer =
MemorySegmentFactory.allocateUnpooledSegment(bufferSize);
- Buffer buffer = fileReader.readCurrentRegion(readBuffer, (buf)
-> {});
- buffersRead[subpartition].add(buffer);
+ final int subIndex = subpartition;
+ fileReader.readCurrentRegion(
+ allocateBuffers(bufferSize),
+ FreeingBufferRecycler.INSTANCE,
+ buffer -> addReadBuffer(buffer,
buffersRead[subIndex]));
}
}
IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel);
for (int subpartition = 0; subpartition < numSubpartitions;
++subpartition) {
- assertEquals(buffersWritten[subpartition].size(),
buffersRead[subpartition].size());
+ assertThat(buffersWritten[subpartition].size())
+ .isEqualTo(buffersRead[subpartition].size());
for (int i = 0; i < buffersWritten[subpartition].size(); ++i) {
assertBufferEquals(
buffersWritten[subpartition].get(i),
buffersRead[subpartition].get(i));
@@ -128,6 +131,32 @@ public class PartitionedFileWriteReadTest {
}
}
+ private void addReadBuffer(Buffer buffer, List<Buffer> buffersRead) {
+ int numBytes = buffer.readableBytes();
+ MemorySegment segment =
MemorySegmentFactory.allocateUnpooledSegment(numBytes);
+ Buffer fullBuffer =
+ ((CompositeBuffer) buffer)
+
.getFullBufferData(MemorySegmentFactory.allocateUnpooledSegment(numBytes));
+ segment.put(0, fullBuffer.getNioBufferReadable(),
fullBuffer.readableBytes());
+ buffersRead.add(
+ new NetworkBuffer(
+ segment,
+ ignore -> {},
+ fullBuffer.getDataType(),
+ fullBuffer.isCompressed(),
+ fullBuffer.readableBytes()));
+ fullBuffer.recycleBuffer();
+ }
+
+ private static Queue<MemorySegment> allocateBuffers(int bufferSize) {
+ int numBuffers = 2;
+ Queue<MemorySegment> readBuffers = new LinkedList<>();
+ while (numBuffers-- > 0) {
+
readBuffers.add(MemorySegmentFactory.allocateUnpooledSegment(bufferSize));
+ }
+ return readBuffers;
+ }
+
@Test
public void testWriteAndReadWithEmptySubpartition() throws Exception {
int numRegions = 10;
@@ -136,8 +165,10 @@ public class PartitionedFileWriteReadTest {
Random random = new Random(1111);
Queue<Buffer>[] subpartitionBuffers = new ArrayDeque[numSubpartitions];
+ List<Buffer>[] buffersRead = new List[numSubpartitions];
for (int subpartition = 0; subpartition < numSubpartitions;
++subpartition) {
subpartitionBuffers[subpartition] = new ArrayDeque<>();
+ buffersRead[subpartition] = new ArrayList<>();
}
PartitionedFileWriter fileWriter =
createPartitionedFileWriter(numSubpartitions);
@@ -159,19 +190,24 @@ public class PartitionedFileWriteReadTest {
PartitionedFileReader fileReader =
new PartitionedFileReader(
partitionedFile, subpartition, dataFileChannel,
indexFileChannel);
+ int bufferIndex = 0;
while (fileReader.hasRemaining()) {
- MemorySegment readBuffer =
MemorySegmentFactory.allocateUnpooledSegment(bufferSize);
- Buffer buffer =
checkNotNull(fileReader.readCurrentRegion(readBuffer, (buf) -> {}));
+ final int subIndex = subpartition;
+ fileReader.readCurrentRegion(
+ allocateBuffers(bufferSize),
+ FreeingBufferRecycler.INSTANCE,
+ buffer -> addReadBuffer(buffer,
buffersRead[subIndex]));
+ Buffer buffer = buffersRead[subIndex].get(bufferIndex++);
assertBufferEquals(checkNotNull(subpartitionBuffers[subpartition].poll()),
buffer);
}
- assertTrue(subpartitionBuffers[subpartition].isEmpty());
+ assertThat(subpartitionBuffers[subpartition]).isEmpty();
}
IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel);
}
private void assertBufferEquals(Buffer expected, Buffer actual) {
- assertEquals(expected.getDataType(), actual.getDataType());
- assertEquals(expected.getNioBufferReadable(),
actual.getNioBufferReadable());
+ assertThat(expected.getDataType()).isEqualTo(actual.getDataType());
+
assertThat(expected.getNioBufferReadable()).isEqualTo(actual.getNioBufferReadable());
}
private Buffer createBuffer(Random random, int bufferSize) {
@@ -221,15 +257,27 @@ public class PartitionedFileWriteReadTest {
@Test
public void testReadEmptyPartitionedFile() throws Exception {
+ int bufferSize = 1024;
+ int numSubpartitions = 2;
+ int targetSubpartition = 1;
PartitionedFile partitionedFile = createPartitionedFile();
+ List<Buffer>[] buffersRead = new List[numSubpartitions];
+ for (int subpartition = 0; subpartition < numSubpartitions;
++subpartition) {
+ buffersRead[subpartition] = new ArrayList<>();
+ }
+
FileChannel dataFileChannel =
openFileChannel(partitionedFile.getDataFilePath());
FileChannel indexFileChannel =
openFileChannel(partitionedFile.getIndexFilePath());
PartitionedFileReader partitionedFileReader =
- new PartitionedFileReader(partitionedFile, 1, dataFileChannel,
indexFileChannel);
- MemorySegment target =
MemorySegmentFactory.allocateUnpooledSegment(1024);
-
- assertNull(partitionedFileReader.readCurrentRegion(target,
FreeingBufferRecycler.INSTANCE));
+ new PartitionedFileReader(
+ partitionedFile, targetSubpartition, dataFileChannel,
indexFileChannel);
+
+ partitionedFileReader.readCurrentRegion(
+ allocateBuffers(bufferSize),
+ FreeingBufferRecycler.INSTANCE,
+ buffer -> addReadBuffer(buffer,
buffersRead[targetSubpartition]));
+ assertThat(buffersRead[targetSubpartition]).isEmpty();
IOUtils.closeAllQuietly(dataFileChannel, indexFileChannel);
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
index d65d408d1ad..301ecffee1d 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
import org.apache.flink.util.TestLogger;
import org.assertj.core.api.Assertions;
@@ -49,11 +49,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link SortMergeResultPartitionReadScheduler}. */
public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
@@ -62,7 +58,7 @@ public class SortMergeResultPartitionReadSchedulerTest
extends TestLogger {
private static final byte[] dataBytes = new byte[bufferSize];
- private static final int totalBytes = bufferSize;
+ private static final int totalBytes = bufferSize * 2;
private static final int numThreads = 4;
@@ -105,7 +101,8 @@ public class SortMergeResultPartitionReadSchedulerTest
extends TestLogger {
new PartitionedFileReader(partitionedFile, 0, dataFileChannel,
indexFileChannel);
bufferPool = new BatchShuffleReadBufferPool(totalBytes, bufferSize);
executor = Executors.newFixedThreadPool(numThreads);
- readScheduler = new SortMergeResultPartitionReadScheduler(bufferPool,
executor, this);
+ readScheduler =
+ new SortMergeResultPartitionReadScheduler(bufferPool,
executor, new Object());
}
@After
@@ -123,18 +120,21 @@ public class SortMergeResultPartitionReadSchedulerTest
extends TestLogger {
readScheduler.createSubpartitionReader(
new NoOpBufferAvailablityListener(), 0,
partitionedFile);
- assertTrue(readScheduler.isRunning());
- assertTrue(readScheduler.getDataFileChannel().isOpen());
- assertTrue(readScheduler.getIndexFileChannel().isOpen());
+ assertThat(readScheduler.isRunning()).isTrue();
+ assertThat(readScheduler.getDataFileChannel().isOpen()).isTrue();
+ assertThat(readScheduler.getIndexFileChannel().isOpen()).isTrue();
int numBuffersRead = 0;
while (numBuffersRead < numBuffersPerSubpartition) {
ResultSubpartition.BufferAndBacklog bufferAndBacklog =
subpartitionReader.getNextBuffer();
if (bufferAndBacklog != null) {
- Buffer buffer = bufferAndBacklog.buffer();
- assertEquals(ByteBuffer.wrap(dataBytes),
buffer.getNioBufferReadable());
- buffer.recycleBuffer();
+ int numBytes = bufferAndBacklog.buffer().readableBytes();
+ MemorySegment segment =
MemorySegmentFactory.allocateUnpooledSegment(numBytes);
+ Buffer fullBuffer =
+ ((CompositeBuffer)
bufferAndBacklog.buffer()).getFullBufferData(segment);
+
assertThat(ByteBuffer.wrap(dataBytes)).isEqualTo(fullBuffer.getNioBufferReadable());
+ fullBuffer.recycleBuffer();
++numBuffersRead;
}
}
@@ -160,10 +160,10 @@ public class SortMergeResultPartitionReadSchedulerTest
extends TestLogger {
Thread.sleep(1000);
readScheduler.release();
- assertNotNull(subpartitionReader.getFailureCause());
- assertTrue(subpartitionReader.isReleased());
- assertEquals(0,
subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
-
assertTrue(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable());
+ assertThat(subpartitionReader.getFailureCause()).isNotNull();
+ assertThat(subpartitionReader.isReleased()).isTrue();
+
assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(0);
+
assertThat(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable()).isTrue();
readScheduler.getReleaseFuture().get();
assertAllResourcesReleased();
@@ -199,8 +199,8 @@ public class SortMergeResultPartitionReadSchedulerTest
extends TestLogger {
}
waitUntilReadFinish();
- assertNotNull(subpartitionReader.getFailureCause());
-
assertTrue(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable());
+ assertThat(subpartitionReader.getFailureCause()).isNotNull();
+
assertThat(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable()).isTrue();
assertAllResourcesReleased();
}
@@ -213,14 +213,15 @@ public class SortMergeResultPartitionReadSchedulerTest
extends TestLogger {
bufferPool.destroy();
waitUntilReadFinish();
- assertTrue(subpartitionReader.isReleased());
- assertNotNull(subpartitionReader.getFailureCause());
-
assertTrue(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable());
+ assertThat(subpartitionReader.isReleased()).isTrue();
+ assertThat(subpartitionReader.getFailureCause()).isNotNull();
+
assertThat(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable()).isTrue();
assertAllResourcesReleased();
}
@Test(timeout = 60000)
public void testNoDeadlockWhenReadAndReleaseBuffers() throws Exception {
+ bufferPool.initialize();
SortMergeSubpartitionReader subpartitionReader =
new SortMergeSubpartitionReader(new
NoOpBufferAvailablityListener(), fileReader);
Thread readAndReleaseThread =
@@ -229,7 +230,7 @@ public class SortMergeResultPartitionReadSchedulerTest
extends TestLogger {
Queue<MemorySegment> segments = new ArrayDeque<>();
segments.add(MemorySegmentFactory.allocateUnpooledSegment(bufferSize));
try {
- assertTrue(fileReader.hasRemaining());
+ assertThat(fileReader.hasRemaining()).isTrue();
subpartitionReader.readBuffers(segments,
readScheduler);
subpartitionReader.releaseAllResources();
subpartitionReader.readBuffers(segments,
readScheduler);
@@ -258,7 +259,7 @@ public class SortMergeResultPartitionReadSchedulerTest
extends TestLogger {
Assertions.assertThatThrownBy(readScheduler::allocateBuffers)
.isInstanceOf(TimeoutException.class);
long requestDuration = System.nanoTime() - startTimestamp;
- Assertions.assertThat(requestDuration >
bufferRequestTimeout.toNanos()).isTrue();
+ assertThat(requestDuration > bufferRequestTimeout.toNanos()).isTrue();
bufferPool.recycle(buffers);
readScheduler.release();
@@ -279,9 +280,9 @@ public class SortMergeResultPartitionReadSchedulerTest
extends TestLogger {
Queue<MemorySegment> allocatedBuffers =
readScheduler.allocateBuffers();
long requestDuration = System.nanoTime() - startTimestamp;
- assertEquals(3, allocatedBuffers.size());
- assertTrue(requestDuration > bufferRequestTimeout.toNanos() * 2);
- assertNull(subpartitionReader.getFailureCause());
+ assertThat(allocatedBuffers.size()).isEqualTo(3);
+ assertThat(requestDuration > bufferRequestTimeout.toNanos() *
2).isTrue();
+ assertThat(subpartitionReader.getFailureCause()).isNull();
bufferPool.recycle(allocatedBuffers);
bufferPool.destroy();
@@ -314,21 +315,14 @@ public class SortMergeResultPartitionReadSchedulerTest
extends TestLogger {
return FileChannel.open(path, StandardOpenOption.READ);
}
- private static void assertExpectedTimeoutException(Throwable throwable) {
- assertNotNull(throwable);
- assertTrue(
- ExceptionUtils.findThrowableWithMessage(throwable, "Buffer
request timeout")
- .isPresent());
- }
-
private void assertAllResourcesReleased() {
- assertNull(readScheduler.getDataFileChannel());
- assertNull(readScheduler.getIndexFileChannel());
- assertFalse(readScheduler.isRunning());
- assertEquals(0, readScheduler.getNumPendingReaders());
+ assertThat(readScheduler.getDataFileChannel()).isNull();
+ assertThat(readScheduler.getIndexFileChannel()).isNull();
+ assertThat(readScheduler.isRunning()).isFalse();
+ assertThat(readScheduler.getNumPendingReaders()).isEqualTo(0);
if (!bufferPool.isDestroyed()) {
- assertEquals(bufferPool.getNumTotalBuffers(),
bufferPool.getAvailableBuffers());
+
assertThat(bufferPool.getNumTotalBuffers()).isEqualTo(bufferPool.getAvailableBuffers());
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest.java
index 646b66de791..f4650e75df1 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest.java
@@ -27,6 +27,7 @@ import
org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.util.TestLogger;
@@ -178,10 +179,19 @@ public class SortMergeResultPartitionTest extends
TestLogger {
numBytesRead[subpartition] += numBytes;
MemorySegment segment =
MemorySegmentFactory.allocateUnpooledSegment(numBytes);
- segment.put(0, buffer.getNioBufferReadable(), numBytes);
+ Buffer fullBuffer =
+ ((CompositeBuffer) buffer)
+ .getFullBufferData(
+
MemorySegmentFactory.allocateUnpooledSegment(numBytes));
+ segment.put(0, fullBuffer.getNioBufferReadable(),
fullBuffer.readableBytes());
buffersRead[subpartition].add(
new NetworkBuffer(
- segment, (buf) -> {},
buffer.getDataType(), numBytes));
+ segment,
+ ignore -> {},
+ fullBuffer.getDataType(),
+ fullBuffer.isCompressed(),
+ fullBuffer.readableBytes()));
+ fullBuffer.recycleBuffer();
});
DataBufferTest.checkWriteReadResult(
numSubpartitions, numBytesWritten, numBytesRead, dataWritten,
buffersRead);
@@ -219,7 +229,6 @@ public class SortMergeResultPartitionTest extends
TestLogger {
Buffer buffer = bufferAndBacklog.buffer();
bufferProcessor.accept(new BufferWithChannel(buffer,
subpartition));
dataSize += buffer.readableBytes();
- buffer.recycleBuffer();
if (!buffer.isBuffer()) {
++numEndOfPartitionEvents;
@@ -263,9 +272,17 @@ public class SortMergeResultPartitionTest extends
TestLogger {
new ResultSubpartitionView[] {view},
bufferWithChannel -> {
Buffer buffer = bufferWithChannel.getBuffer();
- if (buffer.isBuffer()) {
- recordRead.put(buffer.getNioBufferReadable());
+ int numBytes = buffer.readableBytes();
+
+ MemorySegment segment =
MemorySegmentFactory.allocateUnpooledSegment(numBytes);
+ Buffer fullBuffer = ((CompositeBuffer)
buffer).getFullBufferData(segment);
+ if (fullBuffer.isBuffer()) {
+ ByteBuffer byteBuffer =
+ ByteBuffer.allocate(fullBuffer.readableBytes())
+
.put(fullBuffer.getNioBufferReadable());
+ recordRead.put((ByteBuffer) byteBuffer.flip());
}
+ fullBuffer.recycleBuffer();
});
recordWritten.rewind();
recordRead.flip();
@@ -300,7 +317,12 @@ public class SortMergeResultPartitionTest extends
TestLogger {
}
ResultSubpartitionView[] views = createSubpartitionViews(partition,
numSubpartitions);
- long dataRead = readData(views, (ignored) -> {});
+ long dataRead =
+ readData(
+ views,
+ bufferWithChannel -> {
+ bufferWithChannel.getBuffer().recycleBuffer();
+ });
assertEquals(dataSize, dataRead);
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReaderTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReaderTest.java
index ac7acc80a87..cf28333de1f 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReaderTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReaderTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.TestLogger;
@@ -43,11 +44,7 @@ import java.util.Random;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link SortMergeSubpartitionReader}. */
public class SortMergeSubpartitionReaderTest extends TestLogger {
@@ -97,22 +94,22 @@ public class SortMergeSubpartitionReaderTest extends
TestLogger {
SortMergeSubpartitionReader subpartitionReader =
createSortMergeSubpartitionReader(listener);
- assertEquals(0, listener.numNotifications);
- assertEquals(0,
subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
+ assertThat(listener.numNotifications).isEqualTo(0);
+
assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(0);
Queue<MemorySegment> segments = createsMemorySegments(2);
subpartitionReader.readBuffers(segments,
FreeingBufferRecycler.INSTANCE);
- assertEquals(1, listener.numNotifications);
- assertEquals(2,
subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
- assertEquals(0, segments.size());
+ assertThat(listener.numNotifications).isEqualTo(1);
+
assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(1);
+ assertThat(segments.size()).isEqualTo(0);
segments = createsMemorySegments(2);
subpartitionReader.readBuffers(segments,
FreeingBufferRecycler.INSTANCE);
- assertEquals(1, listener.numNotifications);
- assertEquals(4,
subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
- assertEquals(0, segments.size());
+ assertThat(listener.numNotifications).isEqualTo(1);
+
assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(2);
+ assertThat(segments.size()).isEqualTo(0);
while (subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers() >
0) {
checkNotNull(subpartitionReader.getNextBuffer()).buffer().recycleBuffer();
@@ -121,11 +118,10 @@ public class SortMergeSubpartitionReaderTest extends
TestLogger {
segments = createsMemorySegments(numBuffersPerSubpartition);
subpartitionReader.readBuffers(segments,
FreeingBufferRecycler.INSTANCE);
- assertEquals(2, listener.numNotifications);
- assertEquals(
- numBuffersPerSubpartition - 4,
- subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
- assertEquals(4, segments.size());
+ assertThat(listener.numNotifications).isEqualTo(2);
+ assertThat(numBuffersPerSubpartition - 2)
+
.isEqualTo(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
+ assertThat(segments.size()).isEqualTo(1);
}
@Test
@@ -133,24 +129,28 @@ public class SortMergeSubpartitionReaderTest extends
TestLogger {
SortMergeSubpartitionReader subpartitionReader =
createSortMergeSubpartitionReader(new
CountingAvailabilityListener());
- assertNull(subpartitionReader.getNextBuffer());
-
assertFalse(subpartitionReader.getAvailabilityAndBacklog(Integer.MAX_VALUE).isAvailable());
+ assertThat(subpartitionReader.getNextBuffer()).isNull();
+
assertThat(subpartitionReader.getAvailabilityAndBacklog(Integer.MAX_VALUE).isAvailable())
+ .isFalse();
Queue<MemorySegment> segments =
createsMemorySegments(numBuffersPerSubpartition);
subpartitionReader.readBuffers(segments,
FreeingBufferRecycler.INSTANCE);
for (int i = numBuffersPerSubpartition - 1; i >= 0; --i) {
-
assertTrue(subpartitionReader.getAvailabilityAndBacklog(i).isAvailable());
+ if
(!subpartitionReader.getAvailabilityAndBacklog(i).isAvailable()) {
+ continue;
+ }
ResultSubpartition.BufferAndBacklog bufferAndBacklog =
checkNotNull(subpartitionReader.getNextBuffer());
- assertEquals(
- ByteBuffer.wrap(dataBytes),
bufferAndBacklog.buffer().getNioBufferReadable());
- assertEquals(bufferAndBacklog.buffersInBacklog(), i == 0 ? 0 : i -
1);
- Buffer.DataType dataType =
- i == 0
- ? Buffer.DataType.NONE
- : (i > 1 ? Buffer.DataType.DATA_BUFFER :
Buffer.DataType.EVENT_BUFFER);
- assertEquals(dataType, bufferAndBacklog.getNextDataType());
+ int numBytes = bufferAndBacklog.buffer().readableBytes();
+ MemorySegment segment =
MemorySegmentFactory.allocateUnpooledSegment(numBytes);
+ Buffer fullBuffer =
+ ((CompositeBuffer)
bufferAndBacklog.buffer()).getFullBufferData(segment);
+
assertThat(ByteBuffer.wrap(dataBytes)).isEqualTo(fullBuffer.getNioBufferReadable());
+ assertThat(bufferAndBacklog.buffersInBacklog()).isEqualTo(i == 0 ?
0 : i - 1);
+ Buffer.DataType dataType = i <= 1 ? Buffer.DataType.NONE :
Buffer.DataType.DATA_BUFFER;
+ assertThat(dataType).isEqualTo(bufferAndBacklog.getNextDataType());
+ fullBuffer.recycleBuffer();
}
}
@@ -165,19 +165,19 @@ public class SortMergeSubpartitionReaderTest extends
TestLogger {
createSortMergeSubpartitionReader(listener);
subpartitionReader.readBuffers(segments, segments::add);
- assertEquals(1, listener.numNotifications);
- assertEquals(5,
subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
+ assertThat(listener.numNotifications).isEqualTo(1);
+
assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(4);
subpartitionReader.fail(new RuntimeException("Test exception."));
- assertTrue(subpartitionReader.getReleaseFuture().isDone());
- assertEquals(0,
subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
-
assertTrue(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable());
- assertTrue(subpartitionReader.isReleased());
+
assertThat(subpartitionReader.getReleaseFuture().isDone()).isTrue();
+
assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(0);
+
assertThat(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable()).isTrue();
+ assertThat(subpartitionReader.isReleased()).isTrue();
- assertEquals(2, listener.numNotifications);
- assertNotNull(subpartitionReader.getFailureCause());
+ assertThat(listener.numNotifications).isEqualTo(2);
+ assertThat(subpartitionReader.getFailureCause()).isNotNull();
} finally {
- assertEquals(numSegments, segments.size());
+ assertThat(numSegments).isEqualTo(segments.size());
}
}
@@ -192,19 +192,19 @@ public class SortMergeSubpartitionReaderTest extends
TestLogger {
createSortMergeSubpartitionReader(listener);
subpartitionReader.readBuffers(segments, segments::add);
- assertEquals(1, listener.numNotifications);
- assertEquals(5,
subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
+ assertThat(listener.numNotifications).isEqualTo(1);
+
assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(4);
subpartitionReader.releaseAllResources();
- assertTrue(subpartitionReader.getReleaseFuture().isDone());
- assertEquals(0,
subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
-
assertTrue(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable());
- assertTrue(subpartitionReader.isReleased());
+
assertThat(subpartitionReader.getReleaseFuture().isDone()).isTrue();
+
assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(0);
+
assertThat(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable()).isTrue();
+ assertThat(subpartitionReader.isReleased()).isTrue();
- assertEquals(1, listener.numNotifications);
- assertNull(subpartitionReader.getFailureCause());
+ assertThat(listener.numNotifications).isEqualTo(1);
+ assertThat(subpartitionReader.getFailureCause()).isNull();
} finally {
- assertEquals(numSegments, segments.size());
+ assertThat(numSegments).isEqualTo(segments.size());
}
}
@@ -221,7 +221,7 @@ public class SortMergeSubpartitionReaderTest extends
TestLogger {
subpartitionReader.releaseAllResources();
subpartitionReader.readBuffers(segments, segments::add);
} finally {
- assertEquals(numSegments, segments.size());
+ assertThat(numSegments).isEqualTo(segments.size());
}
}
@@ -233,16 +233,17 @@ public class SortMergeSubpartitionReaderTest extends
TestLogger {
Queue<MemorySegment> segments =
createsMemorySegments(numBuffersPerSubpartition);
subpartitionReader.readBuffers(segments,
FreeingBufferRecycler.INSTANCE);
-
assertTrue(subpartitionReader.getAvailabilityAndBacklog(Integer.MAX_VALUE).isAvailable());
+
assertThat(subpartitionReader.getAvailabilityAndBacklog(Integer.MAX_VALUE).isAvailable())
+ .isTrue();
subpartitionReader.releaseAllResources();
- assertNull(subpartitionReader.getNextBuffer());
+ assertThat(subpartitionReader.getNextBuffer()).isNull();
}
private SortMergeSubpartitionReader createSortMergeSubpartitionReader(
BufferAvailabilityListener listener) throws Exception {
PartitionedFileReader fileReader =
new PartitionedFileReader(partitionedFile, 0, dataFileChannel,
indexFileChannel);
- assertTrue(fileReader.hasRemaining());
+ assertThat(fileReader.hasRemaining()).isTrue();
return new SortMergeSubpartitionReader(listener, fileReader);
}