This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new a7b903bc91 HDDS-2146. Optimize block write path performance by reducing no of watchForCommit calls. (#5272)
a7b903bc91 is described below

commit a7b903bc9196b19cbaea17c5f5b4aee49cf79e5f
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Mon Sep 18 14:29:45 2023 -0700

    HDDS-2146. Optimize block write path performance by reducing no of watchForCommit calls. (#5272)
---
 .../hdds/scm/storage/AbstractCommitWatcher.java    | 191 +++++++++++++++++
 .../hdds/scm/storage/BlockDataStreamOutput.java    |   6 +-
 .../hadoop/hdds/scm/storage/CommitWatcher.java     | 227 +++------------------
 .../hdds/scm/storage/RatisBlockOutputStream.java   |   2 +-
 .../hdds/scm/storage/StreamCommitWatcher.java      | 181 +---------------
 .../dev-support/findbugsExcludeFile.xml            |   2 +-
 .../scm/storage}/TestCommitWatcher.java            |  20 +-
 .../ozone/client/rpc/TestBlockOutputStream.java    |  18 +-
 .../rpc/TestBlockOutputStreamFlushDelay.java       |  18 +-
 .../rpc/TestBlockOutputStreamWithFailures.java     |  28 +--
 ...estBlockOutputStreamWithFailuresFlushDelay.java |  28 +--
 .../ozone/client/rpc/TestWatchForCommit.java       |   8 +-
 12 files changed, 302 insertions(+), 427 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
new file mode 100644
index 0000000000..0c5501c792
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.SortedMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * This class executes watchForCommit on ratis pipeline and releases
+ * buffers once data successfully gets replicated.
+ */
+abstract class AbstractCommitWatcher<BUFFER> {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbstractCommitWatcher.class);
+
+  /**
+   * Commit index -> buffers: when a commit index is acknowledged,
+   * the corresponding buffers can be released.
+   */
+  private final SortedMap<Long, List<BUFFER>> commitIndexMap
+      = new ConcurrentSkipListMap<>();
+  /**
+   * Commit index -> reply future:
+   * cache to reply futures to avoid sending duplicated watch requests.
+   */
+  private final ConcurrentMap<Long, CompletableFuture<XceiverClientReply>>
+      replies = new ConcurrentHashMap<>();
+
+  private final XceiverClientSpi client;
+
+  private long totalAckDataLength;
+
+  AbstractCommitWatcher(XceiverClientSpi client) {
+    this.client = client;
+  }
+
+  @VisibleForTesting
+  SortedMap<Long, List<BUFFER>> getCommitIndexMap() {
+    return commitIndexMap;
+  }
+
+  void updateCommitInfoMap(long index, List<BUFFER> buffers) {
+    commitIndexMap.computeIfAbsent(index, k -> new LinkedList<>())
+        .addAll(buffers);
+  }
+
+  /** @return the total data which has been acknowledged. */
+  long getTotalAckDataLength() {
+    return totalAckDataLength;
+  }
+
+  long addAckDataLength(long acked) {
+    totalAckDataLength += acked;
+    return totalAckDataLength;
+  }
+
+  /**
+   * Watch for commit for the first index.
+   * This is useful when the buffer is full
+   * since the first chunk can be released once it has been committed.
+   * Otherwise, the client write is blocked.
+   *
+   * @return {@link XceiverClientReply} reply from raft client
+   * @throws IOException in case watchForCommit fails
+   */
+  XceiverClientReply watchOnFirstIndex() throws IOException {
+    if (commitIndexMap.isEmpty()) {
+      return null;
+    }
+    return watchForCommit(commitIndexMap.firstKey());
+  }
+
+  /**
+   * Watch for commit for the last index.
+   * This is useful when the buffer is not full
+   * since it will wait for all the chunks in the buffer to get committed.
+   * Since the buffer is not full, the client write is not blocked.
+   *
+   * @return {@link XceiverClientReply} reply from raft client
+   * @throws IOException in case watchForCommit fails
+   */
+  XceiverClientReply watchOnLastIndex() throws IOException {
+    if (commitIndexMap.isEmpty()) {
+      return null;
+    }
+    return watchForCommit(commitIndexMap.lastKey());
+  }
+
+  /**
+   * Watch for commit for a particular index.
+   *
+   * @param commitIndex log index to watch for
+   * @return minimum commit index replicated to all nodes
+   * @throws IOException IOException in case watch gets timed out
+   */
+  XceiverClientReply watchForCommit(long commitIndex)
+      throws IOException {
+    final MemoizedSupplier<CompletableFuture<XceiverClientReply>> supplier
+        = JavaUtils.memoize(CompletableFuture::new);
+    final CompletableFuture<XceiverClientReply> f = replies.compute(commitIndex,
+        (key, value) -> value != null ? value : supplier.get());
+    if (!supplier.isInitialized()) {
+      // future already exists
+      return f.join();
+    }
+
+    try {
+      final XceiverClientReply reply = client.watchForCommit(commitIndex);
+      f.complete(reply);
+      final CompletableFuture<XceiverClientReply> removed
+          = replies.remove(commitIndex);
+      Preconditions.checkState(removed == f);
+
+      final long index = reply != null ? reply.getLogIndex() : 0;
+      adjustBuffers(index);
+      return reply;
+    } catch (InterruptedException e) {
+      // Re-interrupt the thread while catching InterruptedException
+      Thread.currentThread().interrupt();
+      throw getIOExceptionForWatchForCommit(commitIndex, e);
+    } catch (TimeoutException | ExecutionException e) {
+      throw getIOExceptionForWatchForCommit(commitIndex, e);
+    }
+  }
+
+  List<BUFFER> remove(long i) {
+    final List<BUFFER> buffers = commitIndexMap.remove(i);
+    Objects.requireNonNull(buffers, () -> "commitIndexMap.remove(" + i + ")");
+    return buffers;
+  }
+
+  /** Release the buffers for the given index. */
+  abstract void releaseBuffers(long index);
+
+  void adjustBuffers(long commitIndex) {
+    commitIndexMap.keySet().stream()
+        .filter(p -> p <= commitIndex)
+        .forEach(this::releaseBuffers);
+  }
+
+  void releaseBuffersOnException() {
+    adjustBuffers(client.getReplicatedMinCommitIndex());
+  }
+
+  IOException getIOExceptionForWatchForCommit(long commitIndex, Exception e) {
+    LOG.warn("watchForCommit failed for index {}", commitIndex, e);
+    IOException ioException = new IOException(
+        "Unexpected Storage Container Exception: " + e, e);
+    releaseBuffersOnException();
+    return ioException;
+  }
+
+  void cleanup() {
+    commitIndexMap.clear();
+    replies.clear();
+  }
+}
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index 67e5e3ca49..95f37eecb2 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -376,8 +376,8 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
     checkOpen();
     try {
       XceiverClientReply reply = bufferFull ?
-          commitWatcher.streamWatchOnFirstIndex() :
-          commitWatcher.streamWatchOnLastIndex();
+          commitWatcher.watchOnFirstIndex() :
+          commitWatcher.watchOnLastIndex();
       if (reply != null) {
         List<DatanodeDetails> dnList = reply.getDatanodes();
         if (!dnList.isEmpty()) {
@@ -454,7 +454,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput {
               if (LOG.isDebugEnabled()) {
                 LOG.debug("Adding index " + asyncReply.getLogIndex() +
                     " commitMap size "
-                    + commitWatcher.getCommitInfoMapSize() + " flushLength "
+                    + commitWatcher.getCommitIndexMap().size() + " flushLength "
                     + flushPos + " blockID " + blockID);
               }
               // for standalone protocol, logIndex will always be 0.
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
index 195921f6fc..3c7f8a2360 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
@@ -24,226 +24,57 @@
  */
 package org.apache.hadoop.hdds.scm.storage;
 
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
-
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
-import org.apache.hadoop.hdds.scm.XceiverClientReply;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 /**
  * This class executes watchForCommit on ratis pipeline and releases
  * buffers once data successfully gets replicated.
  */
-public class CommitWatcher {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(CommitWatcher.class);
-
+class CommitWatcher extends AbstractCommitWatcher<ChunkBuffer> {
   // A reference to the pool of buffers holding the data
-  private BufferPool bufferPool;
-
-  // The map should maintain the keys (logIndexes) in order so that while
-  // removing we always end up updating incremented data flushed length.
-  // Also, corresponding to the logIndex, the corresponding list of buffers will
-  // be released from the buffer pool.
-  private Map<Long, List<ChunkBuffer>> commitIndex2flushedDataMap;
+  private final BufferPool bufferPool;
 
   // future Map to hold up all putBlock futures
-  private ConcurrentHashMap<Long,
-      CompletableFuture<ContainerProtos.ContainerCommandResponseProto>>
-      futureMap;
+  private final ConcurrentMap<Long, CompletableFuture<
+      ContainerCommandResponseProto>> futureMap = new ConcurrentHashMap<>();
 
-  private XceiverClientSpi xceiverClient;
-
-  // total data which has been successfully flushed and acknowledged
-  // by all servers
-  private long totalAckDataLength;
-
-  public CommitWatcher(BufferPool bufferPool, XceiverClientSpi xceiverClient) {
+  CommitWatcher(BufferPool bufferPool, XceiverClientSpi xceiverClient) {
+    super(xceiverClient);
     this.bufferPool = bufferPool;
-    this.xceiverClient = xceiverClient;
-    commitIndex2flushedDataMap = new ConcurrentSkipListMap<>();
-    totalAckDataLength = 0;
-    futureMap = new ConcurrentHashMap<>();
-  }
-
-  /**
-   * just update the totalAckDataLength. In case of failure,
-   * we will read the data starting from totalAckDataLength.
-   */
-  private long releaseBuffers(List<Long> indexes) {
-    Preconditions.checkArgument(!commitIndex2flushedDataMap.isEmpty());
-    for (long index : indexes) {
-      Preconditions.checkState(commitIndex2flushedDataMap.containsKey(index));
-      final List<ChunkBuffer> buffers
-          = commitIndex2flushedDataMap.remove(index);
-      long length = buffers.stream().mapToLong(ChunkBuffer::position).sum();
-      totalAckDataLength += length;
-      // clear the future object from the future Map
-      final CompletableFuture<ContainerCommandResponseProto> remove =
-          futureMap.remove(totalAckDataLength);
-      if (remove == null) {
-        LOG.error("Couldn't find required future for " + totalAckDataLength);
-        for (Long key : futureMap.keySet()) {
-          LOG.error("Existing acknowledged data: " + key);
-        }
-      }
-      Preconditions.checkNotNull(remove);
-      for (ChunkBuffer byteBuffer : buffers) {
-        bufferPool.releaseBuffer(byteBuffer);
-      }
-    }
-    return totalAckDataLength;
-  }
-
-  public void updateCommitInfoMap(long index, List<ChunkBuffer> buffers) {
-    commitIndex2flushedDataMap.computeIfAbsent(index, k -> new LinkedList<>())
-        .addAll(buffers);
-  }
-
-  int getCommitInfoMapSize() {
-    return commitIndex2flushedDataMap.size();
   }
 
-  /**
-   * Calls watch for commit for the first index in commitIndex2flushedDataMap to
-   * the Ratis client.
-   * @return reply reply from raft client
-   * @throws IOException in case watchForCommit fails
-   */
-  public XceiverClientReply watchOnFirstIndex() throws IOException {
-    if (!commitIndex2flushedDataMap.isEmpty()) {
-      // wait for the  first commit index in the commitIndex2flushedDataMap
-      // to get committed to all or majority of nodes in case timeout
-      // happens.
-      long index =
-          commitIndex2flushedDataMap.keySet().stream().mapToLong(v -> v).min()
-              .getAsLong();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("waiting for first index {} to catch up", index);
-      }
-      return watchForCommit(index);
-    } else {
-      return null;
+  @Override
+  void releaseBuffers(long index) {
+    long acked = 0;
+    for (ChunkBuffer buffer : remove(index)) {
+      acked += buffer.position();
+      bufferPool.releaseBuffer(buffer);
     }
+    final long totalLength = addAckDataLength(acked);
+    // When putBlock is called, a future is added.
+    // When putBlock is replied, the future is removed below.
+    // Therefore, the removed future should not be null.
+    final CompletableFuture<ContainerCommandResponseProto> removed =
+        futureMap.remove(totalLength);
+    Objects.requireNonNull(removed, () -> "Future not found for "
+        + totalLength + ": existing = " + futureMap.keySet());
   }
 
-  /**
-   * Calls watch for commit for the first index in commitIndex2flushedDataMap to
-   * the Ratis client.
-   * @return reply reply from raft client
-   * @throws IOException in case watchForCommit fails
-   */
-  public XceiverClientReply watchOnLastIndex()
-      throws IOException {
-    if (!commitIndex2flushedDataMap.isEmpty()) {
-      // wait for the  commit index in the commitIndex2flushedDataMap
-      // to get committed to all or majority of nodes in case timeout
-      // happens.
-      long index =
-          commitIndex2flushedDataMap.keySet().stream().mapToLong(v -> v).max()
-              .getAsLong();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("waiting for last flush Index {} to catch up", index);
-      }
-      return watchForCommit(index);
-    } else {
-      return null;
-    }
-  }
-
-  private void adjustBuffers(long commitIndex) {
-    List<Long> keyList = commitIndex2flushedDataMap.keySet().stream()
-        .filter(p -> p <= commitIndex).collect(Collectors.toList());
-    if (!keyList.isEmpty()) {
-      releaseBuffers(keyList);
-    }
-  }
-
-  // It may happen that once the exception is encountered , we still might
-  // have successfully flushed up to a certain index. Make sure the buffers
-  // only contain data which have not been sufficiently replicated
-  void releaseBuffersOnException() {
-    adjustBuffers(xceiverClient.getReplicatedMinCommitIndex());
-  }
-
-  /**
-   * calls watchForCommit API of the Ratis Client. For Standalone client,
-   * it is a no op.
-   * @param commitIndex log index to watch for
-   * @return minimum commit index replicated to all nodes
-   * @throws IOException IOException in case watch gets timed out
-   */
-  public XceiverClientReply watchForCommit(long commitIndex)
-      throws IOException {
-    long index;
-    try {
-      XceiverClientReply reply =
-          xceiverClient.watchForCommit(commitIndex);
-      if (reply == null) {
-        index = 0;
-      } else {
-        index = reply.getLogIndex();
-      }
-      adjustBuffers(index);
-      return reply;
-    } catch (InterruptedException e) {
-      // Re-interrupt the thread while catching InterruptedException
-      Thread.currentThread().interrupt();
-      throw getIOExceptionForWatchForCommit(commitIndex, e);
-    } catch (TimeoutException | ExecutionException e) {
-      throw getIOExceptionForWatchForCommit(commitIndex, e);
-    }
-  }
-
-  private IOException getIOExceptionForWatchForCommit(long commitIndex,
-                                                       Exception e) {
-    LOG.warn("watchForCommit failed for index {}", commitIndex, e);
-    IOException ioException = new IOException(
-        "Unexpected Storage Container Exception: " + e.toString(), e);
-    releaseBuffersOnException();
-    return ioException;
-  }
-
-  @VisibleForTesting
-  public Map<Long, List<ChunkBuffer>> getCommitIndex2flushedDataMap() {
-    return commitIndex2flushedDataMap;
-  }
-
-  public ConcurrentMap<Long,
-        CompletableFuture<ContainerProtos.
-            ContainerCommandResponseProto>> getFutureMap() {
+  ConcurrentMap<Long, CompletableFuture<
+      ContainerCommandResponseProto>> getFutureMap() {
     return futureMap;
   }
 
-  public long getTotalAckDataLength() {
-    return totalAckDataLength;
-  }
-
+  @Override
   public void cleanup() {
-    if (commitIndex2flushedDataMap != null) {
-      commitIndex2flushedDataMap.clear();
-    }
-    if (futureMap != null) {
-      futureMap.clear();
-    }
-    commitIndex2flushedDataMap = null;
+    super.cleanup();
+    futureMap.clear();
   }
 }
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
index 92edf2e2c7..ede7057496 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/RatisBlockOutputStream.java
@@ -92,7 +92,7 @@ public class RatisBlockOutputStream extends BlockOutputStream
 
   @VisibleForTesting
   public Map<Long, List<ChunkBuffer>> getCommitIndex2flushedDataMap() {
-    return commitWatcher.getCommitIndex2flushedDataMap();
+    return commitWatcher.getCommitIndexMap();
   }
 
   @Override
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
index 8ca70de816..195628ae58 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/StreamCommitWatcher.java
@@ -18,191 +18,30 @@
 
 package org.apache.hadoop.hdds.scm.storage;
 
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.scm.XceiverClientReply;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
-import org.apache.ratis.util.JavaUtils;
-import org.apache.ratis.util.MemoizedSupplier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
 
 /**
  * This class executes watchForCommit on ratis pipeline and releases
  * buffers once data successfully gets replicated.
  */
-public class StreamCommitWatcher {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(StreamCommitWatcher.class);
-
-  private Map<Long, List<StreamBuffer>> commitIndexMap;
+class StreamCommitWatcher extends AbstractCommitWatcher<StreamBuffer> {
   private final List<StreamBuffer> bufferList;
 
-  // total data which has been successfully flushed and acknowledged
-  // by all servers
-  private long totalAckDataLength;
-  private final ConcurrentMap<Long, CompletableFuture<XceiverClientReply>>
-      replies = new ConcurrentHashMap<>();
-
-  private final XceiverClientSpi xceiverClient;
-
-  public StreamCommitWatcher(XceiverClientSpi xceiverClient,
+  StreamCommitWatcher(XceiverClientSpi xceiverClient,
       List<StreamBuffer> bufferList) {
-    this.xceiverClient = xceiverClient;
-    commitIndexMap = new ConcurrentSkipListMap<>();
+    super(xceiverClient);
     this.bufferList = bufferList;
-    totalAckDataLength = 0;
-  }
-
-  public void updateCommitInfoMap(long index, List<StreamBuffer> buffers) {
-    commitIndexMap.computeIfAbsent(index, k -> new LinkedList<>())
-        .addAll(buffers);
-  }
-
-  int getCommitInfoMapSize() {
-    return commitIndexMap.size();
-  }
-
-  /**
-   * Calls watch for commit for the first index in commitIndex2flushedDataMap to
-   * the Ratis client.
-   * @return {@link XceiverClientReply} reply from raft client
-   * @throws IOException in case watchForCommit fails
-   */
-  public XceiverClientReply streamWatchOnFirstIndex() throws IOException {
-    if (!commitIndexMap.isEmpty()) {
-      // wait for the  first commit index in the commitIndex2flushedDataMap
-      // to get committed to all or majority of nodes in case timeout
-      // happens.
-      long index =
-          commitIndexMap.keySet().stream().mapToLong(v -> v).min()
-              .getAsLong();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("waiting for first index {} to catch up", index);
-      }
-      return streamWatchForCommit(index);
-    } else {
-      return null;
-    }
-  }
-
-  /**
-   * Calls watch for commit for the last index in commitIndex2flushedDataMap to
-   * the Ratis client.
-   * @return {@link XceiverClientReply} reply from raft client
-   * @throws IOException in case watchForCommit fails
-   */
-  public XceiverClientReply streamWatchOnLastIndex()
-      throws IOException {
-    if (!commitIndexMap.isEmpty()) {
-      // wait for the  commit index in the commitIndex2flushedDataMap
-      // to get committed to all or majority of nodes in case timeout
-      // happens.
-      long index =
-          commitIndexMap.keySet().stream().mapToLong(v -> v).max()
-              .getAsLong();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("waiting for last flush Index {} to catch up", index);
-      }
-      return streamWatchForCommit(index);
-    } else {
-      return null;
-    }
-  }
-
-  /**
-   * calls watchForCommit API of the Ratis Client. This method is for streaming
-   * and no longer requires releaseBuffers
-   * @param commitIndex log index to watch for
-   * @return minimum commit index replicated to all nodes
-   * @throws IOException IOException in case watch gets timed out
-   */
-  public XceiverClientReply streamWatchForCommit(long commitIndex)
-      throws IOException {
-    final MemoizedSupplier<CompletableFuture<XceiverClientReply>> supplier
-        = JavaUtils.memoize(CompletableFuture::new);
-    final CompletableFuture<XceiverClientReply> f = replies.compute(commitIndex,
-        (key, value) -> value != null ? value : supplier.get());
-    if (!supplier.isInitialized()) {
-      // future already exists
-      return f.join();
-    }
-
-    try {
-      XceiverClientReply reply =
-          xceiverClient.watchForCommit(commitIndex);
-      f.complete(reply);
-      final CompletableFuture<XceiverClientReply> removed
-          = replies.remove(commitIndex);
-      Preconditions.checkState(removed == f);
-
-      adjustBuffers(reply.getLogIndex());
-      return reply;
-    } catch (InterruptedException e) {
-      // Re-interrupt the thread while catching InterruptedException
-      Thread.currentThread().interrupt();
-      throw getIOExceptionForWatchForCommit(commitIndex, e);
-    } catch (TimeoutException | ExecutionException e) {
-      throw getIOExceptionForWatchForCommit(commitIndex, e);
-    }
-  }
-
-  void releaseBuffersOnException() {
-    adjustBuffers(xceiverClient.getReplicatedMinCommitIndex());
-  }
-
-  private void adjustBuffers(long commitIndex) {
-    List<Long> keyList = commitIndexMap.keySet().stream()
-        .filter(p -> p <= commitIndex).collect(Collectors.toList());
-    if (!keyList.isEmpty()) {
-      releaseBuffers(keyList);
-    }
-  }
-
-  private long releaseBuffers(List<Long> indexes) {
-    Preconditions.checkArgument(!commitIndexMap.isEmpty());
-    for (long index : indexes) {
-      Preconditions.checkState(commitIndexMap.containsKey(index));
-      final List<StreamBuffer> buffers = commitIndexMap.remove(index);
-      final long length =
-          buffers.stream().mapToLong(StreamBuffer::position).sum();
-      totalAckDataLength += length;
-      for (StreamBuffer byteBuffer : buffers) {
-        bufferList.remove(byteBuffer);
-      }
-    }
-    return totalAckDataLength;
-  }
-
-  public long getTotalAckDataLength() {
-    return totalAckDataLength;
-  }
-
-  private IOException getIOExceptionForWatchForCommit(long commitIndex,
-                                                       Exception e) {
-    LOG.warn("watchForCommit failed for index {}", commitIndex, e);
-    IOException ioException = new IOException(
-        "Unexpected Storage Container Exception: " + e.toString(), e);
-    releaseBuffersOnException();
-    return ioException;
   }
 
-  public void cleanup() {
-    if (commitIndexMap != null) {
-      commitIndexMap.clear();
+  @Override
+  void releaseBuffers(long index) {
+    long acked = 0;
+    for (StreamBuffer buffer : remove(index)) {
+      acked += buffer.position();
+      bufferList.remove(buffer);
     }
-    commitIndexMap = null;
+    addAckDataLength(acked);
   }
 }
diff --git a/hadoop-ozone/integration-test/dev-support/findbugsExcludeFile.xml b/hadoop-ozone/integration-test/dev-support/findbugsExcludeFile.xml
index c61a14559b..9436840c9c 100644
--- a/hadoop-ozone/integration-test/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-ozone/integration-test/dev-support/findbugsExcludeFile.xml
@@ -77,7 +77,7 @@
     <Bug pattern="RV_RETURN_VALUE_IGNORED" />
   </Match>
   <Match>
-    <Class name="org.apache.hadoop.ozone.client.rpc.TestCommitWatcher"/>
+    <Class name="org.apache.hadoop.hdds.scm.storage.TestCommitWatcher"/>
     <Bug pattern="URF_UNREAD_FIELD" />
   </Match>
   <Match>
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java
similarity index 95%
rename from hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
rename to hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java
index ade3d9d64a..5caf23936a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestCommitWatcher.java
@@ -15,7 +15,7 @@
  * the License.
  */
 
-package org.apache.hadoop.ozone.client.rpc;
+package org.apache.hadoop.hdds.scm.storage;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -44,8 +44,6 @@ import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdds.scm.storage.BufferPool;
-import org.apache.hadoop.hdds.scm.storage.CommitWatcher;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
@@ -230,19 +228,19 @@ public class TestCommitWatcher {
         future2.get();
         assertEquals(future2, watcher.getFutureMap().get((long) 2 * chunkSize));
         assertEquals(2, watcher.
-            getCommitIndex2flushedDataMap().size());
+            getCommitIndexMap().size());
         watcher.watchOnFirstIndex();
-        assertFalse(watcher.getCommitIndex2flushedDataMap()
+        assertFalse(watcher.getCommitIndexMap()
             .containsKey(replies.get(0).getLogIndex()));
         assertFalse(watcher.getFutureMap().containsKey((long) chunkSize));
         assertTrue(watcher.getTotalAckDataLength() >= chunkSize);
         watcher.watchOnLastIndex();
-        assertFalse(watcher.getCommitIndex2flushedDataMap()
+        assertFalse(watcher.getCommitIndexMap()
             .containsKey(replies.get(1).getLogIndex()));
         assertFalse(watcher.getFutureMap().containsKey((long) 2 * chunkSize));
         assertEquals(2 * chunkSize, watcher.getTotalAckDataLength());
         assertTrue(watcher.getFutureMap().isEmpty());
-        assertTrue(watcher.getCommitIndex2flushedDataMap().isEmpty());
+        assertTrue(watcher.getCommitIndexMap().isEmpty());
       }
     }
   }
@@ -303,9 +301,9 @@ public class TestCommitWatcher {
         // wait on 2nd putBlock to complete
         future2.get();
         assertEquals(future2, watcher.getFutureMap().get((long) 2 * chunkSize));
-        assertEquals(2, watcher.getCommitIndex2flushedDataMap().size());
+        assertEquals(2, watcher.getCommitIndexMap().size());
         watcher.watchOnFirstIndex();
-        assertFalse(watcher.getCommitIndex2flushedDataMap()
+        assertFalse(watcher.getCommitIndexMap()
             .containsKey(replies.get(0).getLogIndex()));
         assertFalse(watcher.getFutureMap().containsKey((long) chunkSize));
         assertTrue(watcher.getTotalAckDataLength() >= chunkSize);
@@ -333,12 +331,12 @@ public class TestCommitWatcher {
         if (ratisClient.getReplicatedMinCommitIndex() < replies.get(1)
             .getLogIndex()) {
           assertEquals(chunkSize, watcher.getTotalAckDataLength());
-          assertEquals(1, watcher.getCommitIndex2flushedDataMap().size());
+          assertEquals(1, watcher.getCommitIndexMap().size());
           assertEquals(1, watcher.getFutureMap().size());
         } else {
           assertEquals(2 * chunkSize, watcher.getTotalAckDataLength());
           assertTrue(watcher.getFutureMap().isEmpty());
-          assertTrue(watcher.getCommitIndex2flushedDataMap().isEmpty());
+          assertTrue(watcher.getCommitIndexMap().isEmpty());
         }
       }
     }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
index 573fe8614b..1b62939a54 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
@@ -224,7 +224,8 @@ public class TestBlockOutputStream {
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
-    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
     Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
     validateData(keyName, data1);
   }
@@ -309,7 +310,8 @@ public class TestBlockOutputStream {
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
-    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
     Assert.assertEquals(pendingWriteChunkCount, metrics
         .getPendingContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(pendingPutBlockCount, metrics
@@ -405,7 +407,8 @@ public class TestBlockOutputStream {
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
-    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
     Assert.assertEquals(pendingWriteChunkCount, metrics
         .getPendingContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(pendingPutBlockCount, metrics
@@ -492,7 +495,8 @@ public class TestBlockOutputStream {
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
-    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
     Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
     validateData(keyName, data1);
   }
@@ -593,7 +597,8 @@ public class TestBlockOutputStream {
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
-    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
     Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
     validateData(keyName, data1);
   }
@@ -697,7 +702,8 @@ public class TestBlockOutputStream {
     Assert.assertEquals(totalOpCount + 9,
         metrics.getTotalOpCount());
     Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
-    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
     Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
     validateData(keyName, data1);
   }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamFlushDelay.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamFlushDelay.java
index d858b65a92..ef50f867b4 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamFlushDelay.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamFlushDelay.java
@@ -222,7 +222,8 @@ public class TestBlockOutputStreamFlushDelay {
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertEquals(totalWriteDataLength,
         blockOutputStream.getTotalAckDataLength());
-    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
     Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
     validateData(keyName, data1);
   }
@@ -307,7 +308,8 @@ public class TestBlockOutputStreamFlushDelay {
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
-    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
     Assert.assertEquals(pendingWriteChunkCount, metrics
         .getPendingContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(pendingPutBlockCount, metrics
@@ -403,7 +405,8 @@ public class TestBlockOutputStreamFlushDelay {
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
-    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
     Assert.assertEquals(pendingWriteChunkCount, metrics
         .getPendingContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
     Assert.assertEquals(pendingPutBlockCount, metrics
@@ -490,7 +493,8 @@ public class TestBlockOutputStreamFlushDelay {
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
-    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
     Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
     validateData(keyName, data1);
   }
@@ -591,7 +595,8 @@ public class TestBlockOutputStreamFlushDelay {
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
-    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
     Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
     validateData(keyName, data1);
   }
@@ -695,7 +700,8 @@ public class TestBlockOutputStreamFlushDelay {
     Assert.assertEquals(totalOpCount + 8,
         metrics.getTotalOpCount());
     Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
-    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
     Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
     validateData(keyName, data1);
   }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
index 25fd2aedfa..3aef0a0dbd 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
@@ -247,7 +247,8 @@ public class TestBlockOutputStreamWithFailures {
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
-    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
     Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
     // Written the same data twice
     String dataString = new String(data1, UTF_8);
@@ -333,7 +334,8 @@ public class TestBlockOutputStreamWithFailures {
     // make sure the bufferPool is empty
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
-    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
     Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
     // Written the same data twice
     String dataString = new String(data1, UTF_8);
@@ -432,15 +434,12 @@ public class TestBlockOutputStreamWithFailures {
     // now close the stream, It will update the ack length after watchForCommit
 
     key.close();
-    Assert
-        .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
-    Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
-    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
     Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     // make sure the bufferPool is empty
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
-    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
     Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size());
     validateData(keyName, data1);
   }
@@ -501,7 +500,8 @@ public class TestBlockOutputStreamWithFailures {
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
-    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
     Assert.assertTrue(keyOutputStream.getLocationInfoList().size() == 0);
     // Written the same data twice
     String dataString = new String(data1, UTF_8);
@@ -571,7 +571,8 @@ public class TestBlockOutputStreamWithFailures {
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
-    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
     Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0);
     // Written the same data twice
     String dataString = new String(data1, UTF_8);
@@ -661,7 +662,8 @@ public class TestBlockOutputStreamWithFailures {
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
-    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
     Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size());
     // Written the same data twice
     String dataString = new String(data1, UTF_8);
@@ -752,7 +754,8 @@ public class TestBlockOutputStreamWithFailures {
     // make sure the bufferPool is empty
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
-    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
     Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
     Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size());
     // Written the same data twice
@@ -846,7 +849,8 @@ public class TestBlockOutputStreamWithFailures {
     // make sure the bufferPool is empty
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
-    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
     Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size());
     // Written the same data twice
     String dataString = new String(data1, UTF_8);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailuresFlushDelay.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailuresFlushDelay.java
index 9f1df6207e..937b170ee2 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailuresFlushDelay.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailuresFlushDelay.java
@@ -247,7 +247,8 @@ public class TestBlockOutputStreamWithFailuresFlushDelay {
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
-    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
     Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
     // Written the same data twice
     String dataString = new String(data1, UTF_8);
@@ -335,7 +336,8 @@ public class TestBlockOutputStreamWithFailuresFlushDelay {
     // make sure the bufferPool is empty
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
-    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
     Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
     // Written the same data twice
     String dataString = new String(data1, UTF_8);
@@ -434,15 +436,12 @@ public class TestBlockOutputStreamWithFailuresFlushDelay {
     // now close the stream, It will update the ack length after watchForCommit
 
     key.close();
-    Assert
-        .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
-    Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
-    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
     Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     // make sure the bufferPool is empty
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
-    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
     Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size());
     validateData(keyName, data1);
   }
@@ -503,7 +502,8 @@ public class TestBlockOutputStreamWithFailuresFlushDelay {
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
-    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
     Assert.assertTrue(keyOutputStream.getLocationInfoList().size() == 0);
     // Written the same data twice
     String dataString = new String(data1, UTF_8);
@@ -573,7 +573,8 @@ public class TestBlockOutputStreamWithFailuresFlushDelay {
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
-    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
     Assert.assertTrue(keyOutputStream.getStreamEntries().size() == 0);
     // Written the same data twice
     String dataString = new String(data1, UTF_8);
@@ -663,7 +664,8 @@ public class TestBlockOutputStreamWithFailuresFlushDelay {
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     Assert.assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
-    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
     Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size());
     // Written the same data twice
     String dataString = new String(data1, UTF_8);
@@ -755,7 +757,8 @@ public class TestBlockOutputStreamWithFailuresFlushDelay {
     // make sure the bufferPool is empty
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
-    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
     Assert.assertEquals(0, keyOutputStream.getStreamEntries().size());
     Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size());
     // Written the same data twice
@@ -849,7 +852,8 @@ public class TestBlockOutputStreamWithFailuresFlushDelay {
     // make sure the bufferPool is empty
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
-    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
     Assert.assertEquals(0, keyOutputStream.getLocationInfoList().size());
     // Written the same data twice
     String dataString = new String(data1, UTF_8);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
index 070a4c47d8..b52b10ed53 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
@@ -236,17 +236,13 @@ public class TestWatchForCommit {
     Assert.assertTrue(keyOutputStream.getRetryCount() == 0);
     // now close the stream, It will update the ack length after watchForCommit
     key.close();
-    Assert
-        .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
-    Assert
-        .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
-    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
     Assert
         .assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
     // make sure the bufferPool is empty
     Assert
         .assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
-    Assert.assertNull(blockOutputStream.getCommitIndex2flushedDataMap());
+    Assert.assertTrue(
+        blockOutputStream.getCommitIndex2flushedDataMap().isEmpty());
     validateData(keyName, data1);
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to