This is an automated email from the ASF dual-hosted git repository.
yingjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1c044753b0f [FLINK-28373][network] Read a full buffer of data per file IO read request for sort-shuffle
1c044753b0f is described below
commit 1c044753b0f0e4c23f954d71f355d1368c41061f
Author: Yuxin Tan <[email protected]>
AuthorDate: Tue Aug 2 21:59:34 2022 +0800
[FLINK-28373][network] Read a full buffer of data per file IO read request for sort-shuffle
This closes #20335.
---
.../runtime/io/network/buffer/CompositeBuffer.java | 233 +++++++++++++++++++++
.../buffer/ReadOnlySlicedNetworkBuffer.java | 10 +-
.../runtime/io/network/netty/NettyMessage.java | 7 +-
.../network/partition/PartitionedFileReader.java | 130 ++++++++++++
.../partition/SortMergeSubpartitionReader.java | 17 +-
.../partition/consumer/LocalInputChannel.java | 7 +-
.../network/buffer/ReadOnlySlicedBufferTest.java | 20 +-
.../SortMergeResultPartitionReadSchedulerTest.java | 73 +++----
.../partition/SortMergeResultPartitionTest.java | 34 ++-
.../partition/SortMergeSubpartitionReaderTest.java | 103 ++++-----
10 files changed, 509 insertions(+), 125 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/CompositeBuffer.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/CompositeBuffer.java
new file mode 100644
index 00000000000..7444704b0ad
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/CompositeBuffer.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+import org.apache.flink.shaded.netty4.io.netty.buffer.CompositeByteBuf;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An implementation of {@link Buffer} which contains multiple partial buffers for network data
+ * communication.
+ */
+public class CompositeBuffer implements Buffer {
+
+ private final DataType dataType;
+
+ private final int length;
+
+ private final boolean isCompressed;
+
+ private final List<Buffer> partialBuffers = new ArrayList<>();
+
+ private int currentLength;
+
+ private ByteBufAllocator allocator;
+
+ public CompositeBuffer(DataType dataType, int length, boolean
isCompressed) {
+ this.dataType = checkNotNull(dataType);
+ this.length = length;
+ this.isCompressed = isCompressed;
+ }
+
+ public CompositeBuffer(BufferHeader header) {
+ this(header.getDataType(), header.getLength(), header.isCompressed());
+ }
+
+ @Override
+ public boolean isBuffer() {
+ return dataType.isBuffer();
+ }
+
+ @Override
+ public void recycleBuffer() {
+ for (Buffer partialBuffer : partialBuffers) {
+ partialBuffer.recycleBuffer();
+ }
+ }
+
+ @Override
+ public Buffer retainBuffer() {
+ for (Buffer partialBuffer : partialBuffers) {
+ partialBuffer.retainBuffer();
+ }
+ return this;
+ }
+
+ @Override
+ public int getSize() {
+ return currentLength;
+ }
+
+ @Override
+ public int readableBytes() {
+ return currentLength;
+ }
+
+ @Override
+ public void setAllocator(ByteBufAllocator allocator) {
+ this.allocator = allocator;
+ }
+
+ @Override
+ public ByteBuf asByteBuf() {
+ CompositeByteBuf compositeByteBuf =
checkNotNull(allocator).compositeDirectBuffer();
+ for (Buffer buffer : partialBuffers) {
+ compositeByteBuf.addComponent(buffer.asByteBuf());
+ }
+ compositeByteBuf.writerIndex(currentLength);
+ return compositeByteBuf;
+ }
+
+ @Override
+ public boolean isCompressed() {
+ return isCompressed;
+ }
+
+ @Override
+ public DataType getDataType() {
+ return dataType;
+ }
+
+ public int numPartialBuffers() {
+ return partialBuffers.size();
+ }
+
+ /**
+ * Returns the full buffer data in one piece of {@link MemorySegment}. If there are multiple
+ * partial buffers, the partial data will be copied to the given target {@link MemorySegment}.
+ */
+ public Buffer getFullBufferData(MemorySegment segment) {
+ checkState(!partialBuffers.isEmpty());
+ checkState(currentLength <= segment.size());
+
+ if (partialBuffers.size() == 1) {
+ return partialBuffers.get(0);
+ }
+
+ int offset = 0;
+ for (Buffer buffer : partialBuffers) {
+ segment.put(offset, buffer.getNioBufferReadable(),
buffer.readableBytes());
+ offset += buffer.readableBytes();
+ }
+ recycleBuffer();
+ return new NetworkBuffer(
+ segment,
+ BufferRecycler.DummyBufferRecycler.INSTANCE,
+ dataType,
+ isCompressed,
+ currentLength);
+ }
+
+ public void addPartialBuffer(Buffer buffer) {
+ buffer.setDataType(dataType);
+ buffer.setCompressed(isCompressed);
+ partialBuffers.add(buffer);
+ currentLength += buffer.readableBytes();
+ checkState(currentLength <= length);
+ }
+
+ public int missingLength() {
+ return length - currentLength;
+ }
+
+ @Override
+ public MemorySegment getMemorySegment() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getMemorySegmentOffset() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public BufferRecycler getRecycler() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isRecycled() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Buffer readOnlySlice() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Buffer readOnlySlice(int index, int length) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getMaxCapacity() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getReaderIndex() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setReaderIndex(int readerIndex) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setSize(int writerIndex) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ByteBuffer getNioBufferReadable() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ByteBuffer getNioBuffer(int index, int length) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setCompressed(boolean isCompressed) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setDataType(DataType dataType) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int refCnt() {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
index df9dccb7314..4fccec09bdc 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
@@ -44,6 +44,8 @@ public final class ReadOnlySlicedNetworkBuffer extends
ReadOnlyByteBuf implement
private boolean isCompressed = false;
+ private DataType dataType;
+
/**
* Creates a buffer which shares the memory segment of the given buffer and exposed the given
* sub-region only.
@@ -58,6 +60,7 @@ public final class ReadOnlySlicedNetworkBuffer extends
ReadOnlyByteBuf implement
ReadOnlySlicedNetworkBuffer(NetworkBuffer buffer, int index, int length) {
super(new SlicedByteBuf(buffer, index, length));
this.memorySegmentOffset = buffer.getMemorySegmentOffset() + index;
+ this.dataType = buffer.getDataType();
}
/**
@@ -79,6 +82,7 @@ public final class ReadOnlySlicedNetworkBuffer extends
ReadOnlyByteBuf implement
super(new SlicedByteBuf(buffer, index, length));
this.memorySegmentOffset = memorySegmentOffset + index;
this.isCompressed = isCompressed;
+ this.dataType = getBuffer().getDataType();
}
@Override
@@ -88,7 +92,7 @@ public final class ReadOnlySlicedNetworkBuffer extends
ReadOnlyByteBuf implement
@Override
public boolean isBuffer() {
- return getBuffer().isBuffer();
+ return dataType.isBuffer();
}
/**
@@ -223,12 +227,12 @@ public final class ReadOnlySlicedNetworkBuffer extends
ReadOnlyByteBuf implement
@Override
public DataType getDataType() {
- return getBuffer().getDataType();
+ return dataType;
}
@Override
public void setDataType(DataType dataType) {
- throw new ReadOnlyBufferException();
+ this.dataType = dataType;
}
private Buffer getBuffer() {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index 81984569320..702e3e32e48 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FileRegionBuffer;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
@@ -345,7 +346,11 @@ public abstract class NettyMessage {
headerBuf = fillHeader(allocator);
out.write(headerBuf);
- out.write(buffer, promise);
+ if (buffer instanceof FileRegionBuffer) {
+ out.write(buffer, promise);
+ } else {
+ out.write(buffer.asByteBuf(), promise);
+ }
} catch (Throwable t) {
handleException(headerBuf, buffer, t);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileReader.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileReader.java
index a84fe5fc706..64570067b80 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileReader.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionedFileReader.java
@@ -20,13 +20,18 @@ package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferHeader;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.Queue;
+import java.util.function.Consumer;
import static
org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.HEADER_LENGTH;
import static
org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil.readFromByteChannel;
@@ -120,6 +125,60 @@ class PartitionedFileReader {
return buffer;
}
+ boolean readCurrentRegion(
+ Queue<MemorySegment> freeSegments, BufferRecycler recycler,
Consumer<Buffer> consumer)
+ throws IOException {
+ if (currentRegionRemainingBytes == 0) {
+ return false;
+ }
+
+ checkArgument(!freeSegments.isEmpty(), "No buffer available for data
reading.");
+ dataFileChannel.position(nextOffsetToRead);
+
+ BufferAndHeader partialBuffer = new BufferAndHeader(null, null);
+ try {
+ while (!freeSegments.isEmpty() && currentRegionRemainingBytes > 0)
{
+ MemorySegment segment = freeSegments.poll();
+ int numBytes = (int) Math.min(segment.size(),
currentRegionRemainingBytes);
+ ByteBuffer byteBuffer = segment.wrap(0, numBytes);
+
+ try {
+
BufferReaderWriterUtil.readByteBufferFully(dataFileChannel, byteBuffer);
+ byteBuffer.flip();
+ currentRegionRemainingBytes -= byteBuffer.remaining();
+ nextOffsetToRead += byteBuffer.remaining();
+ } catch (IOException exception) {
+ freeSegments.add(segment);
+ throw exception;
+ }
+
+ NetworkBuffer buffer = new NetworkBuffer(segment, recycler);
+ buffer.setSize(byteBuffer.remaining());
+ try {
+ partialBuffer = processBuffer(byteBuffer, buffer,
partialBuffer, consumer);
+ } finally {
+ buffer.recycleBuffer();
+ }
+ }
+ } finally {
+ if (headerBuf.position() > 0) {
+ nextOffsetToRead -= headerBuf.position();
+ currentRegionRemainingBytes += headerBuf.position();
+ headerBuf.clear();
+ }
+ if (partialBuffer.header != null) {
+ nextOffsetToRead -= HEADER_LENGTH;
+ currentRegionRemainingBytes += HEADER_LENGTH;
+ }
+ if (partialBuffer.buffer != null) {
+ nextOffsetToRead -= partialBuffer.buffer.readableBytes();
+ currentRegionRemainingBytes +=
partialBuffer.buffer.readableBytes();
+ partialBuffer.buffer.recycleBuffer();
+ }
+ }
+ return hasRemaining();
+ }
+
boolean hasRemaining() throws IOException {
moveToNextReadableRegion();
return currentRegionRemainingBytes > 0;
@@ -129,4 +188,75 @@ class PartitionedFileReader {
long getPriority() {
return nextOffsetToRead;
}
+
+ private BufferAndHeader processBuffer(
+ ByteBuffer byteBuffer,
+ Buffer buffer,
+ BufferAndHeader partialBuffer,
+ Consumer<Buffer> consumer) {
+ BufferHeader header = partialBuffer.header;
+ CompositeBuffer targetBuffer = partialBuffer.buffer;
+ while (byteBuffer.hasRemaining()) {
+ if (header == null && (header = parseBufferHeader(byteBuffer)) ==
null) {
+ break;
+ }
+
+ if (targetBuffer != null) {
+ buffer.retainBuffer();
+ int position = byteBuffer.position() +
targetBuffer.missingLength();
+ targetBuffer.addPartialBuffer(
+ buffer.readOnlySlice(byteBuffer.position(),
targetBuffer.missingLength()));
+ byteBuffer.position(position);
+ } else if (byteBuffer.remaining() < header.getLength()) {
+ if (byteBuffer.hasRemaining()) {
+ buffer.retainBuffer();
+ targetBuffer = new CompositeBuffer(header);
+ targetBuffer.addPartialBuffer(
+ buffer.readOnlySlice(byteBuffer.position(),
byteBuffer.remaining()));
+ }
+ break;
+ } else {
+ buffer.retainBuffer();
+ targetBuffer = new CompositeBuffer(header);
+ targetBuffer.addPartialBuffer(
+ buffer.readOnlySlice(byteBuffer.position(),
header.getLength()));
+ byteBuffer.position(byteBuffer.position() +
header.getLength());
+ }
+
+ header = null;
+ consumer.accept(targetBuffer);
+ targetBuffer = null;
+ }
+ return new BufferAndHeader(targetBuffer, header);
+ }
+
+ private BufferHeader parseBufferHeader(ByteBuffer buffer) {
+ BufferHeader header = null;
+ if (headerBuf.position() > 0) {
+ while (headerBuf.hasRemaining()) {
+ headerBuf.put(buffer.get());
+ }
+ headerBuf.flip();
+ header = BufferReaderWriterUtil.parseBufferHeader(headerBuf);
+ headerBuf.clear();
+ }
+
+ if (header == null && buffer.remaining() < HEADER_LENGTH) {
+ headerBuf.put(buffer);
+ } else if (header == null) {
+ header = BufferReaderWriterUtil.parseBufferHeader(buffer);
+ }
+ return header;
+ }
+
+ private static class BufferAndHeader {
+
+ private final CompositeBuffer buffer;
+ private final BufferHeader header;
+
+ BufferAndHeader(CompositeBuffer buffer, BufferHeader header) {
+ this.buffer = buffer;
+ this.header = header;
+ }
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
index 4644a98d65a..db9121cd6c6 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
@@ -127,22 +127,7 @@ class SortMergeSubpartitionReader
/** This method is called by the IO thread of {@link SortMergeResultPartitionReadScheduler}. */
boolean readBuffers(Queue<MemorySegment> buffers, BufferRecycler recycler)
throws IOException {
- while (!buffers.isEmpty()) {
- MemorySegment segment = buffers.poll();
-
- Buffer buffer;
- try {
- if ((buffer = fileReader.readCurrentRegion(segment, recycler))
== null) {
- buffers.add(segment);
- break;
- }
- } catch (Throwable throwable) {
- buffers.add(segment);
- throw throwable;
- }
- addBuffer(buffer);
- }
- return fileReader.hasRemaining();
+ return fileReader.readCurrentRegion(buffers, recycler,
this::addBuffer);
}
CompletableFuture<?> getReleaseFuture() {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index f409a68524f..c1a440864ed 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
import org.apache.flink.runtime.io.network.buffer.FileRegionBuffer;
import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
import
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
@@ -241,7 +242,11 @@ public class LocalInputChannel extends InputChannel
implements BufferAvailabilit
buffer = ((FileRegionBuffer)
buffer).readInto(inputGate.getUnpooledSegment());
}
- numBytesIn.inc(buffer.getSize());
+ if (buffer instanceof CompositeBuffer) {
+ buffer = ((CompositeBuffer)
buffer).getFullBufferData(inputGate.getUnpooledSegment());
+ }
+
+ numBytesIn.inc(buffer.readableBytes());
numBuffersIn.inc();
channelStatePersister.checkForBarrier(buffer);
channelStatePersister.maybePersist(buffer);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java
index ff66ca7dfbd..3b92d177618 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java
@@ -26,12 +26,12 @@ import
org.apache.flink.runtime.io.network.netty.NettyBufferPool;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.ReadOnlyBufferException;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -66,14 +66,20 @@ public class ReadOnlySlicedBufferTest {
assertEquals(eventBuffer.isBuffer(), eventBuffer.readOnlySlice(1,
2).isBuffer());
}
- @Test(expected = ReadOnlyBufferException.class)
- public void testSetDataTypeThrows1() {
- buffer.readOnlySlice().setDataType(Buffer.DataType.EVENT_BUFFER);
+ @Test
+ public void testSetDataType1() {
+ ReadOnlySlicedNetworkBuffer readOnlyBuffer = buffer.readOnlySlice();
+ readOnlyBuffer.setDataType(Buffer.DataType.EVENT_BUFFER);
+
Assertions.assertThat(readOnlyBuffer.getDataType()).isEqualTo(Buffer.DataType.EVENT_BUFFER);
}
- @Test(expected = ReadOnlyBufferException.class)
- public void testSetDataTypeThrows2() {
- buffer.readOnlySlice(1, 2).setDataType(Buffer.DataType.EVENT_BUFFER);
+ @Test
+ public void testSetDataType2() {
+ ReadOnlySlicedNetworkBuffer readOnlyBuffer = buffer.readOnlySlice(1,
2);
+ readOnlyBuffer.setDataType(Buffer.DataType.EVENT_BUFFER);
+
Assertions.assertThat(readOnlyBuffer.getDataType()).isEqualTo(Buffer.DataType.EVENT_BUFFER);
+ Assertions.assertThat(buffer.readOnlySlice(1, 2).getDataType())
+ .isNotEqualTo(Buffer.DataType.EVENT_BUFFER);
}
@Test
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
index ee442bcdc44..223f298e800 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
@@ -22,7 +22,7 @@ import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
import org.apache.flink.util.TestLogger;
import org.assertj.core.api.Assertions;
@@ -49,11 +49,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link SortMergeResultPartitionReadScheduler}. */
public class SortMergeResultPartitionReadSchedulerTest extends TestLogger {
@@ -62,7 +58,7 @@ public class SortMergeResultPartitionReadSchedulerTest
extends TestLogger {
private static final byte[] dataBytes = new byte[bufferSize];
- private static final int totalBytes = bufferSize;
+ private static final int totalBytes = bufferSize * 2;
private static final int numThreads = 4;
@@ -107,7 +103,7 @@ public class SortMergeResultPartitionReadSchedulerTest
extends TestLogger {
executor = Executors.newFixedThreadPool(numThreads);
readScheduler =
new SortMergeResultPartitionReadScheduler(
- numSubpartitions, bufferPool, executor, this);
+ numSubpartitions, bufferPool, executor, new Object());
}
@After
@@ -125,18 +121,21 @@ public class SortMergeResultPartitionReadSchedulerTest
extends TestLogger {
readScheduler.createSubpartitionReader(
new NoOpBufferAvailablityListener(), 0,
partitionedFile);
- assertTrue(readScheduler.isRunning());
- assertTrue(readScheduler.getDataFileChannel().isOpen());
- assertTrue(readScheduler.getIndexFileChannel().isOpen());
+ assertThat(readScheduler.isRunning()).isTrue();
+ assertThat(readScheduler.getDataFileChannel().isOpen()).isTrue();
+ assertThat(readScheduler.getIndexFileChannel().isOpen()).isTrue();
int numBuffersRead = 0;
while (numBuffersRead < numBuffersPerSubpartition) {
ResultSubpartition.BufferAndBacklog bufferAndBacklog =
subpartitionReader.getNextBuffer();
if (bufferAndBacklog != null) {
- Buffer buffer = bufferAndBacklog.buffer();
- assertEquals(ByteBuffer.wrap(dataBytes),
buffer.getNioBufferReadable());
- buffer.recycleBuffer();
+ int numBytes = bufferAndBacklog.buffer().readableBytes();
+ MemorySegment segment =
MemorySegmentFactory.allocateUnpooledSegment(numBytes);
+ Buffer fullBuffer =
+ ((CompositeBuffer)
bufferAndBacklog.buffer()).getFullBufferData(segment);
+
assertThat(ByteBuffer.wrap(dataBytes)).isEqualTo(fullBuffer.getNioBufferReadable());
+ fullBuffer.recycleBuffer();
++numBuffersRead;
}
}
@@ -162,10 +161,10 @@ public class SortMergeResultPartitionReadSchedulerTest
extends TestLogger {
Thread.sleep(1000);
readScheduler.release();
- assertNotNull(subpartitionReader.getFailureCause());
- assertTrue(subpartitionReader.isReleased());
- assertEquals(0,
subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
-
assertTrue(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable());
+ assertThat(subpartitionReader.getFailureCause()).isNotNull();
+ assertThat(subpartitionReader.isReleased()).isTrue();
+
assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(0);
+
assertThat(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable()).isTrue();
readScheduler.getReleaseFuture().get();
assertAllResourcesReleased();
@@ -201,8 +200,8 @@ public class SortMergeResultPartitionReadSchedulerTest
extends TestLogger {
}
waitUntilReadFinish();
- assertNotNull(subpartitionReader.getFailureCause());
-
assertTrue(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable());
+ assertThat(subpartitionReader.getFailureCause()).isNotNull();
+
assertThat(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable()).isTrue();
assertAllResourcesReleased();
}
@@ -215,14 +214,15 @@ public class SortMergeResultPartitionReadSchedulerTest
extends TestLogger {
bufferPool.destroy();
waitUntilReadFinish();
- assertTrue(subpartitionReader.isReleased());
- assertNotNull(subpartitionReader.getFailureCause());
-
assertTrue(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable());
+ assertThat(subpartitionReader.isReleased()).isTrue();
+ assertThat(subpartitionReader.getFailureCause()).isNotNull();
+
assertThat(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable()).isTrue();
assertAllResourcesReleased();
}
@Test(timeout = 60000)
public void testNoDeadlockWhenReadAndReleaseBuffers() throws Exception {
+ bufferPool.initialize();
SortMergeSubpartitionReader subpartitionReader =
new SortMergeSubpartitionReader(new
NoOpBufferAvailablityListener(), fileReader);
Thread readAndReleaseThread =
@@ -231,7 +231,7 @@ public class SortMergeResultPartitionReadSchedulerTest
extends TestLogger {
Queue<MemorySegment> segments = new ArrayDeque<>();
segments.add(MemorySegmentFactory.allocateUnpooledSegment(bufferSize));
try {
- assertTrue(fileReader.hasRemaining());
+ assertThat(fileReader.hasRemaining()).isTrue();
subpartitionReader.readBuffers(segments,
readScheduler);
subpartitionReader.releaseAllResources();
subpartitionReader.readBuffers(segments,
readScheduler);
@@ -260,7 +260,7 @@ public class SortMergeResultPartitionReadSchedulerTest
extends TestLogger {
Assertions.assertThatThrownBy(readScheduler::allocateBuffers)
.isInstanceOf(TimeoutException.class);
long requestDuration = System.nanoTime() - startTimestamp;
- Assertions.assertThat(requestDuration >
bufferRequestTimeout.toNanos()).isTrue();
+ assertThat(requestDuration > bufferRequestTimeout.toNanos()).isTrue();
bufferPool.recycle(buffers);
readScheduler.release();
@@ -281,9 +281,9 @@ public class SortMergeResultPartitionReadSchedulerTest
extends TestLogger {
Queue<MemorySegment> allocatedBuffers =
readScheduler.allocateBuffers();
long requestDuration = System.nanoTime() - startTimestamp;
- assertEquals(3, allocatedBuffers.size());
- assertTrue(requestDuration > bufferRequestTimeout.toNanos() * 2);
- assertNull(subpartitionReader.getFailureCause());
+ assertThat(allocatedBuffers.size()).isEqualTo(3);
+ assertThat(requestDuration > bufferRequestTimeout.toNanos() *
2).isTrue();
+ assertThat(subpartitionReader.getFailureCause()).isNull();
bufferPool.recycle(allocatedBuffers);
bufferPool.destroy();
@@ -316,21 +316,14 @@ public class SortMergeResultPartitionReadSchedulerTest
extends TestLogger {
return FileChannel.open(path, StandardOpenOption.READ);
}
- private static void assertExpectedTimeoutException(Throwable throwable) {
- assertNotNull(throwable);
- assertTrue(
- ExceptionUtils.findThrowableWithMessage(throwable, "Buffer
request timeout")
- .isPresent());
- }
-
private void assertAllResourcesReleased() {
- assertNull(readScheduler.getDataFileChannel());
- assertNull(readScheduler.getIndexFileChannel());
- assertFalse(readScheduler.isRunning());
- assertEquals(0, readScheduler.getNumPendingReaders());
+ assertThat(readScheduler.getDataFileChannel()).isNull();
+ assertThat(readScheduler.getIndexFileChannel()).isNull();
+ assertThat(readScheduler.isRunning()).isFalse();
+ assertThat(readScheduler.getNumPendingReaders()).isEqualTo(0);
if (!bufferPool.isDestroyed()) {
- assertEquals(bufferPool.getNumTotalBuffers(),
bufferPool.getAvailableBuffers());
+
assertThat(bufferPool.getNumTotalBuffers()).isEqualTo(bufferPool.getAvailableBuffers());
}
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest.java
index 646b66de791..f4650e75df1 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest.java
@@ -27,6 +27,7 @@ import
org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.util.TestLogger;
@@ -178,10 +179,19 @@ public class SortMergeResultPartitionTest extends
TestLogger {
numBytesRead[subpartition] += numBytes;
MemorySegment segment =
MemorySegmentFactory.allocateUnpooledSegment(numBytes);
- segment.put(0, buffer.getNioBufferReadable(), numBytes);
+ Buffer fullBuffer =
+ ((CompositeBuffer) buffer)
+ .getFullBufferData(
+
MemorySegmentFactory.allocateUnpooledSegment(numBytes));
+ segment.put(0, fullBuffer.getNioBufferReadable(),
fullBuffer.readableBytes());
buffersRead[subpartition].add(
new NetworkBuffer(
- segment, (buf) -> {},
buffer.getDataType(), numBytes));
+ segment,
+ ignore -> {},
+ fullBuffer.getDataType(),
+ fullBuffer.isCompressed(),
+ fullBuffer.readableBytes()));
+ fullBuffer.recycleBuffer();
});
DataBufferTest.checkWriteReadResult(
numSubpartitions, numBytesWritten, numBytesRead, dataWritten,
buffersRead);
@@ -219,7 +229,6 @@ public class SortMergeResultPartitionTest extends
TestLogger {
Buffer buffer = bufferAndBacklog.buffer();
bufferProcessor.accept(new BufferWithChannel(buffer,
subpartition));
dataSize += buffer.readableBytes();
- buffer.recycleBuffer();
if (!buffer.isBuffer()) {
++numEndOfPartitionEvents;
@@ -263,9 +272,17 @@ public class SortMergeResultPartitionTest extends
TestLogger {
new ResultSubpartitionView[] {view},
bufferWithChannel -> {
Buffer buffer = bufferWithChannel.getBuffer();
- if (buffer.isBuffer()) {
- recordRead.put(buffer.getNioBufferReadable());
+ int numBytes = buffer.readableBytes();
+
+ MemorySegment segment =
MemorySegmentFactory.allocateUnpooledSegment(numBytes);
+ Buffer fullBuffer = ((CompositeBuffer)
buffer).getFullBufferData(segment);
+ if (fullBuffer.isBuffer()) {
+ ByteBuffer byteBuffer =
+ ByteBuffer.allocate(fullBuffer.readableBytes())
+
.put(fullBuffer.getNioBufferReadable());
+ recordRead.put((ByteBuffer) byteBuffer.flip());
}
+ fullBuffer.recycleBuffer();
});
recordWritten.rewind();
recordRead.flip();
@@ -300,7 +317,12 @@ public class SortMergeResultPartitionTest extends
TestLogger {
}
ResultSubpartitionView[] views = createSubpartitionViews(partition,
numSubpartitions);
- long dataRead = readData(views, (ignored) -> {});
+ long dataRead =
+ readData(
+ views,
+ bufferWithChannel -> {
+ bufferWithChannel.getBuffer().recycleBuffer();
+ });
assertEquals(dataSize, dataRead);
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReaderTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReaderTest.java
index ac7acc80a87..cf28333de1f 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReaderTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReaderTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.TestLogger;
@@ -43,11 +44,7 @@ import java.util.Random;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
/** Tests for {@link SortMergeSubpartitionReader}. */
public class SortMergeSubpartitionReaderTest extends TestLogger {
@@ -97,22 +94,22 @@ public class SortMergeSubpartitionReaderTest extends
TestLogger {
SortMergeSubpartitionReader subpartitionReader =
createSortMergeSubpartitionReader(listener);
- assertEquals(0, listener.numNotifications);
- assertEquals(0,
subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
+ assertThat(listener.numNotifications).isEqualTo(0);
+
assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(0);
Queue<MemorySegment> segments = createsMemorySegments(2);
subpartitionReader.readBuffers(segments,
FreeingBufferRecycler.INSTANCE);
- assertEquals(1, listener.numNotifications);
- assertEquals(2,
subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
- assertEquals(0, segments.size());
+ assertThat(listener.numNotifications).isEqualTo(1);
+
assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(1);
+ assertThat(segments.size()).isEqualTo(0);
segments = createsMemorySegments(2);
subpartitionReader.readBuffers(segments,
FreeingBufferRecycler.INSTANCE);
- assertEquals(1, listener.numNotifications);
- assertEquals(4,
subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
- assertEquals(0, segments.size());
+ assertThat(listener.numNotifications).isEqualTo(1);
+
assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(2);
+ assertThat(segments.size()).isEqualTo(0);
while (subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers() >
0) {
checkNotNull(subpartitionReader.getNextBuffer()).buffer().recycleBuffer();
@@ -121,11 +118,10 @@ public class SortMergeSubpartitionReaderTest extends
TestLogger {
segments = createsMemorySegments(numBuffersPerSubpartition);
subpartitionReader.readBuffers(segments,
FreeingBufferRecycler.INSTANCE);
- assertEquals(2, listener.numNotifications);
- assertEquals(
- numBuffersPerSubpartition - 4,
- subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
- assertEquals(4, segments.size());
+ assertThat(listener.numNotifications).isEqualTo(2);
+ assertThat(numBuffersPerSubpartition - 2)
+
.isEqualTo(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
+ assertThat(segments.size()).isEqualTo(1);
}
@Test
@@ -133,24 +129,28 @@ public class SortMergeSubpartitionReaderTest extends
TestLogger {
SortMergeSubpartitionReader subpartitionReader =
createSortMergeSubpartitionReader(new
CountingAvailabilityListener());
- assertNull(subpartitionReader.getNextBuffer());
-
assertFalse(subpartitionReader.getAvailabilityAndBacklog(Integer.MAX_VALUE).isAvailable());
+ assertThat(subpartitionReader.getNextBuffer()).isNull();
+
assertThat(subpartitionReader.getAvailabilityAndBacklog(Integer.MAX_VALUE).isAvailable())
+ .isFalse();
Queue<MemorySegment> segments =
createsMemorySegments(numBuffersPerSubpartition);
subpartitionReader.readBuffers(segments,
FreeingBufferRecycler.INSTANCE);
for (int i = numBuffersPerSubpartition - 1; i >= 0; --i) {
-
assertTrue(subpartitionReader.getAvailabilityAndBacklog(i).isAvailable());
+ if
(!subpartitionReader.getAvailabilityAndBacklog(i).isAvailable()) {
+ continue;
+ }
ResultSubpartition.BufferAndBacklog bufferAndBacklog =
checkNotNull(subpartitionReader.getNextBuffer());
- assertEquals(
- ByteBuffer.wrap(dataBytes),
bufferAndBacklog.buffer().getNioBufferReadable());
- assertEquals(bufferAndBacklog.buffersInBacklog(), i == 0 ? 0 : i -
1);
- Buffer.DataType dataType =
- i == 0
- ? Buffer.DataType.NONE
- : (i > 1 ? Buffer.DataType.DATA_BUFFER :
Buffer.DataType.EVENT_BUFFER);
- assertEquals(dataType, bufferAndBacklog.getNextDataType());
+ int numBytes = bufferAndBacklog.buffer().readableBytes();
+ MemorySegment segment =
MemorySegmentFactory.allocateUnpooledSegment(numBytes);
+ Buffer fullBuffer =
+ ((CompositeBuffer)
bufferAndBacklog.buffer()).getFullBufferData(segment);
+
assertThat(ByteBuffer.wrap(dataBytes)).isEqualTo(fullBuffer.getNioBufferReadable());
+ assertThat(bufferAndBacklog.buffersInBacklog()).isEqualTo(i == 0 ?
0 : i - 1);
+ Buffer.DataType dataType = i <= 1 ? Buffer.DataType.NONE :
Buffer.DataType.DATA_BUFFER;
+ assertThat(dataType).isEqualTo(bufferAndBacklog.getNextDataType());
+ fullBuffer.recycleBuffer();
}
}
@@ -165,19 +165,19 @@ public class SortMergeSubpartitionReaderTest extends
TestLogger {
createSortMergeSubpartitionReader(listener);
subpartitionReader.readBuffers(segments, segments::add);
- assertEquals(1, listener.numNotifications);
- assertEquals(5,
subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
+ assertThat(listener.numNotifications).isEqualTo(1);
+
assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(4);
subpartitionReader.fail(new RuntimeException("Test exception."));
- assertTrue(subpartitionReader.getReleaseFuture().isDone());
- assertEquals(0,
subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
-
assertTrue(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable());
- assertTrue(subpartitionReader.isReleased());
+
assertThat(subpartitionReader.getReleaseFuture().isDone()).isTrue();
+
assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(0);
+
assertThat(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable()).isTrue();
+ assertThat(subpartitionReader.isReleased()).isTrue();
- assertEquals(2, listener.numNotifications);
- assertNotNull(subpartitionReader.getFailureCause());
+ assertThat(listener.numNotifications).isEqualTo(2);
+ assertThat(subpartitionReader.getFailureCause()).isNotNull();
} finally {
- assertEquals(numSegments, segments.size());
+ assertThat(numSegments).isEqualTo(segments.size());
}
}
@@ -192,19 +192,19 @@ public class SortMergeSubpartitionReaderTest extends
TestLogger {
createSortMergeSubpartitionReader(listener);
subpartitionReader.readBuffers(segments, segments::add);
- assertEquals(1, listener.numNotifications);
- assertEquals(5,
subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
+ assertThat(listener.numNotifications).isEqualTo(1);
+
assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(4);
subpartitionReader.releaseAllResources();
- assertTrue(subpartitionReader.getReleaseFuture().isDone());
- assertEquals(0,
subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers());
-
assertTrue(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable());
- assertTrue(subpartitionReader.isReleased());
+
assertThat(subpartitionReader.getReleaseFuture().isDone()).isTrue();
+
assertThat(subpartitionReader.unsynchronizedGetNumberOfQueuedBuffers()).isEqualTo(0);
+
assertThat(subpartitionReader.getAvailabilityAndBacklog(0).isAvailable()).isTrue();
+ assertThat(subpartitionReader.isReleased()).isTrue();
- assertEquals(1, listener.numNotifications);
- assertNull(subpartitionReader.getFailureCause());
+ assertThat(listener.numNotifications).isEqualTo(1);
+ assertThat(subpartitionReader.getFailureCause()).isNull();
} finally {
- assertEquals(numSegments, segments.size());
+ assertThat(numSegments).isEqualTo(segments.size());
}
}
@@ -221,7 +221,7 @@ public class SortMergeSubpartitionReaderTest extends
TestLogger {
subpartitionReader.releaseAllResources();
subpartitionReader.readBuffers(segments, segments::add);
} finally {
- assertEquals(numSegments, segments.size());
+ assertThat(numSegments).isEqualTo(segments.size());
}
}
@@ -233,16 +233,17 @@ public class SortMergeSubpartitionReaderTest extends
TestLogger {
Queue<MemorySegment> segments =
createsMemorySegments(numBuffersPerSubpartition);
subpartitionReader.readBuffers(segments,
FreeingBufferRecycler.INSTANCE);
-
assertTrue(subpartitionReader.getAvailabilityAndBacklog(Integer.MAX_VALUE).isAvailable());
+
assertThat(subpartitionReader.getAvailabilityAndBacklog(Integer.MAX_VALUE).isAvailable())
+ .isTrue();
subpartitionReader.releaseAllResources();
- assertNull(subpartitionReader.getNextBuffer());
+ assertThat(subpartitionReader.getNextBuffer()).isNull();
}
private SortMergeSubpartitionReader createSortMergeSubpartitionReader(
BufferAvailabilityListener listener) throws Exception {
PartitionedFileReader fileReader =
new PartitionedFileReader(partitionedFile, 0, dataFileChannel,
indexFileChannel);
- assertTrue(fileReader.hasRemaining());
+ assertThat(fileReader.hasRemaining()).isTrue();
return new SortMergeSubpartitionReader(listener, fileReader);
}