This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 5ae2275e [CELEBORN-125] RemoteShuffleOutputGate completes functions to
support shuffle write (#1067)
5ae2275e is described below
commit 5ae2275e3e43c2c8d6ae5b7c6c4662a7196de686
Author: zhongqiangczq <[email protected]>
AuthorDate: Wed Dec 14 10:10:21 2022 +0800
[CELEBORN-125] RemoteShuffleOutputGate completes functions to support
shuffle write (#1067)
---
client-flink/flink-1.14/pom.xml | 5 +
.../plugin/flink/RemoteShuffleOutputGate.java | 62 +++++++----
.../flink/RemoteShuffleOutputGateSuiteJ.java | 101 +++++++++++++++++
.../celeborn/plugin/flink/buffer/BufferHeader.java | 68 ++++++++++++
.../celeborn/plugin/flink/buffer/BufferPacker.java | 105 ++++++++++++++++++
.../celeborn/plugin/flink/utils/BufferUtils.java | 119 +++++++++++++++++++++
6 files changed, 438 insertions(+), 22 deletions(-)
diff --git a/client-flink/flink-1.14/pom.xml b/client-flink/flink-1.14/pom.xml
index 30681778..6b88888f 100644
--- a/client-flink/flink-1.14/pom.xml
+++ b/client-flink/flink-1.14/pom.xml
@@ -55,5 +55,10 @@
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java
index f30dfadc..e6f03e2a 100644
---
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java
+++
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java
@@ -20,6 +20,7 @@ package org.apache.celeborn.plugin.flink;
import java.io.IOException;
import java.util.Optional;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -30,17 +31,32 @@ import org.apache.celeborn.client.ShuffleClient;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.identity.UserIdentifier;
import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.plugin.flink.buffer.BufferPacker;
+import org.apache.celeborn.plugin.flink.utils.BufferUtils;
import org.apache.celeborn.plugin.flink.utils.Utils;
/**
* A transportation gate used to spill buffers from {@link
ResultPartitionWriter} to remote shuffle
- * worker.
+ * worker. The whole process of communication between outputGate and shuffle
worker could be
+ * described as below:
+ *
+ * <ul>
+ * <li>1. Client registers shuffle to get partitionLocation which stores the
data in remote
+ * shuffle;
+ * <li>2. Client sends PushDataHandShake to transfer handshake message;
+ * <li>3. Client sends RegionStart which announces the start of a writing
region and may receive a
+ * new partitionLocation;
+ * <li>4. Client writes data;
+ * <li>5. Client sends RegionFinish to indicate that writing of a region has finished;
+ * <li>6. Repeat from step-2 to step-5;
+ * <li>7. Client sends MapperEnd to indicate that writing has finished;
+ * </ul>
*/
public class RemoteShuffleOutputGate {
private final RemoteShuffleDescriptor shuffleDesc;
protected final int numSubs;
- private final ShuffleClient shuffleWriteClient;
+ protected ShuffleClient shuffleWriteClient;
protected final SupplierWithException<BufferPool, IOException>
bufferPoolFactory;
protected BufferPool bufferPool;
private CelebornConf celebornConf;
@@ -50,6 +66,7 @@ public class RemoteShuffleOutputGate {
private int currentRegionIndex = 0;
private int bufferSize;
+ private BufferPacker bufferPacker;
private String applicationId;
private int shuffleId;
private int mapId;
@@ -75,7 +92,7 @@ public class RemoteShuffleOutputGate {
this.numSubs = numSubs;
this.bufferPoolFactory = bufferPoolFactory;
this.shuffleWriteClient = createWriteClient();
- // this.bufferPacker = new BufferPacker(this::write);
+ this.bufferPacker = new BufferPacker(this::write);
this.celebornConf = celebornConf;
this.numMappers = numMappers;
this.bufferSize = bufferSize;
@@ -99,7 +116,7 @@ public class RemoteShuffleOutputGate {
"Too few buffers for transfer, the minimum valid required size is 2.");
// guarantee that we have at least one buffer
- // BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
+ BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
// handshake
handshake();
@@ -112,7 +129,7 @@ public class RemoteShuffleOutputGate {
/** Writes a {@link Buffer} to a subpartition. */
public void write(Buffer buffer, int subIdx) throws InterruptedException {
- // bufferPacker.process(buffer, subIdx);
+ bufferPacker.process(buffer, subIdx);
}
/**
@@ -158,7 +175,7 @@ public class RemoteShuffleOutputGate {
* region-finish.
*/
public void regionFinish() throws InterruptedException {
- // bufferPacker.drain();
+ bufferPacker.drain();
try {
shuffleWriteClient.regionFinish(
applicationId, shuffleId, mapId, attemptId, partitionLocation);
@@ -178,7 +195,7 @@ public class RemoteShuffleOutputGate {
if (bufferPool != null) {
bufferPool.lazyDestroy();
}
- // bufferPacker.close();
+ bufferPacker.close();
shuffleWriteClient.shutDown();
}
@@ -187,26 +204,27 @@ public class RemoteShuffleOutputGate {
return shuffleDesc;
}
- private ShuffleClient createWriteClient() {
+ @VisibleForTesting
+ ShuffleClient createWriteClient() {
return ShuffleClient.get(rssMetaServiceHost, rssMetaServicePort,
celebornConf, userIdentifier);
}
/** Writes a piece of data to a subpartition. */
public void write(ByteBuf byteBuf, int subIdx) throws InterruptedException {
- // try {
- // byteBuf.retain();
- // shuffleWriteClient.pushData(
- // applicationId,
- // shuffleId,
- // mapId,
- // attemptId,
- // subIdx,
- // io.netty.buffer.Unpooled.wrappedBuffer(byteBuf.nioBuffer()),
- // partitionLocation,
- // () -> byteBuf.release());
- // } catch (IOException e) {
- // Utils.rethrowAsRuntimeException(e);
- // }
+ try {
+ byteBuf.retain();
+ shuffleWriteClient.pushDataToLocation(
+ applicationId,
+ shuffleId,
+ mapId,
+ attemptId,
+ subIdx,
+ io.netty.buffer.Unpooled.wrappedBuffer(byteBuf.nioBuffer()),
+ partitionLocation,
+ () -> byteBuf.release());
+ } catch (IOException e) {
+ Utils.rethrowAsRuntimeException(e);
+ }
}
public void handshake() {
diff --git
a/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGateSuiteJ.java
b/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGateSuiteJ.java
new file mode 100644
index 00000000..ca4c43e9
--- /dev/null
+++
b/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGateSuiteJ.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Optional;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.celeborn.client.ShuffleClientImpl;
+import org.apache.celeborn.common.protocol.PartitionLocation;
+
+/** Walks a {@link RemoteShuffleOutputGate} through one write lifecycle against a mocked client. */
+public class RemoteShuffleOutputGateSuiteJ {
+  private RemoteShuffleOutputGate remoteShuffleOutputGate = mock(RemoteShuffleOutputGate.class);
+  private ShuffleClientImpl shuffleClient = mock(ShuffleClientImpl.class);
+
+  /** Injects the mocked shuffle client into the gate before every test. */
+  @Before
+  public void setup() {
+    remoteShuffleOutputGate.shuffleWriteClient = shuffleClient;
+  }
+
+  /**
+   * Stubs every client call and then drives handshake, regionStart, write, regionFinish, finish
+   * and close on the gate.
+   */
+  @Test
+  public void TestSimpleWriteData() throws IOException, InterruptedException {
+    // NOTE(review): the gate is itself created with mock(...), so the gate calls below presumably
+    // hit Mockito's default stubs rather than the real implementation - TODO confirm this test
+    // exercises production code.
+
+    // Stub the client first; setup() made it the gate's shuffleWriteClient.
+    PartitionLocation location =
+        new PartitionLocation(1, 0, "localhost", 123, 245, 789, 238, PartitionLocation.Mode.MASTER);
+    when(shuffleClient.registerMapPartitionTask(any(), anyInt(), anyInt(), anyInt(), anyInt()))
+        .thenAnswer(invocation -> location);
+    doNothing()
+        .when(shuffleClient)
+        .pushDataHandShake(anyString(), anyInt(), anyInt(), anyInt(), anyInt(), anyInt(), any());
+    when(shuffleClient.regionStart(
+            any(), anyInt(), anyInt(), anyInt(), any(), anyInt(), anyBoolean()))
+        .thenAnswer(invocation -> Optional.empty());
+    doNothing().when(shuffleClient).regionFinish(any(), anyInt(), anyInt(), anyInt(), any());
+    doNothing().when(shuffleClient).mapperEnd(any(), anyInt(), anyInt(), anyInt(), anyInt());
+    doNothing().when(shuffleClient).shutDown();
+
+    // Drive the gate through its lifecycle in protocol order.
+    remoteShuffleOutputGate.handshake();
+    remoteShuffleOutputGate.regionStart(false);
+    remoteShuffleOutputGate.write(createBuffer(), 0);
+    remoteShuffleOutputGate.regionFinish();
+    remoteShuffleOutputGate.finish();
+    remoteShuffleOutputGate.close();
+  }
+
+  /** Builds a network buffer whose memory segment starts with the bytes 1, 2, 3. */
+  private Buffer createBuffer() throws IOException, InterruptedException {
+    int segmentSize = 32 * 1024;
+    int segmentsPerPool = 8 * 1024 * 1024 / segmentSize;
+
+    NetworkBufferPool globalPool =
+        new NetworkBufferPool(256, segmentSize, Duration.ofMillis(30000L));
+    BufferPool localPool = globalPool.createBufferPool(segmentsPerPool, segmentsPerPool);
+
+    MemorySegment segment = localPool.requestMemorySegmentBlocking();
+    segment.put(0, new byte[] {1, 2, 3});
+
+    return new NetworkBuffer(segment, localPool);
+  }
+}
diff --git
a/client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferHeader.java
b/client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferHeader.java
new file mode 100644
index 00000000..6dc6350c
--- /dev/null
+++
b/client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferHeader.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink.buffer;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+/**
+ * Immutable-looking value holder for the header fields that precede the payload of a {@link
+ * org.apache.flink.runtime.io.network.buffer.Buffer}.
+ */
+public class BufferHeader {
+
+  // Prefix fields; stored by the constructor but not exposed through getters in this class.
+  private int subPartitionId;
+  private int attemptId;
+  private int nextBatchId;
+  private int totalLength;
+
+  private final Buffer.DataType dataType;
+  private final boolean isCompressed;
+
+  // size of the current buffer
+  private final int size;
+
+  /**
+   * Creates a header with the sub-partition id, attempt id and next batch id set to zero and the
+   * total length set to {@code size + 2}.
+   *
+   * @param dataType data type of the buffer
+   * @param isCompressed whether the buffer content is compressed
+   * @param size size of the current buffer
+   */
+  public BufferHeader(Buffer.DataType dataType, boolean isCompressed, int size) {
+    this(0, 0, 0, size + 2, dataType, isCompressed, size);
+  }
+
+  /**
+   * Creates a header with every field given explicitly.
+   *
+   * @param subPartitionId first prefix field
+   * @param attemptId second prefix field
+   * @param nextBatchId third prefix field
+   * @param totalLength fourth prefix field
+   * @param dataType data type of the buffer
+   * @param isCompressed whether the buffer content is compressed
+   * @param size size of the current buffer
+   */
+  public BufferHeader(
+      int subPartitionId,
+      int attemptId,
+      int nextBatchId,
+      int totalLength,
+      Buffer.DataType dataType,
+      boolean isCompressed,
+      int size) {
+    this.dataType = dataType;
+    this.isCompressed = isCompressed;
+    this.size = size;
+    this.subPartitionId = subPartitionId;
+    this.attemptId = attemptId;
+    this.nextBatchId = nextBatchId;
+    this.totalLength = totalLength;
+  }
+
+  /** Returns the data type recorded in this header. */
+  public Buffer.DataType getDataType() {
+    return dataType;
+  }
+
+  /** Returns whether the header marks the buffer as compressed. */
+  public boolean isCompressed() {
+    return isCompressed;
+  }
+
+  /** Returns the size of the current buffer as recorded in this header. */
+  public int getSize() {
+    return size;
+  }
+}
diff --git
a/client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferPacker.java
b/client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferPacker.java
new file mode 100644
index 00000000..dd40b2d7
--- /dev/null
+++
b/client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferPacker.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink.buffer;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import org.apache.celeborn.plugin.flink.utils.BufferUtils;
+
+/**
+ * Accumulates consecutive partial buffers of the same subpartition into one cached buffer and
+ * hands the cached buffer to a handler once it is "ripe" (the subpartition changes, the next
+ * buffer does not fit, or {@link #drain()} is called).
+ */
+public class BufferPacker {
+
+  /** Two-argument callback that is allowed to throw a checked exception. */
+  public interface BiConsumerWithException<T, U, E extends Throwable> {
+    void accept(T var1, U var2) throws E;
+  }
+
+  private final BiConsumerWithException<ByteBuf, Integer, InterruptedException> ripeBufferHandler;
+
+  private Buffer cachedBuffer;
+
+  private int currentSubIdx = -1;
+
+  /**
+   * @param ripeBufferHandler receives each ripe buffer together with its subpartition index
+   */
+  public BufferPacker(
+      BiConsumerWithException<ByteBuf, Integer, InterruptedException> ripeBufferHandler) {
+    this.ripeBufferHandler = ripeBufferHandler;
+  }
+
+  /**
+   * Feeds one buffer into the packer.
+   *
+   * <p>A {@code null} buffer is ignored and an empty buffer is recycled immediately. Otherwise the
+   * buffer is cached, merged into the cached buffer, or replaces the cached buffer after the
+   * cached one has been handed to the handler.
+   *
+   * @param buffer the buffer to pack, may be {@code null}
+   * @param subIdx index of the subpartition the buffer belongs to
+   * @throws InterruptedException if the ripe-buffer handler throws it
+   */
+  public void process(Buffer buffer, int subIdx) throws InterruptedException {
+    if (buffer == null) {
+      return;
+    }
+
+    if (buffer.readableBytes() == 0) {
+      buffer.recycleBuffer();
+      return;
+    }
+
+    // Nothing cached yet: just remember this buffer.
+    if (cachedBuffer == null) {
+      cachedBuffer = buffer;
+      currentSubIdx = subIdx;
+      return;
+    }
+
+    // Subpartition changed: swap in the new buffer, then flush the previous one.
+    if (currentSubIdx != subIdx) {
+      Buffer ripeBuffer = cachedBuffer;
+      int ripeSubIdx = currentSubIdx;
+      cachedBuffer = buffer;
+      currentSubIdx = subIdx;
+      handleRipeBuffer(ripeBuffer, ripeSubIdx);
+      return;
+    }
+
+    // Same subpartition. As an optimization, when the cached buffer has room, only the bytes after
+    // the incoming buffer's first HEADER_LENGTH_PREFIX bytes are appended, so the appended data
+    // shares the HEADER_LENGTH_PREFIX already present in the cached buffer.
+    boolean fitsIntoCached =
+        cachedBuffer.readableBytes() + buffer.readableBytes() - BufferUtils.HEADER_LENGTH_PREFIX
+            <= cachedBuffer.getMaxCapacity();
+    if (!fitsIntoCached) {
+      Buffer ripeBuffer = cachedBuffer;
+      cachedBuffer = buffer;
+      handleRipeBuffer(ripeBuffer, currentSubIdx);
+      return;
+    }
+
+    cachedBuffer
+        .asByteBuf()
+        .writeBytes(
+            buffer.asByteBuf(),
+            BufferUtils.HEADER_LENGTH_PREFIX,
+            buffer.readableBytes() - BufferUtils.HEADER_LENGTH_PREFIX);
+    buffer.recycleBuffer();
+  }
+
+  /**
+   * Hands the cached buffer, if any, to the handler and resets the packer state.
+   *
+   * @throws InterruptedException if the ripe-buffer handler throws it
+   */
+  public void drain() throws InterruptedException {
+    Buffer pending = cachedBuffer;
+    if (pending != null) {
+      handleRipeBuffer(pending, currentSubIdx);
+    }
+    cachedBuffer = null;
+    currentSubIdx = -1;
+  }
+
+  /** Clears the compressed flag on the buffer and passes its ByteBuf view to the handler. */
+  private void handleRipeBuffer(Buffer buffer, int subIdx) throws InterruptedException {
+    buffer.setCompressed(false);
+    ByteBuf ripeBytes = buffer.asByteBuf();
+    ripeBufferHandler.accept(ripeBytes, subIdx);
+  }
+
+  /** Recycles the cached buffer, if any, without handing it to the handler, and resets state. */
+  public void close() {
+    if (cachedBuffer != null) {
+      cachedBuffer.recycleBuffer();
+      cachedBuffer = null;
+    }
+    currentSubIdx = -1;
+  }
+}
diff --git
a/client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/utils/BufferUtils.java
b/client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/utils/BufferUtils.java
new file mode 100644
index 00000000..92ec0695
--- /dev/null
+++
b/client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/utils/BufferUtils.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink.utils;
+
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkArgument;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import org.apache.celeborn.plugin.flink.buffer.BufferHeader;
+
+/** Utility methods to process flink buffers. */
+public class BufferUtils {
+
+ // subpartitionid(4) + attemptId(4) + nextBatchId(4) + compressedsize
+ public static final int HEADER_LENGTH_PREFIX = 4 * 4;
+ // dataType(1) + size(4)
+ public static final int HEADER_LENGTH = HEADER_LENGTH_PREFIX + 1 + 4;
+
+ /**
+ * Copies the data of the compressed buffer and the corresponding buffer
header to the origin
+ * buffer. The origin buffer must reserve the {@link #HEADER_LENGTH} space
for the header data.
+ */
+ public static void setCompressedDataWithHeader(Buffer buffer, Buffer
compressedBuffer) {
+ checkArgument(buffer != null, "Must be not null.");
+ checkArgument(buffer.getReaderIndex() == 0, "Illegal reader index.");
+
+ boolean isCompressed = compressedBuffer != null &&
compressedBuffer.isCompressed();
+ int dataLength =
+ isCompressed ? compressedBuffer.readableBytes() :
buffer.readableBytes() - HEADER_LENGTH;
+ ByteBuf byteBuf = buffer.asByteBuf();
+ setBufferHeader(byteBuf, buffer.getDataType(), isCompressed, dataLength);
+
+ if (isCompressed) {
+ byteBuf.writeBytes(compressedBuffer.asByteBuf());
+ }
+ buffer.setSize(dataLength + HEADER_LENGTH);
+ }
+
+ public static void setBufferHeader(
+ ByteBuf byteBuf, Buffer.DataType dataType, boolean isCompressed, int
dataLength) {
+ byteBuf.writerIndex(0);
+ byteBuf.writeInt(0);
+ byteBuf.writeInt(0);
+ byteBuf.writeInt(0);
+ byteBuf.writeInt(0);
+ byteBuf.writeByte(dataType.ordinal());
+ byteBuf.writeBoolean(isCompressed);
+ byteBuf.writeInt(dataLength);
+ }
+
+ public static BufferHeader getBufferHeader(Buffer buffer, int position) {
+ return getBufferHeader(buffer, position, false);
+ }
+
+ public static BufferHeader getBufferHeader(Buffer buffer, int position,
boolean isFirst) {
+ ByteBuf byteBuf = buffer.asByteBuf();
+ byteBuf.readerIndex(position);
+ if (isFirst) {
+ return new BufferHeader(
+ Buffer.DataType.values()[byteBuf.readByte()], byteBuf.readBoolean(),
byteBuf.readInt());
+ } else {
+ return new BufferHeader(
+ byteBuf.readInt(),
+ byteBuf.readInt(),
+ byteBuf.readInt(),
+ byteBuf.readInt(),
+ Buffer.DataType.values()[byteBuf.readByte()],
+ byteBuf.readBoolean(),
+ byteBuf.readInt());
+ }
+ }
+
+ public static void reserveNumRequiredBuffers(BufferPool bufferPool, int
numRequiredBuffers)
+ throws IOException {
+ long startTime = System.nanoTime();
+ List<MemorySegment> buffers = new ArrayList<>(numRequiredBuffers);
+ try {
+ // guarantee that we have at least the minimal number of buffers
+ while (buffers.size() < numRequiredBuffers) {
+ MemorySegment segment = bufferPool.requestMemorySegment();
+ if (segment != null) {
+ buffers.add(segment);
+ continue;
+ }
+
+ Thread.sleep(10);
+ if ((System.nanoTime() - startTime) > 3L * 60 * 1000_000_000) {
+ throw new IOException("Could not allocate the required number of
buffers in 3 minutes.");
+ }
+ }
+ } catch (Throwable throwable) {
+ throw new IOException(throwable);
+ } finally {
+ buffers.forEach(bufferPool::recycle);
+ }
+ }
+}