This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch HDDS-7593
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-7593 by this push:
     new d8ea69dae7 HDDS-9844. [hsync] De-synchronize write APIs (#6859)
d8ea69dae7 is described below

commit d8ea69dae7a853a03116a47c5f7e67db307f5308
Author: Duong Nguyen <[email protected]>
AuthorDate: Mon Jul 15 14:56:45 2024 -0700

    HDDS-9844. [hsync] De-synchronize write APIs (#6859)
---
 .../hdds/scm/storage/AbstractCommitWatcher.java    |   4 +-
 .../hadoop/hdds/scm/storage/BlockOutputStream.java | 308 ++++++++++++++-------
 .../apache/hadoop/hdds/scm/storage/BufferPool.java | 186 +++++++++----
 .../hadoop/hdds/scm/storage/CommitWatcher.java     |  44 +--
 .../hdds/scm/storage/ECBlockOutputStream.java      |  18 +-
 .../hdds/scm/storage/RatisBlockOutputStream.java   |  23 +-
 .../hadoop/hdds/scm/storage/TestBufferPool.java    | 120 ++++++--
 .../common/ChunkBufferImplWithByteBuffer.java      |   4 +-
 .../client/io/BlockOutputStreamEntryPool.java      |  11 +-
 .../hadoop/ozone/client/io/KeyOutputStream.java    |  29 +-
 .../java/org/apache/hadoop/fs/ozone/TestHSync.java | 142 +++++++---
 .../hadoop/hdds/scm/storage/TestCommitWatcher.java |  13 -
 .../ozone/client/rpc/TestBlockOutputStream.java    |  20 +-
 13 files changed, 588 insertions(+), 334 deletions(-)

diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
index 957f761ccb..0c6b929344 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
@@ -79,7 +79,7 @@ abstract class AbstractCommitWatcher<BUFFER> {
   }
 
   /** @return the total data which has been acknowledged. */
-  long getTotalAckDataLength() {
+  synchronized long getTotalAckDataLength() {
     return totalAckDataLength;
   }
 
@@ -166,7 +166,7 @@ abstract class AbstractCommitWatcher<BUFFER> {
   /** Release the buffers for the given index. */
   abstract void releaseBuffers(long index);
 
-  void adjustBuffers(long commitIndex) {
+  synchronized void adjustBuffers(long commitIndex) {
     commitIndexMap.keySet().stream()
         .filter(p -> p <= commitIndex)
         .forEach(this::releaseBuffers);
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 2d40439c33..0e8a0d56fe 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -112,8 +112,11 @@ public class BlockOutputStream extends OutputStream {
   private final AtomicReference<IOException> ioException;
   private final ExecutorService responseExecutor;
 
-  // the effective length of data flushed so far
-  private long totalDataFlushedLength;
+  // the effective length of data sent to datanodes (via writeChunk).
+  private long totalWriteChunkLength;
+
+  // The effective length of data flushed to datanodes (via putBlock).
+  private long totalPutBlockLength;
 
   // effective data write attempted so far for the block
   private long writtenDataLength;
@@ -146,6 +149,8 @@ public class BlockOutputStream extends OutputStream {
   private final ContainerClientMetrics clientMetrics;
   private boolean allowPutBlockPiggybacking;
 
+  private CompletableFuture<Void> lastFlushFuture;
+
   /**
    * Creates a new BlockOutputStream.
    *
@@ -208,7 +213,8 @@ public class BlockOutputStream extends OutputStream {
 
     this.responseExecutor = blockOutputStreamResourceProvider.get();
     bufferList = null;
-    totalDataFlushedLength = 0;
+    totalWriteChunkLength = 0;
+    totalPutBlockLength = 0;
     writtenDataLength = 0;
     failedServers = new ArrayList<>(0);
     ioException = new AtomicReference<>(null);
@@ -233,7 +239,7 @@ public class BlockOutputStream extends OutputStream {
     return true;
   }
 
-  void refreshCurrentBuffer() {
+  synchronized void refreshCurrentBuffer() {
     currentBuffer = bufferPool.getCurrentBuffer();
     currentBufferRemaining =
         currentBuffer != null ? currentBuffer.remaining() : 0;
@@ -247,7 +253,7 @@ public class BlockOutputStream extends OutputStream {
     return 0;
   }
 
-  public long getWrittenDataLength() {
+  public synchronized long getWrittenDataLength() {
     return writtenDataLength;
   }
 
@@ -262,7 +268,7 @@ public class BlockOutputStream extends OutputStream {
 
   @VisibleForTesting
   public long getTotalDataFlushedLength() {
-    return totalDataFlushedLength;
+    return totalPutBlockLength;
   }
 
   @VisibleForTesting
@@ -293,17 +299,21 @@ public class BlockOutputStream extends OutputStream {
   @Override
   public void write(int b) throws IOException {
     checkOpen();
-    allocateNewBufferIfNeeded();
-    currentBuffer.put((byte) b);
-    currentBufferRemaining--;
-    writeChunkIfNeeded();
-    writtenDataLength++;
-    doFlushOrWatchIfNeeded();
+    synchronized (this) {
+      allocateNewBufferIfNeeded();
+      currentBuffer.put((byte) b);
+      currentBufferRemaining--;
+      updateWrittenDataLength(1);
+      writeChunkIfNeeded();
+      doFlushOrWatchIfNeeded();
+    }
   }
 
   private void writeChunkIfNeeded() throws IOException {
     if (currentBufferRemaining == 0) {
+      LOG.debug("WriteChunk from write(), buffer = {}", currentBuffer);
       writeChunk(currentBuffer);
+      updateWriteChunkLength();
     }
   }
 
@@ -315,51 +325,66 @@ public class BlockOutputStream extends OutputStream {
     }
     if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
         || ((off + len) < 0)) {
-      throw new IndexOutOfBoundsException();
+      throw new IndexOutOfBoundsException("Offset=" + off + " and len="
+          + len + " don't match the array length of " + b.length);
     }
     if (len == 0) {
       return;
     }
-
-    while (len > 0) {
-      allocateNewBufferIfNeeded();
-      final int writeLen = Math.min(currentBufferRemaining, len);
-      currentBuffer.put(b, off, writeLen);
-      currentBufferRemaining -= writeLen;
-      writeChunkIfNeeded();
-      off += writeLen;
-      len -= writeLen;
-      updateWrittenDataLength(writeLen);
-      doFlushOrWatchIfNeeded();
+    synchronized (this) {
+      while (len > 0) {
+        allocateNewBufferIfNeeded();
+        final int writeLen = Math.min(currentBufferRemaining, len);
+        currentBuffer.put(b, off, writeLen);
+        currentBufferRemaining -= writeLen;
+        updateWrittenDataLength(writeLen);
+        writeChunkIfNeeded();
+        off += writeLen;
+        len -= writeLen;
+        doFlushOrWatchIfNeeded();
+      }
     }
   }
 
-  public void updateWrittenDataLength(int writeLen) {
+  protected synchronized void updateWrittenDataLength(int writeLen) {
     writtenDataLength += writeLen;
   }
 
   private void doFlushOrWatchIfNeeded() throws IOException {
     if (currentBufferRemaining == 0) {
       if (bufferPool.getNumberOfUsedBuffers() % flushPeriod == 0) {
-        updateFlushLength();
-        executePutBlock(false, false);
+        updatePutBlockLength();
+        CompletableFuture<PutBlockResult> putBlockFuture = 
executePutBlock(false, false);
+        this.lastFlushFuture = watchForCommitAsync(putBlockFuture);
       }
-      // Data in the bufferPool can not exceed streamBufferMaxSize
-      if (bufferPool.getNumberOfUsedBuffers() == bufferPool.getCapacity()) {
+
+      if (bufferPool.isAtCapacity()) {
         handleFullBuffer();
       }
     }
   }
 
-  private void allocateNewBufferIfNeeded() {
+  private void allocateNewBufferIfNeeded() throws IOException {
     if (currentBufferRemaining == 0) {
-      currentBuffer = bufferPool.allocateBuffer(config.getBufferIncrement());
-      currentBufferRemaining = currentBuffer.remaining();
+      try {
+        currentBuffer = bufferPool.allocateBuffer(config.getBufferIncrement());
+        currentBufferRemaining = currentBuffer.remaining();
+        LOG.debug("Allocated new buffer {}, used = {}, capacity = {}", 
currentBuffer,
+            bufferPool.getNumberOfUsedBuffers(), bufferPool.getCapacity());
+      } catch (InterruptedException e) {
+        handleInterruptedException(e, false);
+      }
     }
   }
 
-  private void updateFlushLength() {
-    totalDataFlushedLength = writtenDataLength;
+  private void updateWriteChunkLength() {
+    Preconditions.checkState(Thread.holdsLock(this));
+    totalWriteChunkLength = writtenDataLength;
+  }
+
+  private void updatePutBlockLength() {
+    Preconditions.checkState(Thread.holdsLock(this));
+    totalPutBlockLength = totalWriteChunkLength;
   }
 
   /**
@@ -370,7 +395,7 @@ public class BlockOutputStream extends OutputStream {
    */
 
   // In this case, the data is already cached in the currentBuffer.
-  public void writeOnRetry(long len) throws IOException {
+  public synchronized void writeOnRetry(long len) throws IOException {
     if (len == 0) {
       return;
     }
@@ -379,8 +404,9 @@ public class BlockOutputStream extends OutputStream {
     }
     Preconditions.checkArgument(len <= 
streamBufferArgs.getStreamBufferMaxSize());
     int count = 0;
+    List<ChunkBuffer> allocatedBuffers = bufferPool.getAllocatedBuffers();
     while (len > 0) {
-      ChunkBuffer buffer = bufferPool.getBuffer(count);
+      ChunkBuffer buffer = allocatedBuffers.get(count);
       long writeLen = Math.min(buffer.position(), len);
       if (!buffer.hasRemaining()) {
         writeChunk(buffer);
@@ -396,8 +422,10 @@ public class BlockOutputStream extends OutputStream {
       if (writtenDataLength % streamBufferArgs.getStreamBufferFlushSize() == 
0) {
         // reset the position to zero as now we will be reading the
         // next buffer in the list
-        updateFlushLength();
-        executePutBlock(false, false);
+        updateWriteChunkLength();
+        updatePutBlockLength();
+        CompletableFuture<PutBlockResult> putBlockResultFuture = 
executePutBlock(false, false);
+        lastFlushFuture = watchForCommitAsync(putBlockResultFuture);
       }
       if (writtenDataLength == streamBufferArgs.getStreamBufferMaxSize()) {
         handleFullBuffer();
@@ -412,20 +440,15 @@ public class BlockOutputStream extends OutputStream {
    * @throws IOException
    */
   private void handleFullBuffer() throws IOException {
-    waitForFlushAndCommit(true);
-  }
-
-  void waitForFlushAndCommit(boolean bufferFull) throws IOException {
     try {
       checkOpen();
-      waitOnFlushFutures();
+      waitOnFlushFuture();
     } catch (ExecutionException e) {
       handleExecutionException(e);
     } catch (InterruptedException ex) {
       Thread.currentThread().interrupt();
       handleInterruptedException(ex, true);
     }
-    watchForCommit(bufferFull);
   }
 
   void releaseBuffersOnException() {
@@ -439,22 +462,19 @@ public class BlockOutputStream extends OutputStream {
     refreshCurrentBuffer();
   }
 
-  XceiverClientReply sendWatchForCommit(boolean bufferFull)
+  /**
+   * Watch for a specific commit index.
+   */
+  XceiverClientReply sendWatchForCommit(long commitIndex)
       throws IOException {
     return null;
   }
 
-  /**
-   * calls watchForCommit API of the Ratis Client. For Standalone client,
-   * it is a no op.
-   * @param bufferFull flag indicating whether bufferFull condition is hit or
-   *              its called as part flush/close
-   * @throws IOException IOException in case watch gets timed out
-   */
-  private void watchForCommit(boolean bufferFull) throws IOException {
+  private void watchForCommit(long commitIndex) throws IOException {
     checkOpen();
     try {
-      final XceiverClientReply reply = sendWatchForCommit(bufferFull);
+      LOG.debug("Entering watchForCommit commitIndex = {}", commitIndex);
+      final XceiverClientReply reply = sendWatchForCommit(commitIndex);
       if (reply != null) {
         List<DatanodeDetails> dnList = reply.getDatanodes();
         if (!dnList.isEmpty()) {
@@ -469,7 +489,7 @@ public class BlockOutputStream extends OutputStream {
       setIoException(ioe);
       throw getIoException();
     }
-    refreshCurrentBuffer();
+    LOG.debug("Leaving watchForCommit commitIndex = {}", commitIndex);
   }
 
   void updateCommitInfo(XceiverClientReply reply, List<ChunkBuffer> buffers) {
@@ -480,11 +500,10 @@ public class BlockOutputStream extends OutputStream {
    * @param force true if no data was written since most recent putBlock and
    *            stream is being closed
    */
-  CompletableFuture<ContainerProtos.
-      ContainerCommandResponseProto> executePutBlock(boolean close,
+  CompletableFuture<PutBlockResult> executePutBlock(boolean close,
       boolean force) throws IOException {
     checkOpen();
-    long flushPos = totalDataFlushedLength;
+    long flushPos = totalWriteChunkLength;
     final List<ChunkBuffer> byteBufferList;
     if (!force) {
       Preconditions.checkNotNull(bufferList);
@@ -495,8 +514,8 @@ public class BlockOutputStream extends OutputStream {
       byteBufferList = null;
     }
 
-    CompletableFuture<ContainerProtos.
-        ContainerCommandResponseProto> flushFuture = null;
+    final CompletableFuture<ContainerCommandResponseProto> flushFuture;
+    final XceiverClientReply asyncReply;
     try {
       BlockData blockData = containerBlockData.build();
       LOG.debug("sending PutBlock {}", blockData);
@@ -507,10 +526,8 @@ public class BlockOutputStream extends OutputStream {
         containerBlockData.clearChunks();
       }
 
-      XceiverClientReply asyncReply =
-          putBlockAsync(xceiverClient, blockData, close, tokenString);
-      CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
-          asyncReply.getResponse();
+      asyncReply = putBlockAsync(xceiverClient, blockData, close, tokenString);
+      CompletableFuture<ContainerCommandResponseProto> future = 
asyncReply.getResponse();
       flushFuture = future.thenApplyAsync(e -> {
         try {
           validateResponse(e);
@@ -537,13 +554,10 @@ public class BlockOutputStream extends OutputStream {
     } catch (InterruptedException ex) {
       Thread.currentThread().interrupt();
       handleInterruptedException(ex, false);
+      // never reach, just to make compiler happy.
+      return null;
     }
-    putFlushFuture(flushPos, flushFuture);
-    return flushFuture;
-  }
-
-  void putFlushFuture(long flushPos,
-      CompletableFuture<ContainerCommandResponseProto> flushFuture) {
+    return flushFuture.thenApply(r -> new PutBlockResult(flushPos, 
asyncReply.getLogIndex(), r));
   }
 
   @Override
@@ -551,12 +565,15 @@ public class BlockOutputStream extends OutputStream {
     if (xceiverClientFactory != null && xceiverClient != null
         && bufferPool != null && bufferPool.getSize() > 0
         && (!streamBufferArgs.isStreamBufferFlushDelay() ||
-            writtenDataLength - totalDataFlushedLength
-                >= streamBufferArgs.getStreamBufferSize())) {
+            unflushedLength() >= streamBufferArgs.getStreamBufferSize())) {
       handleFlush(false);
     }
   }
 
+  private synchronized long unflushedLength() {
+    return writtenDataLength - totalPutBlockLength;
+  }
+
   private void writeChunkCommon(ChunkBuffer buffer)
       throws IOException {
     // This data in the buffer will be pushed to datanode and a reference will
@@ -570,16 +587,16 @@ public class BlockOutputStream extends OutputStream {
     bufferList.add(buffer);
   }
 
-  private void writeChunk(ChunkBuffer buffer)
-      throws IOException {
+  private void writeChunk(ChunkBuffer buffer) throws IOException {
     writeChunkCommon(buffer);
     writeChunkToContainer(buffer.duplicate(0, buffer.position()), false, 
false);
   }
 
-  private void writeChunkAndPutBlock(ChunkBuffer buffer, boolean close)
+  private CompletableFuture<PutBlockResult> writeChunkAndPutBlock(ChunkBuffer 
buffer, boolean close)
       throws IOException {
+    LOG.debug("WriteChunk and Putblock from flush, buffer={}", buffer);
     writeChunkCommon(buffer);
-    writeChunkToContainer(buffer.duplicate(0, buffer.position()), true, close);
+    return writeChunkToContainer(buffer.duplicate(0, buffer.position()), true, 
close);
   }
 
   /**
@@ -607,40 +624,77 @@ public class BlockOutputStream extends OutputStream {
   private void handleFlushInternal(boolean close)
       throws IOException, InterruptedException, ExecutionException {
     checkOpen();
+    LOG.debug("Start handleFlushInternal close={}", close);
+    CompletableFuture<Void> toWaitFor = handleFlushInternalSynchronized(close);
+
+    if (toWaitFor != null) {
+      LOG.debug("Waiting for flush");
+      try {
+        toWaitFor.get();
+      } catch (ExecutionException ex) {
+        if (ex.getCause() instanceof FlushRuntimeException) {
+          throw ((FlushRuntimeException) ex.getCause()).cause;
+        } else {
+          throw ex;
+        }
+      }
+      LOG.debug("Flush done.");
+    }
+  }
+
+  private synchronized CompletableFuture<Void> 
handleFlushInternalSynchronized(boolean close) throws IOException {
+    CompletableFuture<PutBlockResult> putBlockResultFuture = null;
     // flush the last chunk data residing on the currentBuffer
-    if (totalDataFlushedLength < writtenDataLength) {
-      refreshCurrentBuffer();
+    if (totalWriteChunkLength < writtenDataLength) {
       Preconditions.checkArgument(currentBuffer.position() > 0);
 
       // This can be a partially filled chunk. Since we are flushing the buffer
       // here, we just limit this buffer to the current position. So that next
       // write will happen in new buffer
+      updateWriteChunkLength();
+      updatePutBlockLength();
       if (currentBuffer.hasRemaining()) {
         if (allowPutBlockPiggybacking) {
-          updateFlushLength();
-          writeChunkAndPutBlock(currentBuffer, close);
+          putBlockResultFuture = writeChunkAndPutBlock(currentBuffer, close);
         } else {
           writeChunk(currentBuffer);
-          updateFlushLength();
-          executePutBlock(close, false);
+          putBlockResultFuture = executePutBlock(close, false);
+        }
+        if (!close) {
+          // reset current buffer so that the next write will allocate a new 
one.
+          currentBuffer = null;
+          currentBufferRemaining = 0;
         }
       } else {
-        updateFlushLength();
-        executePutBlock(close, false);
+        putBlockResultFuture = executePutBlock(close, false);
+        // set lastFuture.
       }
+    } else if (totalPutBlockLength < totalWriteChunkLength) {
+      // There're no pending written data, but there're uncommitted data.
+      updatePutBlockLength();
+      putBlockResultFuture = executePutBlock(close, false);
     } else if (close) {
       // forcing an "empty" putBlock if stream is being closed without new
       // data since latest flush - we need to send the "EOF" flag
-      executePutBlock(true, true);
+      updatePutBlockLength();
+      putBlockResultFuture = executePutBlock(true, true);
+    } else {
+      LOG.debug("Flushing without data");
+    }
+    if (putBlockResultFuture != null) {
+      this.lastFlushFuture = watchForCommitAsync(putBlockResultFuture);
     }
-    waitOnFlushFutures();
-    watchForCommit(false);
-    // just check again if the exception is hit while waiting for the
-    // futures to ensure flush has indeed succeeded
+    return lastFlushFuture;
+  }
 
-    // irrespective of whether the commitIndex2flushedDataMap is empty
-    // or not, ensure there is no exception set
-    checkOpen();
+  private CompletableFuture<Void> 
watchForCommitAsync(CompletableFuture<PutBlockResult> putBlockResultFuture) {
+    return putBlockResultFuture.thenAccept(x -> {
+      try {
+        watchForCommit(x.commitIndex);
+      } catch (IOException e) {
+        throw new FlushRuntimeException(e);
+      }
+    });
   }
 
   @Override
@@ -658,11 +712,11 @@ public class BlockOutputStream extends OutputStream {
     }
   }
 
-  void waitOnFlushFutures() throws InterruptedException, ExecutionException {
+  void waitOnFlushFuture() throws InterruptedException, ExecutionException {
   }
 
   void validateResponse(
-      ContainerProtos.ContainerCommandResponseProto responseProto)
+      ContainerCommandResponseProto responseProto)
       throws IOException {
     try {
       // if the ioException is already set, it means a prev request has failed
@@ -742,7 +796,20 @@ public class BlockOutputStream extends OutputStream {
    * checksum
    * @return
    */
-  CompletableFuture<ContainerCommandResponseProto> writeChunkToContainer(
+  CompletableFuture<ContainerCommandResponseProto> 
writeChunkToContainer(ChunkBuffer chunk) throws IOException {
+    return writeChunkToContainer(chunk, false, false).thenApply(x -> 
x.response);
+  }
+
+  /**
+   * Writes buffered data as a new chunk to the container and saves chunk
+   * information to be used later in putKey call.
+   *
+   * @throws IOException if there is an I/O error while performing the call
+   * @throws OzoneChecksumException if there is an error while computing
+   * checksum
+   * @return
+   */
+  private CompletableFuture<PutBlockResult> writeChunkToContainer(
       ChunkBuffer chunk, boolean putBlockPiggybacking, boolean close) throws 
IOException {
     int effectiveChunkSize = chunk.remaining();
     final long offset = chunkOffset.getAndAdd(effectiveChunkSize);
@@ -756,7 +823,7 @@ public class BlockOutputStream extends OutputStream {
         .setChecksumData(checksumData.getProtoBufMessage())
         .build();
 
-    long flushPos = totalDataFlushedLength;
+    long flushPos = totalWriteChunkLength;
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Writing chunk {} length {} at offset {}",
@@ -776,8 +843,9 @@ public class BlockOutputStream extends OutputStream {
     }
 
     final List<ChunkBuffer> byteBufferList;
-    CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+    CompletableFuture<ContainerCommandResponseProto>
         validateFuture = null;
+    XceiverClientReply asyncReply;
     try {
       BlockData blockData = null;
 
@@ -804,9 +872,9 @@ public class BlockOutputStream extends OutputStream {
         byteBufferList = null;
       }
 
-      XceiverClientReply asyncReply = writeChunkAsync(xceiverClient, chunkInfo,
+      asyncReply = writeChunkAsync(xceiverClient, chunkInfo,
           blockID.get(), data, tokenString, replicationIndex, blockData, 
close);
-      CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+      CompletableFuture<ContainerCommandResponseProto>
           respFuture = asyncReply.getResponse();
       validateFuture = respFuture.thenApplyAsync(e -> {
         try {
@@ -835,11 +903,10 @@ public class BlockOutputStream extends OutputStream {
     } catch (InterruptedException ex) {
       Thread.currentThread().interrupt();
       handleInterruptedException(ex, false);
+      // never reach.
+      return null;
     }
-    if (putBlockPiggybacking) {
-      putFlushFuture(flushPos, validateFuture);
-    }
-    return validateFuture;
+    return validateFuture.thenApply(x -> new PutBlockResult(flushPos, 
asyncReply.getLogIndex(), x));
   }
 
   private void handleSuccessfulPutBlock(
@@ -856,9 +923,8 @@ public class BlockOutputStream extends OutputStream {
       LOG.debug(
           "Adding index " + asyncReply.getLogIndex() + " flushLength "
               + flushPos + " numBuffers " + byteBufferList.size()
-              + " blockID " + blockID + " bufferPool size" + bufferPool
-              .getSize() + " currentBufferIndex " + bufferPool
-              .getCurrentBufferIndex());
+              + " blockID " + blockID + " bufferPool size " + bufferPool
+              .getSize());
     }
     // for standalone protocol, logIndex will always be 0.
     updateCommitInfo(asyncReply, byteBufferList);
@@ -1053,6 +1119,10 @@ public class BlockOutputStream extends OutputStream {
     throw getIoException();
   }
 
+  protected synchronized CompletableFuture<Void> getLastFlushFuture() {
+    return lastFlushFuture;
+  }
+
   /**
    * Get the Replication Index.
    * @return replicationIndex
@@ -1060,4 +1130,30 @@ public class BlockOutputStream extends OutputStream {
   public int getReplicationIndex() {
     return replicationIndex;
   }
+
+  static class PutBlockResult {
+    private final long flushPosition;
+    private final long commitIndex;
+    private final ContainerCommandResponseProto response;
+
+    PutBlockResult(long flushPosition, long commitIndex, 
ContainerCommandResponseProto response) {
+      this.flushPosition = flushPosition;
+      this.commitIndex = commitIndex;
+      this.response = response;
+    }
+
+    ContainerCommandResponseProto getResponse() {
+      return response;
+    }
+  }
+
+  /**
+   * RuntimeException to wrap watchForCommit errors when running 
asynchronously.
+   */
+  private static class FlushRuntimeException extends RuntimeException {
+    private final IOException cause;
+    FlushRuntimeException(IOException cause) {
+      this.cause = cause;
+    }
+  }
 }
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
index b68b56f67c..abacf318c4 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
@@ -20,30 +20,45 @@ package org.apache.hadoop.hdds.scm.storage;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Function;
+import java.util.function.Supplier;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdds.scm.ByteStringConversion;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
 
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.Preconditions;
-
-import static java.util.Collections.emptyList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * This class creates and manages pool of n buffers.
+ * A bounded pool implementation that provides {@link ChunkBuffer}s. This pool 
allows allocating and releasing
+ * {@link ChunkBuffer}.
+ * This pool is designed for concurrent access to allocation and release. It 
imposes a maximum number of buffers to be
+ * allocated at the same time and once the limit has been approached, the 
thread requesting a new allocation needs to
+ * wait until a allocated buffer is released.
  */
 public class BufferPool {
+  public static final Logger LOG = LoggerFactory.getLogger(BufferPool.class);
 
   private static final BufferPool EMPTY = new BufferPool(0, 0);
-
-  private final List<ChunkBuffer> bufferList;
-  private int currentBufferIndex;
   private final int bufferSize;
   private final int capacity;
   private final Function<ByteBuffer, ByteString> byteStringConversion;
 
+  private final LinkedList<ChunkBuffer> allocated = new LinkedList<>();
+  private final LinkedList<ChunkBuffer> released = new LinkedList<>();
+  private ChunkBuffer currentBuffer = null;
+  private final Lock lock = new ReentrantLock();
+  private final Condition notFull = lock.newCondition();
+
+
   public static BufferPool empty() {
     return EMPTY;
   }
@@ -57,8 +72,6 @@ public class BufferPool {
       Function<ByteBuffer, ByteString> byteStringConversion) {
     this.capacity = capacity;
     this.bufferSize = bufferSize;
-    bufferList = capacity == 0 ? emptyList() : new ArrayList<>(capacity);
-    currentBufferIndex = -1;
     this.byteStringConversion = byteStringConversion;
   }
 
@@ -67,81 +80,135 @@ public class BufferPool {
   }
 
   ChunkBuffer getCurrentBuffer() {
-    return currentBufferIndex == -1 ? null : 
bufferList.get(currentBufferIndex);
+    return doInLock(() -> currentBuffer);
   }
 
   /**
-   * If the currentBufferIndex is less than the buffer size - 1,
-   * it means, the next buffer in the list has been freed up for
-   * rewriting. Reuse the next available buffer in such cases.
-   * <p>
-   * In case, the currentBufferIndex == buffer.size and buffer size is still
-   * less than the capacity to be allocated, just allocate a buffer of size
-   * chunk size.
+   * Allocate a new {@link ChunkBuffer}, waiting for a buffer to be released 
when this pool already allocates at
+   * capacity.
    */
-  public ChunkBuffer allocateBuffer(int increment) {
-    final int nextBufferIndex = currentBufferIndex + 1;
-
-    Preconditions.assertTrue(nextBufferIndex < capacity, () ->
-        "next index: " + nextBufferIndex + " >= capacity: " + capacity);
-
-    currentBufferIndex = nextBufferIndex;
-
-    if (currentBufferIndex < bufferList.size()) {
-      return getBuffer(currentBufferIndex);
-    } else {
-      final ChunkBuffer newBuffer = ChunkBuffer.allocate(bufferSize, 
increment);
-      bufferList.add(newBuffer);
-      return newBuffer;
+  public ChunkBuffer allocateBuffer(int increment) throws InterruptedException 
{
+    lock.lockInterruptibly();
+    try {
+      Preconditions.assertTrue(allocated.size() + released.size() <= capacity, 
() ->
+          "Total created buffer must not exceed capacity.");
+
+      while (allocated.size() == capacity) {
+        LOG.debug("Allocation needs to wait the pool is at capacity (allocated 
= capacity = {}).", capacity);
+        notFull.await();
+      }
+      // Get a buffer to allocate, preferably from the released ones.
+      final ChunkBuffer buffer = released.isEmpty() ?
+          ChunkBuffer.allocate(bufferSize, increment) : released.removeFirst();
+      allocated.add(buffer);
+      currentBuffer = buffer;
+
+      LOG.debug("Allocated new buffer {}, number of used buffers {}, capacity 
{}.",
+          buffer, allocated.size(), capacity);
+      return buffer;
+    } finally {
+      lock.unlock();
     }
   }
 
-  void releaseBuffer(ChunkBuffer chunkBuffer) {
-    Preconditions.assertTrue(!bufferList.isEmpty(), "empty buffer list");
-    Preconditions.assertSame(bufferList.get(0), chunkBuffer,
-        "only the first buffer can be released");
-    Preconditions.assertTrue(currentBufferIndex >= 0,
-        () -> "current buffer: " + currentBufferIndex);
+  void releaseBuffer(ChunkBuffer buffer) {
+    LOG.debug("Releasing buffer {}", buffer);
+    lock.lock();
+    try {
+      Preconditions.assertTrue(removeByIdentity(allocated, buffer), "Releasing 
unknown buffer");
+      buffer.clear();
+      released.add(buffer);
+      if (buffer == currentBuffer) {
+        currentBuffer = null;
+      }
+      notFull.signal();
+    } finally {
+      lock.unlock();
+    }
+  }
 
-    // always remove from head of the list and append at last
-    final ChunkBuffer buffer = bufferList.remove(0);
-    buffer.clear();
-    bufferList.add(buffer);
-    currentBufferIndex--;
+  /**
+   * Remove an item from a list by identity.
+   * @return true if the item is found and removed from the list, otherwise 
false.
+   */
+  private static <T> boolean removeByIdentity(List<T> list, T toRemove) {
+    int i = 0;
+    for (T item : list) {
+      if (item == toRemove) {
+        break;
+      } else {
+        i++;
+      }
+    }
+    if (i < list.size()) {
+      list.remove(i);
+      return true;
+    }
+    return false;
   }
 
-  public void clearBufferPool() {
-    bufferList.forEach(ChunkBuffer::close);
-    bufferList.clear();
-    currentBufferIndex = -1;
+  /**
+   * Wait until one buffer is available.
+   * @throws InterruptedException
+   */
+  @VisibleForTesting
+  public void waitUntilAvailable() throws InterruptedException {
+    lock.lockInterruptibly();
+    try {
+      while (allocated.size() == capacity) {
+        notFull.await();
+      }
+    } finally {
+      lock.unlock();
+    }
   }
 
-  public void checkBufferPoolEmpty() {
-    Preconditions.assertSame(0, computeBufferData(), "total buffer size");
+  public void clearBufferPool() {
+    lock.lock();
+    try {
+      allocated.forEach(ChunkBuffer::close);
+      released.forEach(ChunkBuffer::close);
+      allocated.clear();
+      released.clear();
+      currentBuffer = null;
+    } finally {
+      lock.unlock();
+    }
   }
 
   public long computeBufferData() {
-    long totalBufferSize = 0;
-    for (ChunkBuffer buf : bufferList) {
-      totalBufferSize += buf.position();
-    }
-    return totalBufferSize;
+    return doInLock(() -> {
+      long totalBufferSize = 0;
+      for (ChunkBuffer buf : allocated) {
+        totalBufferSize += buf.position();
+      }
+      return totalBufferSize;
+    });
   }
 
   public int getSize() {
-    return bufferList.size();
+    return doInLock(() -> allocated.size() + released.size());
   }
 
-  public ChunkBuffer getBuffer(int index) {
-    return bufferList.get(index);
+  public List<ChunkBuffer> getAllocatedBuffers() {
+    return doInLock(() -> new ArrayList<>(allocated));
   }
 
-  int getCurrentBufferIndex() {
-    return currentBufferIndex;
+  public int getNumberOfUsedBuffers() {
+    return doInLock(allocated::size);
   }
 
-  public int getNumberOfUsedBuffers() {
-    return currentBufferIndex + 1;
+  private <T> T doInLock(Supplier<T> supplier) {
+    lock.lock();
+    try {
+      return supplier.get();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  public boolean isAtCapacity() {
+    return getNumberOfUsedBuffers() == capacity;
   }
 
   public int getCapacity() {
@@ -151,4 +218,5 @@ public class BufferPool {
   public int getBufferSize() {
     return bufferSize;
   }
+
 }
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
index aa339409ec..7c3bb40ef3 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
@@ -24,17 +24,9 @@
  */
 package org.apache.hadoop.hdds.scm.storage;
 
-import com.google.common.annotations.VisibleForTesting;
-import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
 
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-
 /**
  * This class executes watchForCommit on ratis pipeline and releases
  * buffers once data successfully gets replicated.
@@ -43,10 +35,6 @@ class CommitWatcher extends 
AbstractCommitWatcher<ChunkBuffer> {
   // A reference to the pool of buffers holding the data
   private final BufferPool bufferPool;
 
-  // future Map to hold up all putBlock futures
-  private final ConcurrentMap<Long, 
CompletableFuture<ContainerCommandResponseProto>>
-      futureMap = new ConcurrentHashMap<>();
-
   CommitWatcher(BufferPool bufferPool, XceiverClientSpi xceiverClient) {
     super(xceiverClient);
     this.bufferPool = bufferPool;
@@ -59,37 +47,17 @@ class CommitWatcher extends 
AbstractCommitWatcher<ChunkBuffer> {
       acked += buffer.position();
       bufferPool.releaseBuffer(buffer);
     }
-    final long totalLength = addAckDataLength(acked);
-    // When putBlock is called, a future is added.
-    // When putBlock is replied, the future is removed below.
-    // Therefore, the removed future should not be null.
-    final CompletableFuture<ContainerCommandResponseProto> removed =
-        futureMap.remove(totalLength);
-    Objects.requireNonNull(removed, () -> "Future not found for "
-        + totalLength + ": existing = " + futureMap.keySet());
-  }
-
-  @VisibleForTesting
-  ConcurrentMap<Long, CompletableFuture<ContainerCommandResponseProto>> 
getFutureMap() {
-    return futureMap;
-  }
-
-  public void putFlushFuture(long flushPos, 
CompletableFuture<ContainerCommandResponseProto> flushFuture) {
-    futureMap.compute(flushPos,
-        (key, previous) -> previous == null ? flushFuture :
-            previous.thenCombine(flushFuture, (prev, curr) -> curr));
-  }
-
+    // TODO move the flush future map to BOS:
+    //  When there are concurrent watchForCommits, there's no guarantee of the 
order of execution
+    //  and the following logic to address the flushed length becomes 
irrelevant.
+    //  The flush future should be handled by BlockOutputStream and use the 
flushIndex which is a result of
+    //  executePutBlock.
 
-  public void waitOnFlushFutures() throws InterruptedException, 
ExecutionException {
-    // wait for all the transactions to complete
-    CompletableFuture.allOf(futureMap.values().toArray(
-        new CompletableFuture[0])).get();
+    addAckDataLength(acked);
   }
 
   @Override
   public void cleanup() {
     super.cleanup();
-    futureMap.clear();
   }
 }
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
index 843727cef2..bbb3f30687 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ECBlockOutputStream.java
@@ -87,16 +87,16 @@ public class ECBlockOutputStream extends BlockOutputStream {
   }
 
   @Override
-  public void write(byte[] b, int off, int len) throws IOException {
+  public synchronized void write(byte[] b, int off, int len) throws 
IOException {
     this.currentChunkRspFuture =
         writeChunkToContainer(
-            ChunkBuffer.wrap(ByteBuffer.wrap(b, off, len)), false, false);
+            ChunkBuffer.wrap(ByteBuffer.wrap(b, off, len)));
     updateWrittenDataLength(len);
   }
 
   public CompletableFuture<ContainerProtos.ContainerCommandResponseProto> 
write(
       ByteBuffer buff) throws IOException {
-    return writeChunkToContainer(ChunkBuffer.wrap(buff), false, false);
+    return writeChunkToContainer(ChunkBuffer.wrap(buff));
   }
 
   public CompletableFuture<ContainerProtos.
@@ -199,7 +199,7 @@ public class ECBlockOutputStream extends BlockOutputStream {
       ContainerCommandResponseProto> executePutBlock(boolean close,
       boolean force, long blockGroupLength) throws IOException {
     updateBlockGroupLengthInPutBlockMeta(blockGroupLength);
-    return executePutBlock(close, force);
+    return executePutBlock(close, 
force).thenApply(PutBlockResult::getResponse);
   }
 
   private void updateBlockGroupLengthInPutBlockMeta(final long blockGroupLen) {
@@ -232,13 +232,13 @@ public class ECBlockOutputStream extends 
BlockOutputStream {
    * @param force true if no data was written since most recent putBlock and
    *            stream is being closed
    */
-  public CompletableFuture<ContainerProtos.
-      ContainerCommandResponseProto> executePutBlock(boolean close,
+  @Override
+  public CompletableFuture<PutBlockResult> executePutBlock(boolean close,
       boolean force) throws IOException {
     checkOpen();
 
     CompletableFuture<ContainerProtos.
-        ContainerCommandResponseProto> flushFuture = null;
+        ContainerCommandResponseProto> flushFuture;
     try {
       ContainerProtos.BlockData blockData = getContainerBlockData().build();
       XceiverClientReply asyncReply =
@@ -273,9 +273,11 @@ public class ECBlockOutputStream extends BlockOutputStream 
{
     } catch (InterruptedException ex) {
       Thread.currentThread().interrupt();
       handleInterruptedException(ex, false);
+      // never reached: handleInterruptedException rethrows.
+      return null;
     }
     this.putBlkRspFuture = flushFuture;
-    return flushFuture;
+    return flushFuture.thenApply(r -> new PutBlockResult(0, 0, r));
   }
 
   /**
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
index b587b1d131..c0e99a5b4a 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdds.scm.storage;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.fs.Syncable;
 import org.apache.hadoop.hdds.client.BlockID;
-import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.StreamBufferArgs;
@@ -102,9 +101,8 @@ public class RatisBlockOutputStream extends 
BlockOutputStream
   }
 
   @Override
-  XceiverClientReply sendWatchForCommit(boolean bufferFull) throws IOException 
{
-    return bufferFull ? commitWatcher.watchOnFirstIndex()
-        : commitWatcher.watchOnLastIndex();
+  XceiverClientReply sendWatchForCommit(long commitIndex) throws IOException {
+    return commitWatcher.watchForCommit(commitIndex);
   }
 
   @Override
@@ -113,13 +111,11 @@ public class RatisBlockOutputStream extends 
BlockOutputStream
   }
 
   @Override
-  void putFlushFuture(long flushPos, 
CompletableFuture<ContainerCommandResponseProto> flushFuture) {
-    commitWatcher.putFlushFuture(flushPos, flushFuture);
-  }
-
-  @Override
-  void waitOnFlushFutures() throws InterruptedException, ExecutionException {
-    commitWatcher.waitOnFlushFutures();
+  void waitOnFlushFuture() throws InterruptedException, ExecutionException {
+    CompletableFuture<Void> flushFuture = getLastFlushFuture();
+    if (flushFuture != null) {
+      flushFuture.get();
+    }
   }
 
   @Override
@@ -135,10 +131,7 @@ public class RatisBlockOutputStream extends 
BlockOutputStream
   @Override
   public void hsync() throws IOException {
     if (!isClosed()) {
-      if (getBufferPool() != null && getBufferPool().getSize() > 0) {
-        handleFlush(false);
-      }
-      waitForFlushAndCommit(false);
+      handleFlush(false);
     }
   }
 }
diff --git 
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBufferPool.java
 
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBufferPool.java
index b56c503df9..e92b96ebce 100644
--- 
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBufferPool.java
+++ 
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBufferPool.java
@@ -20,38 +20,128 @@ package org.apache.hadoop.hdds.scm.storage;
 
 import org.apache.hadoop.ozone.common.ChunkBuffer;
 
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ozone.test.GenericTestUtils.LogCapturer;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.slf4j.event.Level;
 
 import java.util.Deque;
 import java.util.LinkedList;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * Test for {@link BufferPool}.
  */
 class TestBufferPool {
 
+  @BeforeAll
+  static void init() {
+    GenericTestUtils.setLogLevel(BufferPool.LOG, Level.DEBUG);
+  }
+
   @Test
-  void testBufferPool() {
+  void testBufferPool() throws Exception {
     testBufferPool(BufferPool.empty());
     testBufferPool(1, 1);
     testBufferPool(3, 1 << 20);
     testBufferPool(10, 1 << 10);
   }
 
-  private static void testBufferPool(final int capacity, final int bufferSize) 
{
+  @Test
+  void testBufferPoolConcurrently() throws Exception {
+    final BufferPool pool = new BufferPool(1 << 20, 10);
+    final Deque<ChunkBuffer> buffers = assertAllocate(pool);
+
+    assertAllocationBlocked(pool);
+    assertAllocationBlockedUntilReleased(pool, buffers);
+  }
+
+  private void assertAllocationBlockedUntilReleased(BufferPool pool, 
Deque<ChunkBuffer> buffers) throws Exception {
+    // As the pool is full, allocation will need to wait until a buffer is 
released.
+    assertFull(pool);
+
+    LogCapturer logCapturer = LogCapturer.captureLogs(BufferPool.LOG);
+    AtomicReference<ChunkBuffer> allocated = new AtomicReference<>();
+    AtomicBoolean allocatorStarted = new AtomicBoolean();
+    Thread allocator = new Thread(() -> {
+      try {
+        allocatorStarted.set(true);
+        allocated.set(pool.allocateBuffer(0));
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    });
+    ChunkBuffer toRelease = buffers.removeFirst();
+    Thread releaser = new Thread(() -> pool.releaseBuffer(toRelease));
+
+    allocator.start();
+    // ensure allocator has already started.
+    while (!allocatorStarted.get()) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    releaser.start();
+    allocator.join();
+    assertEquals(toRelease, allocated.get());
+    assertTrue(logCapturer.getOutput().contains("Allocation needs to wait the 
pool is at capacity"));
+  }
+
+  private void assertAllocationBlocked(BufferPool pool) throws Exception {
+    // As the pool is full, new allocation will be blocked interruptibly if no 
allocated buffer is released.
+    assertFull(pool);
+
+    LogCapturer logCapturer = LogCapturer.captureLogs(BufferPool.LOG);
+    AtomicBoolean allocatorStarted = new AtomicBoolean();
+    AtomicBoolean interrupted = new AtomicBoolean(false);
+    Thread allocator = new Thread(() -> {
+      try {
+        allocatorStarted.set(true);
+        pool.allocateBuffer(0);
+      } catch (InterruptedException e) {
+        interrupted.set(true);
+      }
+    });
+
+    allocator.start();
+    // ensure allocator has already started.
+    while (!allocatorStarted.get()) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    // Now the allocator is stuck because the pool is full and nothing is released.
+    // And it can be interrupted.
+    allocator.interrupt();
+    allocator.join();
+    assertTrue(interrupted.get());
+    assertTrue(logCapturer.getOutput().contains("Allocation needs to wait the 
pool is at capacity"));
+  }
+
+  private static void testBufferPool(final int capacity, final int bufferSize) 
throws Exception {
     final BufferPool pool = new BufferPool(bufferSize, capacity);
     assertEquals(capacity, pool.getCapacity());
     assertEquals(bufferSize, pool.getBufferSize());
     testBufferPool(pool);
   }
 
-  private static void testBufferPool(final BufferPool pool) {
+  private static void testBufferPool(final BufferPool pool) throws Exception {
     assertEmpty(pool);
     final Deque<ChunkBuffer> buffers = assertAllocate(pool);
     assertFull(pool);
@@ -62,10 +152,10 @@ class TestBufferPool {
   private static void assertEmpty(BufferPool pool) {
     assertEquals(0, pool.getNumberOfUsedBuffers());
     assertEquals(0, pool.getSize());
-    assertEquals(-1, pool.getCurrentBufferIndex());
+    assertNull(pool.getCurrentBuffer());
   }
 
-  private static Deque<ChunkBuffer> assertAllocate(BufferPool pool) {
+  private static Deque<ChunkBuffer> assertAllocate(BufferPool pool) throws 
Exception {
     final int capacity = pool.getCapacity();
     final int size = pool.getBufferSize();
     final Deque<ChunkBuffer> buffers = new LinkedList<>();
@@ -96,19 +186,13 @@ class TestBufferPool {
     final int capacity = pool.getCapacity();
     assertEquals(capacity, pool.getSize());
     assertEquals(capacity, pool.getNumberOfUsedBuffers());
-    assertThrows(IllegalStateException.class, () -> pool.allocateBuffer(0));
   }
 
-  // buffers are released and reallocated FIFO
+  // buffers are released and reallocated
   private static void assertReallocate(BufferPool pool,
-      Deque<ChunkBuffer> buffers) {
+      Deque<ChunkBuffer> buffers) throws Exception {
     final int capacity = pool.getCapacity();
     for (int i = 0; i < 3 * capacity; i++) {
-      if (capacity > 1) {
-        assertThrows(IllegalStateException.class,
-            () -> pool.releaseBuffer(buffers.getLast()));
-      }
-
       final ChunkBuffer released = buffers.removeFirst();
       pool.releaseBuffer(released);
       assertEquals(0, released.position());
@@ -122,18 +206,20 @@ class TestBufferPool {
     }
   }
 
-  private static void assertRelease(BufferPool pool,
-      Deque<ChunkBuffer> buffers) {
+  private static void assertRelease(BufferPool pool, Deque<ChunkBuffer> 
buffers) {
+    // assert that allocated buffers can be released in any order.
     final int capacity = pool.getCapacity();
+    boolean pickFirst = false;
     for (int i = capacity - 1; i >= 0; i--) {
-      final ChunkBuffer released = buffers.removeFirst();
+      final ChunkBuffer released = pickFirst ? buffers.removeFirst() : 
buffers.removeLast();
+      pickFirst = !pickFirst;
       pool.releaseBuffer(released);
       assertEquals(i, pool.getNumberOfUsedBuffers());
       assertThrows(IllegalStateException.class,
           () -> pool.releaseBuffer(released));
     }
 
-    pool.checkBufferPoolEmpty();
+    assertEquals(0, pool.computeBufferData());
   }
 
   private static void fill(ChunkBuffer buf) {
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
index fe2ee5fa8a..36c16e92bf 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Objects;
+import java.util.UUID;
 import java.util.function.Function;
 
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -34,6 +35,7 @@ import org.apache.ratis.util.UncheckedAutoCloseable;
 final class ChunkBufferImplWithByteBuffer implements ChunkBuffer {
   private final ByteBuffer buffer;
   private final UncheckedAutoCloseable underlying;
+  private final UUID identity = UUID.randomUUID();
 
   ChunkBufferImplWithByteBuffer(ByteBuffer buffer) {
     this(buffer, null);
@@ -161,6 +163,6 @@ final class ChunkBufferImplWithByteBuffer implements 
ChunkBuffer {
   @Override
   public String toString() {
     return getClass().getSimpleName() + ":limit=" + buffer.limit()
-        + "@" + Integer.toHexString(hashCode());
+        + "@" + identity;
   }
 }
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index bb4e3e3f08..c25b4508e9 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -171,7 +171,7 @@ public class BlockOutputStreamEntryPool implements 
KeyMetadataAware {
             .build();
   }
 
-  private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) {
+  private synchronized void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) {
     Preconditions.checkNotNull(subKeyInfo.getPipeline());
     streamEntries.add(createStreamEntry(subKeyInfo));
   }
@@ -248,7 +248,7 @@ public class BlockOutputStreamEntryPool implements 
KeyMetadataAware {
    * @param containerID id of the closed container
    * @param pipelineId id of the associated pipeline
    */
-  void discardPreallocatedBlocks(long containerID, PipelineID pipelineId) {
+  synchronized void discardPreallocatedBlocks(long containerID, PipelineID 
pipelineId) {
     // currentStreamIndex < streamEntries.size() signifies that, there are 
still
     // pre allocated blocks available.
 
@@ -283,7 +283,7 @@ public class BlockOutputStreamEntryPool implements 
KeyMetadataAware {
     return keyArgs.getKeyName();
   }
 
-  long getKeyLength() {
+  synchronized long getKeyLength() {
     return streamEntries.stream()
         .mapToLong(BlockOutputStreamEntry::getCurrentPosition).sum();
   }
@@ -339,10 +339,7 @@ public class BlockOutputStreamEntryPool implements 
KeyMetadataAware {
   void hsyncKey(long offset) throws IOException {
     if (keyArgs != null) {
       // in test, this could be null
-      long length = getKeyLength();
-      Preconditions.checkArgument(offset == length,
-              "Expected offset: " + offset + " expected len: " + length);
-      keyArgs.setDataSize(length);
+      keyArgs.setDataSize(offset);
       keyArgs.setLocationInfoList(getLocationInfoList());
       // When the key is multipart upload part file upload, we should not
       // commit the key, as this is not an actual key, this is a just a
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index e080ea2d34..1160d9667a 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -90,7 +90,7 @@ public class KeyOutputStream extends OutputStream
   // how much of data is actually written yet to underlying stream
   private long offset;
   // how much data has been ingested into the stream
-  private long writeOffset;
+  private volatile long writeOffset;
   // whether an exception is encountered while write and whole write could
   // not succeed
   private boolean isException;
@@ -195,7 +195,7 @@ public class KeyOutputStream extends OutputStream
    * @throws IOException
    */
   @Override
-  public synchronized void write(byte[] b, int off, int len)
+  public void write(byte[] b, int off, int len)
       throws IOException {
     checkNotClosed();
     if (b == null) {
@@ -208,8 +208,10 @@ public class KeyOutputStream extends OutputStream
     if (len == 0) {
       return;
     }
-    handleWrite(b, off, len, false);
-    writeOffset += len;
+    synchronized (this) {
+      handleWrite(b, off, len, false);
+      writeOffset += len;
+    }
   }
 
   private void handleWrite(byte[] b, int off, long len, boolean retry)
@@ -361,7 +363,7 @@ public class KeyOutputStream extends OutputStream
     }
   }
 
-  private void markStreamClosed() {
+  private synchronized void markStreamClosed() {
     blockOutputStreamEntryPool.cleanup();
     closed = true;
   }
@@ -435,7 +437,7 @@ public class KeyOutputStream extends OutputStream
   }
 
   @Override
-  public synchronized void flush() throws IOException {
+  public void flush() throws IOException {
     checkNotClosed();
     handleFlushOrClose(StreamAction.FLUSH);
   }
@@ -446,7 +448,7 @@ public class KeyOutputStream extends OutputStream
   }
 
   @Override
-  public synchronized void hsync() throws IOException {
+  public void hsync() throws IOException {
     if (replication.getReplicationType() != ReplicationType.RATIS) {
       throw new UnsupportedOperationException(
           "Replication type is not " + ReplicationType.RATIS);
@@ -460,11 +462,12 @@ public class KeyOutputStream extends OutputStream
 
     handleFlushOrClose(StreamAction.HSYNC);
 
-    Preconditions.checkState(offset >= hsyncPos,
-        "offset = %s < hsyncPos = %s", offset, hsyncPos);
-
-    MetricUtil.captureLatencyNs(clientMetrics::addHsyncLatency,
-        () -> blockOutputStreamEntryPool.hsyncKey(hsyncPos));
+    synchronized (this) {
+      Preconditions.checkState(offset >= hsyncPos,
+          "offset = %s < hsyncPos = %s", offset, hsyncPos);
+      MetricUtil.captureLatencyNs(clientMetrics::addHsyncLatency,
+          () -> blockOutputStreamEntryPool.hsyncKey(hsyncPos));
+    }
   }
 
   /**
@@ -729,7 +732,7 @@ public class KeyOutputStream extends OutputStream
    * the last state of the volatile {@link #closed} field.
    * @throws IOException if the connection is closed.
    */
-  private void checkNotClosed() throws IOException {
+  private synchronized void checkNotClosed() throws IOException {
     if (closed) {
       throw new IOException(
           ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: "
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
index 13cae7bff9..2aa6a71d51 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Stream;
 
@@ -44,6 +45,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
+import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
@@ -153,7 +155,7 @@ public class TestHSync {
   private static final BucketLayout BUCKET_LAYOUT = 
BucketLayout.FILE_SYSTEM_OPTIMIZED;
 
   private static final int CHUNK_SIZE = 4 << 12;
-  private static final int FLUSH_SIZE = 2 * CHUNK_SIZE;
+  private static final int FLUSH_SIZE = 3 * CHUNK_SIZE;
   private static final int MAX_FLUSH_SIZE = 2 * FLUSH_SIZE;
   private static final int BLOCK_SIZE = 2 * MAX_FLUSH_SIZE;
   private static final int SERVICE_INTERVAL = 100;
@@ -209,6 +211,8 @@ public class TestHSync {
     GenericTestUtils.setLogLevel(BlockInputStream.LOG, Level.DEBUG);
     GenericTestUtils.setLogLevel(KeyValueHandler.LOG, Level.DEBUG);
 
+    GenericTestUtils.setLogLevel(BufferPool.LOG, Level.DEBUG);
+
     openKeyCleanupService =
         (OpenKeyCleanupService) 
cluster.getOzoneManager().getKeyManager().getOpenKeyCleanupService();
     openKeyCleanupService.suspend();
@@ -627,45 +631,58 @@ public class TestHSync {
     out.writeAndHsync(data);
 
     final byte[] buffer = new byte[4 << 10];
-    int offset = 0;
-    try (FSDataInputStream in = fs.open(file)) {
-      final long skipped = in.skip(length);
-      assertEquals(length, skipped);
-
-      for (; ;) {
-        final int n = in.read(buffer, 0, buffer.length);
-        if (n <= 0) {
-          break;
+    AtomicInteger attempted = new AtomicInteger();
+
+    // hsync returns when all datanodes acknowledge they have received the 
PutBlock request. Yet, the PutBlock
+    // may not have been applied to their database. So, an immediate read right 
after hsync may not see the synced data
+    // yet. This does not violate the hsync guarantee: it ensures all currently 
pending data becomes persistent.
+    GenericTestUtils.waitFor(() -> {
+      attempted.addAndGet(1);
+      int offset = 0;
+      try (FSDataInputStream in = fs.open(file)) {
+        final long skipped = in.skip(length);
+        assertEquals(length, skipped);
+
+        for (; ;) {
+          final int n = in.read(buffer, 0, buffer.length);
+          if (n <= 0) {
+            break;
+          }
+          for (int i = 0; i < n; i++) {
+            assertEquals(data[offset + i], buffer[i], "expected at offset " + 
offset + " i=" + i);
+          }
+          offset += n;
         }
-        for (int i = 0; i < n; i++) {
-          assertEquals(data[offset + i], buffer[i],
-              "expected at offset " + offset + " i=" + i);
+        if (offset != data.length) {
+          LOG.error("Read attempt #{} failed. offset {}, expected data.length 
{}", attempted, offset, data.length);
+          return false;
+        } else {
+          LOG.debug("Read attempt #{} succeeded. offset {}, expected 
data.length {}", attempted, offset, data.length);
+          return true;
         }
-        offset += n;
+      } catch (IOException e) {
+        LOG.error("Exception is thrown during read", e);
+        return false;
       }
-    }
-    assertEquals(data.length, offset);
+    }, 500, 3000);
   }
 
-  private void runConcurrentWriteHSync(Path file,
-      final FSDataOutputStream out, int initialDataSize)
-      throws InterruptedException, IOException {
-    final byte[] data = new byte[initialDataSize];
-    ThreadLocalRandom.current().nextBytes(data);
-
-    AtomicReference<IOException> writerException = new AtomicReference<>();
-    AtomicReference<IOException> syncerException = new AtomicReference<>();
+  private int runConcurrentWriteHSync(Path file,
+      final FSDataOutputStream out, byte[] data, int syncThreadsCount) throws 
Exception {
 
-    LOG.info("runConcurrentWriteHSync {} with size {}",
-        file, initialDataSize);
+    AtomicReference<Exception> writerException = new AtomicReference<>();
+    AtomicReference<Exception> syncerException = new AtomicReference<>();
 
+    LOG.info("runConcurrentWriteHSync {} with size {}", file, data.length);
+    AtomicInteger writes = new AtomicInteger();
     final long start = Time.monotonicNow();
     // two threads: write and hsync
     Runnable writer = () -> {
       while ((Time.monotonicNow() - start < 10000)) {
         try {
           out.write(data);
-        } catch (IOException e) {
+          writes.incrementAndGet();
+        } catch (Exception e) {
           writerException.set(e);
           throw new RuntimeException(e);
         }
@@ -676,7 +693,7 @@ public class TestHSync {
       while ((Time.monotonicNow() - start < 10000)) {
         try {
           out.hsync();
-        } catch (IOException e) {
+        } catch (Exception e) {
           syncerException.set(e);
           throw new RuntimeException(e);
         }
@@ -684,11 +701,19 @@ public class TestHSync {
     };
 
     Thread writerThread = new Thread(writer);
+    writerThread.setName("Writer");
     writerThread.start();
-    Thread syncThread = new Thread(syncer);
-    syncThread.start();
+    Thread[] syncThreads = new Thread[syncThreadsCount];
+    for (int i = 0; i < syncThreadsCount; i++) {
+      syncThreads[i] = new Thread(syncer);
+      syncThreads[i].setName("Syncer-" + i);
+      syncThreads[i].start();
+    }
+
     writerThread.join();
-    syncThread.join();
+    for (Thread sync : syncThreads) {
+      sync.join();
+    }
 
     if (writerException.get() != null) {
       throw writerException.get();
@@ -696,29 +721,54 @@ public class TestHSync {
     if (syncerException.get() != null) {
       throw syncerException.get();
     }
+    return writes.get();
   }
 
-  @Test
-  public void testConcurrentWriteHSync()
-      throws IOException, InterruptedException {
-    final String rootPath = String.format("%s://%s/",
-        OZONE_OFS_URI_SCHEME, CONF.get(OZONE_OM_ADDRESS_KEY));
+  public static Stream<Arguments> concurrentWriteHSync() {
+    return Stream.of(
+        Arguments.of(1, 1, true),
+        Arguments.of(2, 1, true),
+        Arguments.of(8, 2, true),
+        Arguments.of(8, 8, true),
+        Arguments.of(8, 16, true),
+        Arguments.of(1, 1, false),
+        Arguments.of(8, 2, false),
+        Arguments.of(8, 16, false)
+    );
+  }
+
+  @ParameterizedTest
+  @MethodSource("concurrentWriteHSync")
+  public void testConcurrentWriteHSync(int writeSampleSize, int 
syncThreadsCount,
+      boolean piggybackingEnabled) throws Exception {
+    final String rootPath = String.format("%s://%s/", OZONE_OFS_URI_SCHEME, 
CONF.get(OZONE_OM_ADDRESS_KEY));
+    CONF.setBoolean("ozone.client.stream.putblock.piggybacking", 
piggybackingEnabled);
     CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
 
-    final String dir = OZONE_ROOT + bucket.getVolumeName()
-        + OZONE_URI_DELIMITER + bucket.getName();
+    final String dir = OZONE_ROOT + bucket.getVolumeName() + 
OZONE_URI_DELIMITER + bucket.getName();
 
     try (FileSystem fs = FileSystem.get(CONF)) {
-      for (int i = 0; i < 5; i++) {
-        final Path file = new Path(dir, "file" + i);
-        try (FSDataOutputStream out =
-            fs.create(file, true)) {
-          int initialDataSize = 1 << i;
-          runConcurrentWriteHSync(file, out, initialDataSize);
-        }
+      final Path file = new Path(dir, "file" + writeSampleSize);
+      byte[] data = new byte[writeSampleSize];
+      ThreadLocalRandom.current().nextBytes(data);
+      int writes;
+      try (FSDataOutputStream out = fs.create(file, true)) {
+        writes = runConcurrentWriteHSync(file, out, data, syncThreadsCount);
+      }
+      validateWrittenFile(file, fs, data, writes);
+      fs.delete(file, false);
+    }
+  }
 
-        fs.delete(file, false);
+  private void validateWrittenFile(Path file, FileSystem fs, byte[] expected, 
int times) throws IOException {
+    try (FSDataInputStream is = fs.open(file)) {
+      byte[] actual = new byte[expected.length];
+      for (int i = 0; i < times; i++) {
+        assertEquals(expected.length, is.read(actual));
+        assertArrayEquals(expected, actual);
       }
+      // ensure nothing more can be read.
+      assertEquals(-1, is.read(expected));
     }
   }
 
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java
index c3ea911f19..17ca07cd52 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java
@@ -209,7 +209,6 @@ public class TestCommitWatcher {
                 return v;
               });
           futures.add(future);
-          watcher.putFlushFuture(length, future);
           replies.add(reply);
         }
 
@@ -220,21 +219,15 @@ public class TestCommitWatcher {
         CompletableFuture<ContainerCommandResponseProto> future2 =
             futures.get(1);
         future1.get();
-        assertEquals(future1, watcher.getFutureMap().get((long) chunkSize));
-        // wait on 2nd putBlock to complete
         future2.get();
-        assertEquals(future2, watcher.getFutureMap().get((long) 2 * 
chunkSize));
         assertEquals(2, watcher.
             getCommitIndexMap().size());
         watcher.watchOnFirstIndex();
         
assertThat(watcher.getCommitIndexMap()).doesNotContainKey(replies.get(0).getLogIndex());
-        assertThat(watcher.getFutureMap()).doesNotContainKey((long) chunkSize);
         
assertThat(watcher.getTotalAckDataLength()).isGreaterThanOrEqualTo(chunkSize);
         watcher.watchOnLastIndex();
         
assertThat(watcher.getCommitIndexMap()).doesNotContainKey(replies.get(1).getLogIndex());
-        assertThat(watcher.getFutureMap()).doesNotContainKey((long) 2 * 
chunkSize);
         assertEquals(2 * chunkSize, watcher.getTotalAckDataLength());
-        assertThat(watcher.getFutureMap()).isEmpty();
         assertThat(watcher.getCommitIndexMap()).isEmpty();
       }
     } finally {
@@ -282,7 +275,6 @@ public class TestCommitWatcher {
                 return v;
               });
           futures.add(future);
-          watcher.putFlushFuture(length, future);
           replies.add(reply);
         }
 
@@ -293,14 +285,11 @@ public class TestCommitWatcher {
         CompletableFuture<ContainerCommandResponseProto> future2 =
             futures.get(1);
         future1.get();
-        assertEquals(future1, watcher.getFutureMap().get((long) chunkSize));
         // wait on 2nd putBlock to complete
         future2.get();
-        assertEquals(future2, watcher.getFutureMap().get((long) 2 * 
chunkSize));
         assertEquals(2, watcher.getCommitIndexMap().size());
         watcher.watchOnFirstIndex();
         
assertThat(watcher.getCommitIndexMap()).doesNotContainKey(replies.get(0).getLogIndex());
-        assertThat(watcher.getFutureMap()).doesNotContainKey((long) chunkSize);
         
assertThat(watcher.getTotalAckDataLength()).isGreaterThanOrEqualTo(chunkSize);
         cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
         cluster.shutdownHddsDatanode(pipeline.getNodes().get(1));
@@ -325,10 +314,8 @@ public class TestCommitWatcher {
             .getLogIndex()) {
           assertEquals(chunkSize, watcher.getTotalAckDataLength());
           assertEquals(1, watcher.getCommitIndexMap().size());
-          assertEquals(1, watcher.getFutureMap().size());
         } else {
           assertEquals(2 * chunkSize, watcher.getTotalAckDataLength());
-          assertThat(watcher.getFutureMap()).isEmpty();
           assertThat(watcher.getCommitIndexMap()).isEmpty();
         }
       }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
index 67736033c5..2a6b2246b9 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
@@ -235,7 +235,6 @@ class TestBlockOutputStream {
       // in the buffer, with only one buffer being allocated in the buffer pool
 
       assertEquals(1, bufferPool.getSize());
-      assertEquals(0, bufferPool.getBuffer(0).position());
       assertEquals(totalWriteLength, blockOutputStream.getWrittenDataLength());
       assertEquals(totalWriteLength,
           blockOutputStream.getTotalDataFlushedLength());
@@ -587,20 +586,23 @@ class TestBlockOutputStream {
       byte[] data1 = RandomUtils.nextBytes(dataLength);
       key.write(data1);
 
-      // since its hitting the full bufferCondition, it will call 
watchForCommit
-      // and completes atleast putBlock for first flushSize worth of data
-      assertThat(metrics.getPendingContainerOpCountMetrics(WriteChunk))
-          .isLessThanOrEqualTo(pendingWriteChunkCount + 2);
-      assertThat(metrics.getPendingContainerOpCountMetrics(PutBlock))
-          .isLessThanOrEqualTo(pendingPutBlockCount + 1);
       KeyOutputStream keyOutputStream =
           assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
-
-      assertEquals(1, keyOutputStream.getStreamEntries().size());
       RatisBlockOutputStream blockOutputStream =
           assertInstanceOf(RatisBlockOutputStream.class,
               keyOutputStream.getStreamEntries().get(0).getOutputStream());
+      BufferPool bufferPool = blockOutputStream.getBufferPool();
+      // since it's hitting the full bufferCondition, it will call 
watchForCommit
+      // however, the outputstream will not wait for watchForCommit, but the 
next call to
+      // write() will need to wait for at least one watchForCommit, indirectly 
when asking for new buffer allocation.
+      bufferPool.waitUntilAvailable();
 
+      assertThat(metrics.getPendingContainerOpCountMetrics(WriteChunk))
+          .isLessThanOrEqualTo(pendingWriteChunkCount + 2);
+      assertThat(metrics.getPendingContainerOpCountMetrics(PutBlock))
+          .isLessThanOrEqualTo(pendingPutBlockCount + 1);
+
+      assertEquals(1, keyOutputStream.getStreamEntries().size());
 
       assertEquals(4, blockOutputStream.getBufferPool().getSize());
       // writtenDataLength as well flushedDataLength will be updated here


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to