This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 5ae2275e [CELEBORN-125] RemoteShuffleOutputGate completes functions to 
support shuffle write (#1067)
5ae2275e is described below

commit 5ae2275e3e43c2c8d6ae5b7c6c4662a7196de686
Author: zhongqiangczq <[email protected]>
AuthorDate: Wed Dec 14 10:10:21 2022 +0800

    [CELEBORN-125] RemoteShuffleOutputGate completes functions to support 
shuffle write (#1067)
---
 client-flink/flink-1.14/pom.xml                    |   5 +
 .../plugin/flink/RemoteShuffleOutputGate.java      |  62 +++++++----
 .../flink/RemoteShuffleOutputGateSuiteJ.java       | 101 +++++++++++++++++
 .../celeborn/plugin/flink/buffer/BufferHeader.java |  68 ++++++++++++
 .../celeborn/plugin/flink/buffer/BufferPacker.java | 105 ++++++++++++++++++
 .../celeborn/plugin/flink/utils/BufferUtils.java   | 119 +++++++++++++++++++++
 6 files changed, 438 insertions(+), 22 deletions(-)

diff --git a/client-flink/flink-1.14/pom.xml b/client-flink/flink-1.14/pom.xml
index 30681778..6b88888f 100644
--- a/client-flink/flink-1.14/pom.xml
+++ b/client-flink/flink-1.14/pom.xml
@@ -55,5 +55,10 @@
       <version>${project.version}</version>
       <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git 
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java
 
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java
index f30dfadc..e6f03e2a 100644
--- 
a/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java
+++ 
b/client-flink/flink-1.14/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGate.java
@@ -20,6 +20,7 @@ package org.apache.celeborn.plugin.flink;
 import java.io.IOException;
 import java.util.Optional;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -30,17 +31,32 @@ import org.apache.celeborn.client.ShuffleClient;
 import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.identity.UserIdentifier;
 import org.apache.celeborn.common.protocol.PartitionLocation;
+import org.apache.celeborn.plugin.flink.buffer.BufferPacker;
+import org.apache.celeborn.plugin.flink.utils.BufferUtils;
 import org.apache.celeborn.plugin.flink.utils.Utils;
 
 /**
  * A transportation gate used to spill buffers from {@link 
ResultPartitionWriter} to remote shuffle
- * worker.
+ * worker. The whole process of communication between outputGate and shuffle 
worker could be
+ * described as below:
+ *
+ * <ul>
+ *   <li>1. Client registers shuffle to get partitionLocation which stores the 
data in remote
+ *       shuffle;
+ *   <li>2. Client sends PushDataHandShake to transfer handshake message;
+ *   <li>3. Client sends RegionStart which announces the start of a writing 
region and maybe get a
+ *       new partitionLocation;
+ *   <li>4. Client writes data;
+ *   <li>5. Client sends RegionFinish to indicate the writing finish of a region;
+ *   <li>6. Repeat from step-2 to step-5;
+ *   <li>7. Client sends MapperEnd to indicate the writing is finished;
+ * </ul>
  */
 public class RemoteShuffleOutputGate {
 
   private final RemoteShuffleDescriptor shuffleDesc;
   protected final int numSubs;
-  private final ShuffleClient shuffleWriteClient;
+  protected ShuffleClient shuffleWriteClient;
   protected final SupplierWithException<BufferPool, IOException> 
bufferPoolFactory;
   protected BufferPool bufferPool;
   private CelebornConf celebornConf;
@@ -50,6 +66,7 @@ public class RemoteShuffleOutputGate {
   private int currentRegionIndex = 0;
 
   private int bufferSize;
+  private BufferPacker bufferPacker;
   private String applicationId;
   private int shuffleId;
   private int mapId;
@@ -75,7 +92,7 @@ public class RemoteShuffleOutputGate {
     this.numSubs = numSubs;
     this.bufferPoolFactory = bufferPoolFactory;
     this.shuffleWriteClient = createWriteClient();
-    // this.bufferPacker = new BufferPacker(this::write);
+    this.bufferPacker = new BufferPacker(this::write);
     this.celebornConf = celebornConf;
     this.numMappers = numMappers;
     this.bufferSize = bufferSize;
@@ -99,7 +116,7 @@ public class RemoteShuffleOutputGate {
         "Too few buffers for transfer, the minimum valid required size is 2.");
 
     // guarantee that we have at least one buffer
-    // BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
+    BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
 
     // handshake
     handshake();
@@ -112,7 +129,7 @@ public class RemoteShuffleOutputGate {
 
   /** Writes a {@link Buffer} to a subpartition. */
   public void write(Buffer buffer, int subIdx) throws InterruptedException {
-    // bufferPacker.process(buffer, subIdx);
+    bufferPacker.process(buffer, subIdx);
   }
 
   /**
@@ -158,7 +175,7 @@ public class RemoteShuffleOutputGate {
    * region-finish.
    */
   public void regionFinish() throws InterruptedException {
-    // bufferPacker.drain();
+    bufferPacker.drain();
     try {
       shuffleWriteClient.regionFinish(
           applicationId, shuffleId, mapId, attemptId, partitionLocation);
@@ -178,7 +195,7 @@ public class RemoteShuffleOutputGate {
     if (bufferPool != null) {
       bufferPool.lazyDestroy();
     }
-    // bufferPacker.close();
+    bufferPacker.close();
     shuffleWriteClient.shutDown();
   }
 
@@ -187,26 +204,27 @@ public class RemoteShuffleOutputGate {
     return shuffleDesc;
   }
 
-  private ShuffleClient createWriteClient() {
+  @VisibleForTesting
+  ShuffleClient createWriteClient() {
     return ShuffleClient.get(rssMetaServiceHost, rssMetaServicePort, 
celebornConf, userIdentifier);
   }
 
   /** Writes a piece of data to a subpartition. */
   public void write(ByteBuf byteBuf, int subIdx) throws InterruptedException {
-    //    try {
-    //         byteBuf.retain();
-    //      shuffleWriteClient.pushData(
-    //          applicationId,
-    //          shuffleId,
-    //          mapId,
-    //          attemptId,
-    //          subIdx,
-    //          io.netty.buffer.Unpooled.wrappedBuffer(byteBuf.nioBuffer()),
-    //          partitionLocation,
-    //          () -> byteBuf.release());
-    //    } catch (IOException e) {
-    //      Utils.rethrowAsRuntimeException(e);
-    //    }
+    try {
+      byteBuf.retain();
+      shuffleWriteClient.pushDataToLocation(
+          applicationId,
+          shuffleId,
+          mapId,
+          attemptId,
+          subIdx,
+          io.netty.buffer.Unpooled.wrappedBuffer(byteBuf.nioBuffer()),
+          partitionLocation,
+          () -> byteBuf.release());
+    } catch (IOException e) {
+      Utils.rethrowAsRuntimeException(e);
+    }
   }
 
   public void handshake() {
diff --git 
a/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGateSuiteJ.java
 
b/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGateSuiteJ.java
new file mode 100644
index 00000000..ca4c43e9
--- /dev/null
+++ 
b/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleOutputGateSuiteJ.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Optional;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.celeborn.client.ShuffleClientImpl;
+import org.apache.celeborn.common.protocol.PartitionLocation;
+
+public class RemoteShuffleOutputGateSuiteJ {
+  private RemoteShuffleOutputGate remoteShuffleOutputGate = 
mock(RemoteShuffleOutputGate.class);
+  private ShuffleClientImpl shuffleClient = mock(ShuffleClientImpl.class);
+
+  @Before
+  public void setup() {
+    remoteShuffleOutputGate.shuffleWriteClient = shuffleClient;
+  }
+
+  @Test
+  public void TestSimpleWriteData() throws IOException, InterruptedException {
+
+    PartitionLocation partitionLocation =
+        new PartitionLocation(1, 0, "localhost", 123, 245, 789, 238, 
PartitionLocation.Mode.MASTER);
+    when(shuffleClient.registerMapPartitionTask(any(), anyInt(), anyInt(), 
anyInt(), anyInt()))
+        .thenAnswer(t -> partitionLocation);
+    doNothing()
+        .when(remoteShuffleOutputGate.shuffleWriteClient)
+        .pushDataHandShake(anyString(), anyInt(), anyInt(), anyInt(), 
anyInt(), anyInt(), any());
+
+    remoteShuffleOutputGate.handshake();
+
+    when(remoteShuffleOutputGate.shuffleWriteClient.regionStart(
+            any(), anyInt(), anyInt(), anyInt(), any(), anyInt(), 
anyBoolean()))
+        .thenAnswer(t -> Optional.empty());
+    remoteShuffleOutputGate.regionStart(false);
+
+    remoteShuffleOutputGate.write(createBuffer(), 0);
+
+    doNothing()
+        .when(remoteShuffleOutputGate.shuffleWriteClient)
+        .regionFinish(any(), anyInt(), anyInt(), anyInt(), any());
+    remoteShuffleOutputGate.regionFinish();
+
+    doNothing()
+        .when(remoteShuffleOutputGate.shuffleWriteClient)
+        .mapperEnd(any(), anyInt(), anyInt(), anyInt(), anyInt());
+    remoteShuffleOutputGate.finish();
+
+    doNothing().when(remoteShuffleOutputGate.shuffleWriteClient).shutDown();
+    remoteShuffleOutputGate.close();
+  }
+
+  private Buffer createBuffer() throws IOException, InterruptedException {
+    int segmentSize = 32 * 1024;
+    NetworkBufferPool networkBufferPool =
+        new NetworkBufferPool(256, 32 * 1024, Duration.ofMillis(30000L));
+    BufferPool bufferPool =
+        networkBufferPool.createBufferPool(
+            8 * 1024 * 1024 / segmentSize, 8 * 1024 * 1024 / segmentSize);
+
+    MemorySegment memorySegment = bufferPool.requestMemorySegmentBlocking();
+
+    memorySegment.put(0, new byte[] {1, 2, 3});
+
+    Buffer buffer = new NetworkBuffer(memorySegment, bufferPool);
+    return buffer;
+  }
+}
diff --git 
a/client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferHeader.java
 
b/client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferHeader.java
new file mode 100644
index 00000000..6dc6350c
--- /dev/null
+++ 
b/client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferHeader.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink.buffer;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+
+/** Header information for a {@link 
org.apache.flink.runtime.io.network.buffer.Buffer}. */
+public class BufferHeader {
+
+  private final Buffer.DataType dataType;
+
+  private final boolean isCompressed;
+  private int subPartitionId;
+  private int attemptId;
+  private int nextBatchId;
+  private int totalLength;
+
+  // current buffer's size
+  private final int size;
+
+  public BufferHeader(Buffer.DataType dataType, boolean isCompressed, int 
size) {
+    this(0, 0, 0, size + 2, dataType, isCompressed, size);
+  }
+
+  public BufferHeader(
+      int subPartitionId,
+      int attemptId,
+      int nextBatchId,
+      int totalLength,
+      Buffer.DataType dataType,
+      boolean isCompressed,
+      int size) {
+    this.subPartitionId = subPartitionId;
+    this.attemptId = attemptId;
+    this.nextBatchId = nextBatchId;
+    this.totalLength = totalLength;
+    this.dataType = dataType;
+    this.isCompressed = isCompressed;
+    this.size = size;
+  }
+
+  public Buffer.DataType getDataType() {
+    return dataType;
+  }
+
+  public boolean isCompressed() {
+    return isCompressed;
+  }
+
+  public int getSize() {
+    return size;
+  }
+}
diff --git 
a/client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferPacker.java
 
b/client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferPacker.java
new file mode 100644
index 00000000..dd40b2d7
--- /dev/null
+++ 
b/client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferPacker.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink.buffer;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import org.apache.celeborn.plugin.flink.utils.BufferUtils;
+
+/** Harness used to pack multiple partial buffers together as a full one. */
+public class BufferPacker {
+
+  public interface BiConsumerWithException<T, U, E extends Throwable> {
+    void accept(T var1, U var2) throws E;
+  }
+
+  private final BiConsumerWithException<ByteBuf, Integer, 
InterruptedException> ripeBufferHandler;
+
+  private Buffer cachedBuffer;
+
+  private int currentSubIdx = -1;
+
+  public BufferPacker(
+      BiConsumerWithException<ByteBuf, Integer, InterruptedException> 
ripeBufferHandler) {
+    this.ripeBufferHandler = ripeBufferHandler;
+  }
+
+  public void process(Buffer buffer, int subIdx) throws InterruptedException {
+    if (buffer == null) {
+      return;
+    }
+
+    if (buffer.readableBytes() == 0) {
+      buffer.recycleBuffer();
+      return;
+    }
+
+    if (cachedBuffer == null) {
+      cachedBuffer = buffer;
+      currentSubIdx = subIdx;
+    } else if (currentSubIdx != subIdx) {
+      Buffer dumpedBuffer = cachedBuffer;
+      cachedBuffer = buffer;
+      int targetSubIdx = currentSubIdx;
+      currentSubIdx = subIdx;
+      handleRipeBuffer(dumpedBuffer, targetSubIdx);
+    } else {
+      /**
+       * This is an optimization. If cachedBuffer can contain the other buffer, then the other
+       * buffer can reuse the same HEADER_LENGTH_PREFIX of the cachedBuffer, so cachedBuffer just
+       * reads data whose length is buffer.readableBytes() - BufferUtils.HEADER_LENGTH_PREFIX.
+       */
+      if (cachedBuffer.readableBytes() + buffer.readableBytes() - 
BufferUtils.HEADER_LENGTH_PREFIX
+          <= cachedBuffer.getMaxCapacity()) {
+        cachedBuffer
+            .asByteBuf()
+            .writeBytes(
+                buffer.asByteBuf(),
+                BufferUtils.HEADER_LENGTH_PREFIX,
+                buffer.readableBytes() - BufferUtils.HEADER_LENGTH_PREFIX);
+        buffer.recycleBuffer();
+      } else {
+        Buffer dumpedBuffer = cachedBuffer;
+        cachedBuffer = buffer;
+        handleRipeBuffer(dumpedBuffer, currentSubIdx);
+      }
+    }
+  }
+
+  public void drain() throws InterruptedException {
+    if (cachedBuffer != null) {
+      handleRipeBuffer(cachedBuffer, currentSubIdx);
+    }
+    cachedBuffer = null;
+    currentSubIdx = -1;
+  }
+
+  private void handleRipeBuffer(Buffer buffer, int subIdx) throws 
InterruptedException {
+    buffer.setCompressed(false);
+    ripeBufferHandler.accept(buffer.asByteBuf(), subIdx);
+  }
+
+  public void close() {
+    if (cachedBuffer != null) {
+      cachedBuffer.recycleBuffer();
+      cachedBuffer = null;
+    }
+    currentSubIdx = -1;
+  }
+}
diff --git 
a/client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/utils/BufferUtils.java
 
b/client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/utils/BufferUtils.java
new file mode 100644
index 00000000..92ec0695
--- /dev/null
+++ 
b/client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/utils/BufferUtils.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink.utils;
+
+import static org.apache.celeborn.plugin.flink.utils.Utils.checkArgument;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import org.apache.celeborn.plugin.flink.buffer.BufferHeader;
+
+/** Utility methods to process flink buffers. */
+public class BufferUtils {
+
+  // subPartitionId(4) + attemptId(4) + nextBatchId(4) + compressedSize(4)
+  public static final int HEADER_LENGTH_PREFIX = 4 * 4;
+  // dataType(1) + size(4)
+  public static final int HEADER_LENGTH = HEADER_LENGTH_PREFIX + 1 + 4;
+
+  /**
+   * Copies the data of the compressed buffer and the corresponding buffer 
header to the origin
+   * buffer. The origin buffer must reserve the {@link #HEADER_LENGTH} space 
for the header data.
+   */
+  public static void setCompressedDataWithHeader(Buffer buffer, Buffer 
compressedBuffer) {
+    checkArgument(buffer != null, "Must be not null.");
+    checkArgument(buffer.getReaderIndex() == 0, "Illegal reader index.");
+
+    boolean isCompressed = compressedBuffer != null && 
compressedBuffer.isCompressed();
+    int dataLength =
+        isCompressed ? compressedBuffer.readableBytes() : 
buffer.readableBytes() - HEADER_LENGTH;
+    ByteBuf byteBuf = buffer.asByteBuf();
+    setBufferHeader(byteBuf, buffer.getDataType(), isCompressed, dataLength);
+
+    if (isCompressed) {
+      byteBuf.writeBytes(compressedBuffer.asByteBuf());
+    }
+    buffer.setSize(dataLength + HEADER_LENGTH);
+  }
+
+  public static void setBufferHeader(
+      ByteBuf byteBuf, Buffer.DataType dataType, boolean isCompressed, int 
dataLength) {
+    byteBuf.writerIndex(0);
+    byteBuf.writeInt(0);
+    byteBuf.writeInt(0);
+    byteBuf.writeInt(0);
+    byteBuf.writeInt(0);
+    byteBuf.writeByte(dataType.ordinal());
+    byteBuf.writeBoolean(isCompressed);
+    byteBuf.writeInt(dataLength);
+  }
+
+  public static BufferHeader getBufferHeader(Buffer buffer, int position) {
+    return getBufferHeader(buffer, position, false);
+  }
+
+  public static BufferHeader getBufferHeader(Buffer buffer, int position, 
boolean isFirst) {
+    ByteBuf byteBuf = buffer.asByteBuf();
+    byteBuf.readerIndex(position);
+    if (isFirst) {
+      return new BufferHeader(
+          Buffer.DataType.values()[byteBuf.readByte()], byteBuf.readBoolean(), 
byteBuf.readInt());
+    } else {
+      return new BufferHeader(
+          byteBuf.readInt(),
+          byteBuf.readInt(),
+          byteBuf.readInt(),
+          byteBuf.readInt(),
+          Buffer.DataType.values()[byteBuf.readByte()],
+          byteBuf.readBoolean(),
+          byteBuf.readInt());
+    }
+  }
+
+  public static void reserveNumRequiredBuffers(BufferPool bufferPool, int 
numRequiredBuffers)
+      throws IOException {
+    long startTime = System.nanoTime();
+    List<MemorySegment> buffers = new ArrayList<>(numRequiredBuffers);
+    try {
+      // guarantee that we have at least the minimal number of buffers
+      while (buffers.size() < numRequiredBuffers) {
+        MemorySegment segment = bufferPool.requestMemorySegment();
+        if (segment != null) {
+          buffers.add(segment);
+          continue;
+        }
+
+        Thread.sleep(10);
+        if ((System.nanoTime() - startTime) > 3L * 60 * 1000_000_000) {
+          throw new IOException("Could not allocate the required number of 
buffers in 3 minutes.");
+        }
+      }
+    } catch (Throwable throwable) {
+      throw new IOException(throwable);
+    } finally {
+      buffers.forEach(bufferPool::recycle);
+    }
+  }
+}

Reply via email to