This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new ccbfed5f82b HDDS-14026. Close OutputStream properly in TestBlockOutputStream (#9392)
ccbfed5f82b is described below

commit ccbfed5f82bf4eaf5a7635ac491760b795ae442d
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Mon Dec 1 12:36:42 2025 +0100

    HDDS-14026. Close OutputStream properly in TestBlockOutputStream (#9392)
---
 .../ozone/client/rpc/TestBlockOutputStream.java    | 640 ++++++++--------
 .../rpc/TestBlockOutputStreamWithFailures.java     | 850 ++++++++++-----------
 2 files changed, 750 insertions(+), 740 deletions(-)

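The common pattern in both files: each test previously created the key with createKey(...) and called key.close() explicitly at the end, so a failing assertion in the middle of a test left the stream open. The new code wraps the stream in try-with-resources and declares the KeyOutputStream/RatisBlockOutputStream references outside the block so the post-close assertions still work. A minimal sketch of the pattern (identifiers are taken from the tests in the diff below; the elided assertion comments are illustrative, not part of the commit):

    KeyOutputStream keyOutputStream;
    RatisBlockOutputStream blockOutputStream;
    try (OzoneOutputStream key = createKey(client, keyName)) {
      key.write(data1);
      keyOutputStream =
          assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
      blockOutputStream =
          assertInstanceOf(RatisBlockOutputStream.class,
              keyOutputStream.getStreamEntries().get(0).getOutputStream());
      // ... assertions against the open stream ...
      // close() runs automatically here; it updates the ack length after
      // watchForCommit even if an assertion above throws
    }
    // post-close assertions use the references captured inside the block
    assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
    assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
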
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
index 579b6482201..65c91bb8a5d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
@@ -195,64 +195,67 @@ void testWriteLessThanChunkSize(boolean flushDelay, boolean enablePiggybacking)
           metrics.getPendingContainerOpCountMetrics(PutBlock);
       long totalOpCount = metrics.getTotalOpCount();
       String keyName = getKeyName();
-      OzoneOutputStream key = createKey(client, keyName);
       int dataLength = 50;
       final int totalWriteLength = dataLength * 2;
       byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
-      key.write(data1);
-      KeyOutputStream keyOutputStream =
-          assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
-
-      assertEquals(1, keyOutputStream.getStreamEntries().size());
-      RatisBlockOutputStream blockOutputStream =
-          assertInstanceOf(RatisBlockOutputStream.class,
-              keyOutputStream.getStreamEntries().get(0).getOutputStream());
-
-      // we have written data less than a chunk size, the data will just sit
-      // in the buffer, with only one buffer being allocated in the buffer pool
-
-      BufferPool bufferPool = blockOutputStream.getBufferPool();
-      assertEquals(1, bufferPool.getSize());
-      //Just the writtenDataLength will be updated here
-      assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
-      // no data will be flushed till now
-      assertEquals(0, blockOutputStream.getTotalDataFlushedLength());
-      assertEquals(0, blockOutputStream.getTotalAckDataLength());
-      assertEquals(pendingWriteChunkCount,
-          metrics.getPendingContainerOpCountMetrics(WriteChunk));
-      assertEquals(pendingPutBlockCount,
-          metrics.getPendingContainerOpCountMetrics(PutBlock));
+      KeyOutputStream keyOutputStream;
+      RatisBlockOutputStream blockOutputStream;
+      BufferPool bufferPool;
+      try (OzoneOutputStream key = createKey(client, keyName)) {
+        key.write(data1);
+        keyOutputStream =
+            assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
 
-      // commitIndex2FlushedData Map will be empty here
-      assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
-      // Total write data greater than or equal one chunk
-      // size to make sure flush will sync data.
-      key.write(data1);
-      // This will flush the data and update the flush length and the map.
-      key.flush();
+        assertEquals(1, keyOutputStream.getStreamEntries().size());
+        blockOutputStream =
+            assertInstanceOf(RatisBlockOutputStream.class,
+                keyOutputStream.getStreamEntries().get(0).getOutputStream());
 
-      // flush is a sync call, all pending operations will complete
-      assertEquals(pendingWriteChunkCount,
-          metrics.getPendingContainerOpCountMetrics(WriteChunk));
-      assertEquals(pendingPutBlockCount,
-          metrics.getPendingContainerOpCountMetrics(PutBlock));
-      // we have written data less than a chunk size, the data will just sit
-      // in the buffer, with only one buffer being allocated in the buffer pool
+        // we have written data less than a chunk size, the data will just sit
+        // in the buffer, with only one buffer being allocated in the buffer pool
 
-      assertEquals(1, bufferPool.getSize());
-      assertEquals(totalWriteLength, blockOutputStream.getWrittenDataLength());
-      assertEquals(totalWriteLength,
-          blockOutputStream.getTotalDataFlushedLength());
-      assertEquals(0,
-          blockOutputStream.getCommitIndex2flushedDataMap().size());
+        bufferPool = blockOutputStream.getBufferPool();
+        assertEquals(1, bufferPool.getSize());
+        //Just the writtenDataLength will be updated here
+        assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
 
-      // flush ensures watchForCommit updates the total length acknowledged
-      assertEquals(totalWriteLength, blockOutputStream.getTotalAckDataLength());
+        // no data will be flushed till now
+        assertEquals(0, blockOutputStream.getTotalDataFlushedLength());
+        assertEquals(0, blockOutputStream.getTotalAckDataLength());
+        assertEquals(pendingWriteChunkCount,
+            metrics.getPendingContainerOpCountMetrics(WriteChunk));
+        assertEquals(pendingPutBlockCount,
+            metrics.getPendingContainerOpCountMetrics(PutBlock));
 
-      assertEquals(1, keyOutputStream.getStreamEntries().size());
-      // now close the stream, It will update ack length after watchForCommit
-      key.close();
+        // commitIndex2FlushedData Map will be empty here
+        assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
+        // Total write data greater than or equal one chunk
+        // size to make sure flush will sync data.
+        key.write(data1);
+        // This will flush the data and update the flush length and the map.
+        key.flush();
+
+        // flush is a sync call, all pending operations will complete
+        assertEquals(pendingWriteChunkCount,
+            metrics.getPendingContainerOpCountMetrics(WriteChunk));
+        assertEquals(pendingPutBlockCount,
+            metrics.getPendingContainerOpCountMetrics(PutBlock));
+        // we have written data less than a chunk size, the data will just sit
+        // in the buffer, with only one buffer being allocated in the buffer pool
+
+        assertEquals(1, bufferPool.getSize());
+        assertEquals(totalWriteLength, blockOutputStream.getWrittenDataLength());
+        assertEquals(totalWriteLength,
+            blockOutputStream.getTotalDataFlushedLength());
+        assertEquals(0,
+            blockOutputStream.getCommitIndex2flushedDataMap().size());
+
+        // flush ensures watchForCommit updates the total length acknowledged
+        assertEquals(totalWriteLength, blockOutputStream.getTotalAckDataLength());
+
+        assertEquals(1, keyOutputStream.getStreamEntries().size());
+        // now close the stream, It will update ack length after watchForCommit
+      }
 
       assertEquals(pendingWriteChunkCount,
           metrics.getPendingContainerOpCountMetrics(WriteChunk));
@@ -293,86 +296,88 @@ void testWriteExactlyFlushSize(boolean flushDelay, boolean enablePiggybacking) t
       final long totalOpCount = metrics.getTotalOpCount();
 
       String keyName = getKeyName();
-      OzoneOutputStream key = createKey(client, keyName);
       // write data equal to 2 chunks
       int dataLength = FLUSH_SIZE;
       byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
-      key.write(data1);
-
-      assertEquals(writeChunkCount + 2,
-          metrics.getContainerOpCountMetrics(WriteChunk));
-      assertEquals(putBlockCount + 1,
-          metrics.getContainerOpCountMetrics(PutBlock));
-      // The WriteChunk and PutBlock can be completed soon.
-      assertThat(metrics.getPendingContainerOpCountMetrics(WriteChunk))
-          .isLessThanOrEqualTo(pendingWriteChunkCount + 2);
-      assertThat(metrics.getPendingContainerOpCountMetrics(PutBlock))
-          .isLessThanOrEqualTo(pendingPutBlockCount + 1);
-
-      KeyOutputStream keyOutputStream =
-          assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
-      assertEquals(1, keyOutputStream.getStreamEntries().size());
-      RatisBlockOutputStream blockOutputStream =
-          assertInstanceOf(RatisBlockOutputStream.class,
-              keyOutputStream.getStreamEntries().get(0).getOutputStream());
-
-      // we have just written data equal flush Size = 2 chunks, at this time
-      // buffer pool will have 2 buffers allocated worth of chunk size
-
-      assertEquals(2, blockOutputStream.getBufferPool().getSize());
-      // writtenDataLength as well flushedDataLength will be updated here
-      assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
-      assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
-      assertEquals(0, blockOutputStream.getTotalAckDataLength());
-
-      // Before flush, if there was no pending PutBlock which means it is complete.
-      // It put a commit index into commitIndexMap.
-      assertEquals((metrics.getPendingContainerOpCountMetrics(PutBlock) == pendingPutBlockCount) ? 1 : 0,
-          blockOutputStream.getCommitIndex2flushedDataMap().size());
-
-      // Now do a flush.
-      key.flush();
-
-      assertEquals(1, keyOutputStream.getStreamEntries().size());
-      // The previously written data is equal to flushSize, so no action is
-      // triggered when execute flush, if flushDelay is enabled.
-      // If flushDelay is disabled, it will call waitOnFlushFutures to wait all
-      // putBlocks finished. It was broken because WriteChunk and PutBlock
-      // can be complete regardless of whether the flush executed or not.
-      if (flushDelay) {
+      KeyOutputStream keyOutputStream;
+      RatisBlockOutputStream blockOutputStream;
+      try (OzoneOutputStream key = createKey(client, keyName)) {
+        key.write(data1);
+
+        assertEquals(writeChunkCount + 2,
+            metrics.getContainerOpCountMetrics(WriteChunk));
+        assertEquals(putBlockCount + 1,
+            metrics.getContainerOpCountMetrics(PutBlock));
+        // The WriteChunk and PutBlock can be completed soon.
         assertThat(metrics.getPendingContainerOpCountMetrics(WriteChunk))
             .isLessThanOrEqualTo(pendingWriteChunkCount + 2);
         assertThat(metrics.getPendingContainerOpCountMetrics(PutBlock))
-            .isLessThanOrEqualTo(pendingWriteChunkCount + 1);
-      } else {
-        assertEquals(pendingWriteChunkCount,
-            metrics.getPendingContainerOpCountMetrics(WriteChunk));
-        assertEquals(pendingPutBlockCount,
-            metrics.getPendingContainerOpCountMetrics(PutBlock));
-      }
+            .isLessThanOrEqualTo(pendingPutBlockCount + 1);
+
+        keyOutputStream =
+            assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+        assertEquals(1, keyOutputStream.getStreamEntries().size());
+        blockOutputStream =
+            assertInstanceOf(RatisBlockOutputStream.class,
+                keyOutputStream.getStreamEntries().get(0).getOutputStream());
+
+        // we have just written data equal flush Size = 2 chunks, at this time
+        // buffer pool will have 2 buffers allocated worth of chunk size
+
+        assertEquals(2, blockOutputStream.getBufferPool().getSize());
+        // writtenDataLength as well flushedDataLength will be updated here
+        assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+        assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
+        assertEquals(0, blockOutputStream.getTotalAckDataLength());
+
+        // Before flush, if there was no pending PutBlock which means it is complete.
+        // It put a commit index into commitIndexMap.
+        assertEquals((metrics.getPendingContainerOpCountMetrics(PutBlock) == pendingPutBlockCount) ? 1 : 0,
+            blockOutputStream.getCommitIndex2flushedDataMap().size());
+
+        // Now do a flush.
+        key.flush();
+
+        assertEquals(1, keyOutputStream.getStreamEntries().size());
+        // The previously written data is equal to flushSize, so no action is
+        // triggered when execute flush, if flushDelay is enabled.
+        // If flushDelay is disabled, it will call waitOnFlushFutures to wait all
+        // putBlocks finished. It was broken because WriteChunk and PutBlock
+        // can be complete regardless of whether the flush executed or not.
+        if (flushDelay) {
+          assertThat(metrics.getPendingContainerOpCountMetrics(WriteChunk))
+              .isLessThanOrEqualTo(pendingWriteChunkCount + 2);
+          assertThat(metrics.getPendingContainerOpCountMetrics(PutBlock))
+              .isLessThanOrEqualTo(pendingWriteChunkCount + 1);
+        } else {
+          assertEquals(pendingWriteChunkCount,
+              metrics.getPendingContainerOpCountMetrics(WriteChunk));
+          assertEquals(pendingPutBlockCount,
+              metrics.getPendingContainerOpCountMetrics(PutBlock));
+        }
+
+        // Since the data in the buffer is already flushed, flush here will have
+        // no impact on the counters and data structures
+        assertEquals(2, blockOutputStream.getBufferPool().getSize());
+
+        // No action is triggered when execute flush, BlockOutputStream will not
+        // be updated.
+        assertEquals(flushDelay ? dataLength : 0,
+            blockOutputStream.getBufferPool().computeBufferData());
+        assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+        assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
+        // If the flushDelay feature is enabled, nothing happens.
+        // The assertions will be as same as those before flush.
+        // If it flushed, the Commit index will be removed.
+        assertEquals((flushDelay &&
+                (metrics.getPendingContainerOpCountMetrics(PutBlock) == pendingPutBlockCount)) ? 1 : 0,
+            blockOutputStream.getCommitIndex2flushedDataMap().size());
+        assertEquals(flushDelay ? 0 : dataLength,
+            blockOutputStream.getTotalAckDataLength());
 
-      // Since the data in the buffer is already flushed, flush here will have
-      // no impact on the counters and data structures
-      assertEquals(2, blockOutputStream.getBufferPool().getSize());
-
-      // No action is triggered when execute flush, BlockOutputStream will not
-      // be updated.
-      assertEquals(flushDelay ? dataLength : 0,
-          blockOutputStream.getBufferPool().computeBufferData());
-      assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-      assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
-      // If the flushDelay feature is enabled, nothing happens.
-      // The assertions will be as same as those before flush.
-      // If it flushed, the Commit index will be removed.
-      assertEquals((flushDelay &&
-              (metrics.getPendingContainerOpCountMetrics(PutBlock) == pendingPutBlockCount)) ? 1 : 0,
-          blockOutputStream.getCommitIndex2flushedDataMap().size());
-      assertEquals(flushDelay ? 0 : dataLength,
-          blockOutputStream.getTotalAckDataLength());
-
-      // now close the stream, It will update ack length after watchForCommit
-      key.close();
+        // now close the stream, It will update ack length after watchForCommit
+      }
 
       assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
       // make sure the bufferPool is empty
@@ -411,57 +416,60 @@ void testWriteMoreThanChunkSize(boolean flushDelay, boolean enablePiggybacking)
           PutBlock);
       long totalOpCount = metrics.getTotalOpCount();
       String keyName = getKeyName();
-      OzoneOutputStream key = createKey(client, keyName);
       // write data more than 1 chunk
       int dataLength = CHUNK_SIZE + 50;
       byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
-      key.write(data1);
-      assertEquals(totalOpCount + 1, metrics.getTotalOpCount());
-      KeyOutputStream keyOutputStream =
-          assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+      KeyOutputStream keyOutputStream;
+      RatisBlockOutputStream blockOutputStream;
+      BufferPool bufferPool;
+      try (OzoneOutputStream key = createKey(client, keyName)) {
+        key.write(data1);
+        assertEquals(totalOpCount + 1, metrics.getTotalOpCount());
+        keyOutputStream =
+            assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
 
-      assertEquals(1, keyOutputStream.getStreamEntries().size());
-      RatisBlockOutputStream blockOutputStream =
-          assertInstanceOf(RatisBlockOutputStream.class,
-              keyOutputStream.getStreamEntries().get(0).getOutputStream());
+        assertEquals(1, keyOutputStream.getStreamEntries().size());
+        blockOutputStream =
+            assertInstanceOf(RatisBlockOutputStream.class,
+                keyOutputStream.getStreamEntries().get(0).getOutputStream());
 
-      // we have just written data equal flush Size > 1 chunk, at this time
-      // buffer pool will have 2 buffers allocated worth of chunk size
+        // we have just written data equal flush Size > 1 chunk, at this time
+        // buffer pool will have 2 buffers allocated worth of chunk size
 
-      BufferPool bufferPool = blockOutputStream.getBufferPool();
-      assertEquals(2, bufferPool.getSize());
-      // writtenDataLength as well flushedDataLength will be updated here
-      assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+        bufferPool = blockOutputStream.getBufferPool();
+        assertEquals(2, bufferPool.getSize());
+        // writtenDataLength as well flushedDataLength will be updated here
+        assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
 
-      // since data written is still less than flushLength, flushLength will
-      // still be 0.
-      assertEquals(0, blockOutputStream.getTotalDataFlushedLength());
-      assertEquals(0, blockOutputStream.getTotalAckDataLength());
+        // since data written is still less than flushLength, flushLength will
+        // still be 0.
+        assertEquals(0, blockOutputStream.getTotalDataFlushedLength());
+        assertEquals(0, blockOutputStream.getTotalAckDataLength());
 
-      assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
+        assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
 
-      // This will flush the data and update the flush length and the map.
-      key.flush();
-      assertEquals(writeChunkCount + 2,
-          metrics.getContainerOpCountMetrics(WriteChunk));
-      assertEquals(putBlockCount + ((enablePiggybacking) ? 0 : 1),
-          metrics.getContainerOpCountMetrics(PutBlock));
-      assertEquals(pendingWriteChunkCount,
-          metrics.getPendingContainerOpCountMetrics(WriteChunk));
-      assertEquals(pendingPutBlockCount,
-          metrics.getPendingContainerOpCountMetrics(PutBlock));
+        // This will flush the data and update the flush length and the map.
+        key.flush();
+        assertEquals(writeChunkCount + 2,
+            metrics.getContainerOpCountMetrics(WriteChunk));
+        assertEquals(putBlockCount + ((enablePiggybacking) ? 0 : 1),
+            metrics.getContainerOpCountMetrics(PutBlock));
+        assertEquals(pendingWriteChunkCount,
+            metrics.getPendingContainerOpCountMetrics(WriteChunk));
+        assertEquals(pendingPutBlockCount,
+            metrics.getPendingContainerOpCountMetrics(PutBlock));
 
-      assertEquals(2, bufferPool.getSize());
-      assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+        assertEquals(2, bufferPool.getSize());
+        assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
 
-      assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
-      assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
+        assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
+        assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
 
-      // flush ensures watchForCommit updates the total length acknowledged
-      assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+        // flush ensures watchForCommit updates the total length acknowledged
+        assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
 
-      // now close the stream, It will update ack length after watchForCommit
-      key.close();
+        // now close the stream, It will update ack length after watchForCommit
+      }
       assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
       // make sure the bufferPool is empty
       assertEquals(0, bufferPool.computeBufferData());
@@ -501,56 +509,56 @@ void testWriteMoreThanFlushSize(boolean flushDelay, boolean enablePiggybacking)
       long totalOpCount = metrics.getTotalOpCount();
 
       String keyName = getKeyName();
-      OzoneOutputStream key = createKey(client, keyName);
       int dataLength = FLUSH_SIZE + 50;
       byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
-      key.write(data1);
-
-      assertEquals(totalOpCount + 3, metrics.getTotalOpCount());
-      KeyOutputStream keyOutputStream =
-          assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+      KeyOutputStream keyOutputStream;
+      RatisBlockOutputStream blockOutputStream;
+      try (OzoneOutputStream key = createKey(client, keyName)) {
+        key.write(data1);
 
-      assertEquals(1, keyOutputStream.getStreamEntries().size());
-      RatisBlockOutputStream blockOutputStream =
-          assertInstanceOf(RatisBlockOutputStream.class,
-              keyOutputStream.getStreamEntries().get(0).getOutputStream());
+        assertEquals(totalOpCount + 3, metrics.getTotalOpCount());
+        keyOutputStream =
+            assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
 
-      // we have just written data more than flush Size(2 chunks), at this time
-      // buffer pool will have 3 buffers allocated worth of chunk size
-
-      assertEquals(3, blockOutputStream.getBufferPool().getSize());
-      // writtenDataLength as well flushedDataLength will be updated here
-      assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+        assertEquals(1, keyOutputStream.getStreamEntries().size());
+        blockOutputStream =
+            assertInstanceOf(RatisBlockOutputStream.class,
+                keyOutputStream.getStreamEntries().get(0).getOutputStream());
 
-      assertEquals(FLUSH_SIZE, blockOutputStream.getTotalDataFlushedLength());
-      assertEquals(0, blockOutputStream.getTotalAckDataLength());
+        // we have just written data more than flush Size(2 chunks), at this time
+        // buffer pool will have 3 buffers allocated worth of chunk size
 
-      // Before flush, if there was no pending PutBlock which means it is complete.
-      // It put a commit index into commitIndexMap.
-      assertEquals((metrics.getPendingContainerOpCountMetrics(PutBlock) == pendingPutBlockCount) ? 1 : 0,
-          blockOutputStream.getCommitIndex2flushedDataMap().size());
+        assertEquals(3, blockOutputStream.getBufferPool().getSize());
+        // writtenDataLength as well flushedDataLength will be updated here
+        assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
 
-      key.flush();
-      if (flushDelay) {
-        // If the flushDelay feature is enabled, nothing happens.
-        // The assertions will be as same as those before flush.
         assertEquals(FLUSH_SIZE, blockOutputStream.getTotalDataFlushedLength());
+        assertEquals(0, blockOutputStream.getTotalAckDataLength());
+
+        // Before flush, if there was no pending PutBlock which means it is complete.
+        // It put a commit index into commitIndexMap.
         assertEquals((metrics.getPendingContainerOpCountMetrics(PutBlock) == pendingPutBlockCount) ? 1 : 0,
             blockOutputStream.getCommitIndex2flushedDataMap().size());
 
-        assertEquals(0, blockOutputStream.getTotalAckDataLength());
-        assertEquals(1, keyOutputStream.getStreamEntries().size());
-      } else {
-        assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
-        assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
-
-        assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
-        assertEquals(1, keyOutputStream.getStreamEntries().size());
+        key.flush();
+        if (flushDelay) {
+          // If the flushDelay feature is enabled, nothing happens.
+          // The assertions will be as same as those before flush.
+          assertEquals(FLUSH_SIZE, blockOutputStream.getTotalDataFlushedLength());
+          assertEquals((metrics.getPendingContainerOpCountMetrics(PutBlock) == pendingPutBlockCount) ? 1 : 0,
+              blockOutputStream.getCommitIndex2flushedDataMap().size());
+
+          assertEquals(0, blockOutputStream.getTotalAckDataLength());
+          assertEquals(1, keyOutputStream.getStreamEntries().size());
+        } else {
+          assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
+          assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
+
+          assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+          assertEquals(1, keyOutputStream.getStreamEntries().size());
+        }
       }
 
-
-      key.close();
-
       assertEquals(pendingWriteChunkCount,
           metrics.getPendingContainerOpCountMetrics(WriteChunk));
       assertEquals(pendingPutBlockCount,
@@ -590,65 +598,67 @@ void testWriteExactlyMaxFlushSize(boolean flushDelay, boolean enablePiggybacking
       long totalOpCount = metrics.getTotalOpCount();
 
       String keyName = getKeyName();
-      OzoneOutputStream key = createKey(client, keyName);
       int dataLength = MAX_FLUSH_SIZE;
       byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
-      key.write(data1);
-
-      KeyOutputStream keyOutputStream =
-          assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
-      RatisBlockOutputStream blockOutputStream =
-          assertInstanceOf(RatisBlockOutputStream.class,
-              keyOutputStream.getStreamEntries().get(0).getOutputStream());
-      BufferPool bufferPool = blockOutputStream.getBufferPool();
-      // since it's hitting the full bufferCondition, it will call watchForCommit
-      // however, the outputstream will not wait for watchForCommit, but the next call to
-      // write() will need to wait for at least one watchForCommit, indirectly when asking for new buffer allocation.
-      bufferPool.waitUntilAvailable();
-
-      assertThat(metrics.getPendingContainerOpCountMetrics(WriteChunk))
-          .isLessThanOrEqualTo(pendingWriteChunkCount + 2);
-      assertThat(metrics.getPendingContainerOpCountMetrics(PutBlock))
-          .isLessThanOrEqualTo(pendingPutBlockCount + 1);
-
-      assertEquals(1, keyOutputStream.getStreamEntries().size());
-
-      assertEquals(4, blockOutputStream.getBufferPool().getSize());
-      // writtenDataLength as well flushedDataLength will be updated here
-      assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
-      assertEquals(MAX_FLUSH_SIZE,
-          blockOutputStream.getTotalDataFlushedLength());
-
-      // since data equals to maxBufferSize is written, this will be a blocking
-      // call and hence will wait for atleast flushSize worth of data to get
-      // ack'd by all servers right here
-      assertThat(blockOutputStream.getTotalAckDataLength())
-          .isGreaterThanOrEqualTo(FLUSH_SIZE);
-
-      // watchForCommit will clean up atleast one entry from the map where each
-      // entry corresponds to flushSize worth of data
-
-      assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
-          .isLessThanOrEqualTo(1);
-
-      // This will flush the data and update the flush length and the map.
-      key.flush();
-      assertEquals(1, keyOutputStream.getStreamEntries().size());
-      assertEquals(totalOpCount + 6, metrics.getTotalOpCount());
-
-      // Since the data in the buffer is already flushed, flush here will have
-      // no impact on the counters and data structures
-
-      assertEquals(4, blockOutputStream.getBufferPool().getSize());
-      assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
-      assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
-      assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
-          .isLessThanOrEqualTo(1);
-
-      // now close the stream, it will update ack length after watchForCommit
-      key.close();
+      KeyOutputStream keyOutputStream;
+      RatisBlockOutputStream blockOutputStream;
+      try (OzoneOutputStream key = createKey(client, keyName)) {
+        key.write(data1);
+
+        keyOutputStream =
+            assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+        blockOutputStream =
+            assertInstanceOf(RatisBlockOutputStream.class,
+                keyOutputStream.getStreamEntries().get(0).getOutputStream());
+        BufferPool bufferPool = blockOutputStream.getBufferPool();
+        // since it's hitting the full bufferCondition, it will call watchForCommit
+        // however, the outputstream will not wait for watchForCommit, but the next call to
+        // write() will need to wait for at least one watchForCommit, indirectly when asking for new buffer allocation.
+        bufferPool.waitUntilAvailable();
+
+        assertThat(metrics.getPendingContainerOpCountMetrics(WriteChunk))
+            .isLessThanOrEqualTo(pendingWriteChunkCount + 2);
+        assertThat(metrics.getPendingContainerOpCountMetrics(PutBlock))
+            .isLessThanOrEqualTo(pendingPutBlockCount + 1);
+
+        assertEquals(1, keyOutputStream.getStreamEntries().size());
+
+        assertEquals(4, blockOutputStream.getBufferPool().getSize());
+        // writtenDataLength as well flushedDataLength will be updated here
+        assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+        assertEquals(MAX_FLUSH_SIZE,
+            blockOutputStream.getTotalDataFlushedLength());
+
+        // since data equals to maxBufferSize is written, this will be a blocking
+        // call and hence will wait for atleast flushSize worth of data to get
+        // ack'd by all servers right here
+        assertThat(blockOutputStream.getTotalAckDataLength())
+            .isGreaterThanOrEqualTo(FLUSH_SIZE);
+
+        // watchForCommit will clean up atleast one entry from the map where each
+        // entry corresponds to flushSize worth of data
+
+        assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
+            .isLessThanOrEqualTo(1);
+
+        // This will flush the data and update the flush length and the map.
+        key.flush();
+        assertEquals(1, keyOutputStream.getStreamEntries().size());
+        assertEquals(totalOpCount + 6, metrics.getTotalOpCount());
+
+        // Since the data in the buffer is already flushed, flush here will have
+        // no impact on the counters and data structures
+
+        assertEquals(4, blockOutputStream.getBufferPool().getSize());
+        assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+        assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
+        assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
+            .isLessThanOrEqualTo(1);
+
+        // now close the stream, it will update ack length after watchForCommit
+      }
       assertEquals(pendingWriteChunkCount,
           metrics.getPendingContainerOpCountMetrics(WriteChunk));
       assertEquals(pendingPutBlockCount,
@@ -684,71 +694,73 @@ void testWriteMoreThanMaxFlushSize(boolean flushDelay, boolean enablePiggybackin
           metrics.getPendingContainerOpCountMetrics(PutBlock);
       long totalOpCount = metrics.getTotalOpCount();
       String keyName = getKeyName();
-      OzoneOutputStream key = createKey(client, keyName);
       int dataLength = MAX_FLUSH_SIZE + 50;
       // write data more than 1 chunk
       byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
-      key.write(data1);
-      KeyOutputStream keyOutputStream =
-          assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
-
-      // since it's hitting full-buffer, it will call watchForCommit
-      // and completes putBlock at least for first flushSize worth of data
-      assertThat(metrics.getPendingContainerOpCountMetrics(WriteChunk))
-          .isLessThanOrEqualTo(pendingWriteChunkCount + 2);
-      assertThat(metrics.getPendingContainerOpCountMetrics(PutBlock))
-          .isLessThanOrEqualTo(pendingPutBlockCount + 1);
-      assertEquals(writeChunkCount + 4,
-          metrics.getContainerOpCountMetrics(WriteChunk));
-      assertEquals(putBlockCount + 2,
-          metrics.getContainerOpCountMetrics(PutBlock));
-      assertEquals(totalOpCount + 6, metrics.getTotalOpCount());
-      assertEquals(1, keyOutputStream.getStreamEntries().size());
-      RatisBlockOutputStream blockOutputStream =
-          assertInstanceOf(RatisBlockOutputStream.class,
-              keyOutputStream.getStreamEntries().get(0).getOutputStream());
-
-      assertThat(blockOutputStream.getBufferPool().getSize())
-          .isLessThanOrEqualTo(4);
-      // writtenDataLength as well flushedDataLength will be updated here
-      assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
-      assertEquals(MAX_FLUSH_SIZE,
-          blockOutputStream.getTotalDataFlushedLength());
-
-      // since data equals to maxBufferSize is written, this will be a blocking
-      // call and hence will wait for atleast flushSize worth of data to get
-      // ack'd by all servers right here
-      assertThat(blockOutputStream.getTotalAckDataLength())
-          .isGreaterThanOrEqualTo(FLUSH_SIZE);
-
-      // watchForCommit will clean up atleast one entry from the map where each
-      // entry corresponds to flushSize worth of data
-      assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
-          .isLessThanOrEqualTo(1);
-
-      // Now do a flush.
-      key.flush();
-      assertEquals(1, keyOutputStream.getStreamEntries().size());
-      assertEquals(pendingWriteChunkCount,
-          metrics.getPendingContainerOpCountMetrics(WriteChunk));
-      assertEquals(pendingPutBlockCount,
-          metrics.getPendingContainerOpCountMetrics(PutBlock));
+      KeyOutputStream keyOutputStream;
+      RatisBlockOutputStream blockOutputStream;
+      try (OzoneOutputStream key = createKey(client, keyName)) {
+        key.write(data1);
+        keyOutputStream =
+            assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+
+        // since it's hitting full-buffer, it will call watchForCommit
+        // and completes putBlock at least for first flushSize worth of data
+        assertThat(metrics.getPendingContainerOpCountMetrics(WriteChunk))
+            .isLessThanOrEqualTo(pendingWriteChunkCount + 2);
+        assertThat(metrics.getPendingContainerOpCountMetrics(PutBlock))
+            .isLessThanOrEqualTo(pendingPutBlockCount + 1);
+        assertEquals(writeChunkCount + 4,
+            metrics.getContainerOpCountMetrics(WriteChunk));
+        assertEquals(putBlockCount + 2,
+            metrics.getContainerOpCountMetrics(PutBlock));
+        assertEquals(totalOpCount + 6, metrics.getTotalOpCount());
+        assertEquals(1, keyOutputStream.getStreamEntries().size());
+        blockOutputStream =
+            assertInstanceOf(RatisBlockOutputStream.class,
+                keyOutputStream.getStreamEntries().get(0).getOutputStream());
+
+        assertThat(blockOutputStream.getBufferPool().getSize())
+            .isLessThanOrEqualTo(4);
+        // writtenDataLength as well flushedDataLength will be updated here
+        assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+        assertEquals(MAX_FLUSH_SIZE,
+            blockOutputStream.getTotalDataFlushedLength());
+
+        // since data equals to maxBufferSize is written, this will be a blocking
+        // call and hence will wait for atleast flushSize worth of data to get
+        // ack'd by all servers right here
+        assertThat(blockOutputStream.getTotalAckDataLength())
+            .isGreaterThanOrEqualTo(FLUSH_SIZE);
+
+        // watchForCommit will clean up atleast one entry from the map where each
+        // entry corresponds to flushSize worth of data
+        assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
+            .isLessThanOrEqualTo(1);
+
+        // Now do a flush.
+        key.flush();
+        assertEquals(1, keyOutputStream.getStreamEntries().size());
+        assertEquals(pendingWriteChunkCount,
+            metrics.getPendingContainerOpCountMetrics(WriteChunk));
+        assertEquals(pendingPutBlockCount,
+            metrics.getPendingContainerOpCountMetrics(PutBlock));
 
-      // Since the data in the buffer is already flushed, flush here will have
-      // no impact on the counters and data structures
+        // Since the data in the buffer is already flushed, flush here will have
+        // no impact on the counters and data structures
 
-      assertThat(blockOutputStream.getBufferPool().getSize())
-          .isLessThanOrEqualTo(4);
-      assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-      // dataLength > MAX_FLUSH_SIZE
-      assertEquals(flushDelay ? MAX_FLUSH_SIZE : dataLength,
-          blockOutputStream.getTotalDataFlushedLength());
-      assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
-          .isLessThanOrEqualTo(2);
+        assertThat(blockOutputStream.getBufferPool().getSize())
+            .isLessThanOrEqualTo(4);
+        assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+        // dataLength > MAX_FLUSH_SIZE
+        assertEquals(flushDelay ? MAX_FLUSH_SIZE : dataLength,
+            blockOutputStream.getTotalDataFlushedLength());
+        assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
+            .isLessThanOrEqualTo(2);
 
-      // now close the stream, it will update ack length after watchForCommit
-      key.close();
+        // now close the stream, it will update ack length after watchForCommit
+      }
       assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
       // make sure the bufferPool is empty
       assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
index ff07b599a9c..70e15f9b677 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
@@ -107,116 +107,27 @@ void testContainerClose(boolean flushDelay, boolean enablePiggybacking) throws E
   private void testWatchForCommitWithCloseContainerException(OzoneClient client)
       throws Exception {
     String keyName = getKeyName();
-    OzoneOutputStream key = createKey(client, keyName);
     int dataLength = MAX_FLUSH_SIZE + CHUNK_SIZE;
     byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
-    key.write(data1);
-
-    KeyOutputStream keyOutputStream =
-        assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
-
-    assertEquals(1, keyOutputStream.getStreamEntries().size());
-    RatisBlockOutputStream blockOutputStream =
-        assertInstanceOf(RatisBlockOutputStream.class,
-            keyOutputStream.getStreamEntries().get(0).getOutputStream());
-
-    // we have just written data more than flush Size(2 chunks), at this time
-    // buffer pool will have 4 buffers allocated worth of chunk size
-
-    assertEquals(4, blockOutputStream.getBufferPool().getSize());
-    // writtenDataLength as well flushedDataLength will be updated here
-    assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
-    assertEquals(MAX_FLUSH_SIZE, blockOutputStream.getTotalDataFlushedLength());
-
-    // since data equals to maxBufferSize is written, this will be a blocking
-    // call and hence will wait for atleast flushSize worth of data to get
-    // ack'd by all servers right here
-    assertThat(blockOutputStream.getTotalAckDataLength())
-        .isGreaterThanOrEqualTo(FLUSH_SIZE);
-
-    // watchForCommit will clean up atleast one entry from the map where each
-    // entry corresponds to flushSize worth of data
-    assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
-        .isLessThanOrEqualTo(1);
-
-    // This will flush the data and update the flush length and the map.
-    key.flush();
-
-    // flush is a sync call, all pending operations will complete
-    // Since the data in the buffer is already flushed, flush here will have
-    // no impact on the counters and data structures
-
-    assertEquals(4, blockOutputStream.getBufferPool().getSize());
-    assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
-    assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
-    // flush will make sure one more entry gets updated in the map
-    assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
-        .isLessThanOrEqualTo(2);
-
-    XceiverClientRatis raftClient =
-        (XceiverClientRatis) blockOutputStream.getXceiverClient();
-    assertEquals(3, raftClient.getCommitInfoMap().size());
-    // Close the containers on the Datanode and write more data
-    TestHelper.waitForContainerClose(key, cluster);
-    key.write(data1);
-
-    // As a part of handling the exception, 4 failed writeChunks  will be
-    // rewritten plus one partial chunk plus two putBlocks for flushSize
-    // and one flush for partial chunk
-    key.flush();
-    assertEquals(2, keyOutputStream.getStreamEntries().size());
-    assertInstanceOf(ContainerNotOpenException.class,
-        checkForException(blockOutputStream.getIoException()));
-
-    // Make sure the retryCount is reset after the exception is handled
-    assertEquals(0, keyOutputStream.getRetryCount());
-    // commitInfoMap will remain intact as there is no server failure
-    assertEquals(3, raftClient.getCommitInfoMap().size());
-    // now close the stream, It will update ack length after watchForCommit
-    key.close();
-    // make sure the bufferPool is empty
-    assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
-    assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
-    assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
-    assertEquals(0, keyOutputStream.getStreamEntries().size());
-    // Written the same data twice
-    byte[] bytes = ArrayUtils.addAll(data1, data1);
-    validateData(keyName, bytes, client.getObjectStore(), VOLUME, BUCKET);
-  }
-
-  @ParameterizedTest
-  @MethodSource("clientParameters")
-  void testWatchForCommitDatanodeFailure(boolean flushDelay, boolean enablePiggybacking) throws Exception {
-    OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay, enablePiggybacking);
-    try (OzoneClient client = newClient(cluster.getConf(), config)) {
-      String keyName = getKeyName();
-      OzoneOutputStream key = createKey(client, keyName);
-      int dataLength = MAX_FLUSH_SIZE + CHUNK_SIZE;
-      byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
+    KeyOutputStream keyOutputStream;
+    RatisBlockOutputStream blockOutputStream;
+    try (OzoneOutputStream key = createKey(client, keyName)) {
       key.write(data1);
-      // since its hitting the full bufferCondition, it will call watchForCommit
-      // and completes at least putBlock for first flushSize worth of data
-      KeyOutputStream keyOutputStream =
-          assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+
+      keyOutputStream = assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
 
       assertEquals(1, keyOutputStream.getStreamEntries().size());
-      RatisBlockOutputStream blockOutputStream =
-          assertInstanceOf(RatisBlockOutputStream.class,
-              keyOutputStream.getStreamEntries().get(0).getOutputStream());
+      blockOutputStream = assertInstanceOf(RatisBlockOutputStream.class,
+          keyOutputStream.getStreamEntries().get(0).getOutputStream());
 
       // we have just written data more than flush Size(2 chunks), at this time
-      // buffer pool will have 3 buffers allocated worth of chunk size
+      // buffer pool will have 4 buffers allocated worth of chunk size
 
       assertEquals(4, blockOutputStream.getBufferPool().getSize());
       // writtenDataLength as well flushedDataLength will be updated here
       assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
 
-      // since data written is still less than flushLength, flushLength will
-      // still be 0.
-      assertEquals(MAX_FLUSH_SIZE,
-          blockOutputStream.getTotalDataFlushedLength());
+      assertEquals(MAX_FLUSH_SIZE, blockOutputStream.getTotalDataFlushedLength());
 
       // since data equals to maxBufferSize is written, this will be a blocking
       // call and hence will wait for atleast flushSize worth of data to get
@@ -224,14 +135,15 @@ void testWatchForCommitDatanodeFailure(boolean flushDelay, boolean enablePiggyba
       assertThat(blockOutputStream.getTotalAckDataLength())
           .isGreaterThanOrEqualTo(FLUSH_SIZE);
 
-      // watchForCommit will clean up atleast flushSize worth of data buffer
-      // where each entry corresponds to flushSize worth of data
+      // watchForCommit will clean up atleast one entry from the map where each
+      // entry corresponds to flushSize worth of data
       assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
-          .isLessThanOrEqualTo(2);
+          .isLessThanOrEqualTo(1);
 
       // This will flush the data and update the flush length and the map.
       key.flush();
 
+      // flush is a sync call, all pending operations will complete
       // Since the data in the buffer is already flushed, flush here will have
       // no impact on the counters and data structures
 
@@ -239,127 +151,215 @@ void testWatchForCommitDatanodeFailure(boolean flushDelay, boolean enablePiggyba
       assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
 
       assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
-      //  flush will make sure one more entry gets updated in the map
-      assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
+      // flush will make sure one more entry gets updated in the map
+      assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
+          .isLessThanOrEqualTo(2);
 
       XceiverClientRatis raftClient =
           (XceiverClientRatis) blockOutputStream.getXceiverClient();
       assertEquals(3, raftClient.getCommitInfoMap().size());
-      Pipeline pipeline = raftClient.getPipeline();
-      stopAndRemove(pipeline.getNodes().get(0));
-
-      // again write data with more than max buffer limit. This will call
-      // watchForCommit again. Since the commit will happen 2 way, the
-      // commitInfoMap will get updated for servers which are alive
+      // Close the containers on the Datanode and write more data
+      TestHelper.waitForContainerClose(key, cluster);
       key.write(data1);
 
+      // As a part of handling the exception, 4 failed writeChunks  will be
+      // rewritten plus one partial chunk plus two putBlocks for flushSize
+      // and one flush for partial chunk
       key.flush();
-
       assertEquals(2, keyOutputStream.getStreamEntries().size());
-      // now close the stream, It will update ack length after watchForCommit
-      key.close();
+      assertInstanceOf(ContainerNotOpenException.class,
+          checkForException(blockOutputStream.getIoException()));
+
       // Make sure the retryCount is reset after the exception is handled
       assertEquals(0, keyOutputStream.getRetryCount());
-      // make sure the bufferPool is empty
-      assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
-      assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
-      assertEquals(0, keyOutputStream.getStreamEntries().size());
-      // Written the same data twice
-      byte[] bytes = ArrayUtils.addAll(data1, data1);
-      validateData(keyName, bytes, client.getObjectStore(), VOLUME, BUCKET);
+      // commitInfoMap will remain intact as there is no server failure
+      assertEquals(3, raftClient.getCommitInfoMap().size());
+      // now close the stream, It will update ack length after watchForCommit
     }
+    // make sure the bufferPool is empty
+    assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+    assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+    assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
+    assertEquals(0, keyOutputStream.getStreamEntries().size());
+    // Written the same data twice
+    byte[] bytes = ArrayUtils.addAll(data1, data1);
+    validateData(keyName, bytes, client.getObjectStore(), VOLUME, BUCKET);
   }
 
   @ParameterizedTest
   @MethodSource("clientParameters")
-  void test2DatanodesFailure(boolean flushDelay, boolean enablePiggybacking) throws Exception {
+  void testWatchForCommitDatanodeFailure(boolean flushDelay, boolean enablePiggybacking) throws Exception {
     OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay, enablePiggybacking);
     try (OzoneClient client = newClient(cluster.getConf(), config)) {
       String keyName = getKeyName();
-      OzoneOutputStream key = createKey(client, keyName);
       int dataLength = MAX_FLUSH_SIZE + CHUNK_SIZE;
       byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
-      key.write(data1);
-      // since its hitting the full bufferCondition, it will call watchForCommit
-      // and completes atleast putBlock for first flushSize worth of data
-      KeyOutputStream keyOutputStream =
-          assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+      KeyOutputStream keyOutputStream;
+      RatisBlockOutputStream blockOutputStream;
+      try (OzoneOutputStream key = createKey(client, keyName)) {
+        key.write(data1);
+        // since its hitting the full bufferCondition, it will call watchForCommit
+        // and completes at least putBlock for first flushSize worth of data
+        keyOutputStream = assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+
+        assertEquals(1, keyOutputStream.getStreamEntries().size());
+        blockOutputStream = assertInstanceOf(RatisBlockOutputStream.class,
+            keyOutputStream.getStreamEntries().get(0).getOutputStream());
 
-      RatisBlockOutputStream blockOutputStream =
-          assertInstanceOf(RatisBlockOutputStream.class,
-              keyOutputStream.getStreamEntries().get(0).getOutputStream());
+        // we have just written data more than flush Size(2 chunks), at this time
+        // buffer pool will have 3 buffers allocated worth of chunk size
 
-      // we have just written data more than flush Size(2 chunks), at this time
-      // buffer pool will have 3 buffers allocated worth of chunk size
+        assertEquals(4, blockOutputStream.getBufferPool().getSize());
+        // writtenDataLength as well flushedDataLength will be updated here
+        assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
 
-      assertEquals(4, blockOutputStream.getBufferPool().getSize());
-      // writtenDataLength as well flushedDataLength will be updated here
-      assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+        // since data written is still less than flushLength, flushLength will
+        // still be 0.
+        assertEquals(MAX_FLUSH_SIZE,
+            blockOutputStream.getTotalDataFlushedLength());
 
-      assertEquals(MAX_FLUSH_SIZE,
-          blockOutputStream.getTotalDataFlushedLength());
+        // since data equals to maxBufferSize is written, this will be a blocking
+        // call and hence will wait for atleast flushSize worth of data to get
+        // ack'd by all servers right here
+        assertThat(blockOutputStream.getTotalAckDataLength())
+            .isGreaterThanOrEqualTo(FLUSH_SIZE);
 
-      // since data equals to maxBufferSize is written, this will be a blocking
-      // call and hence will wait for atleast flushSize worth of data to get
-      // acked by all servers right here
-      assertThat(blockOutputStream.getTotalAckDataLength())
-          .isGreaterThanOrEqualTo(FLUSH_SIZE);
+        // watchForCommit will clean up atleast flushSize worth of data buffer
+        // where each entry corresponds to flushSize worth of data
+        assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
+            .isLessThanOrEqualTo(2);
 
-      // watchForCommit will clean up atleast one entry from the map where each
-      // entry corresponds to flushSize worth of data
-      assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
-          .isLessThanOrEqualTo(1);
+        // This will flush the data and update the flush length and the map.
+        key.flush();
 
-      // This will flush the data and update the flush length and the map.
-      key.flush();
-
-      // Since the data in the buffer is already flushed, flush here will have
-      // no impact on the counters and data structures
+        // Since the data in the buffer is already flushed, flush here will have
+        // no impact on the counters and data structures
 
-      assertEquals(4, blockOutputStream.getBufferPool().getSize());
-      assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+        assertEquals(4, blockOutputStream.getBufferPool().getSize());
+        assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
 
-      assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
-      // flush will make sure one more entry gets updated in the map
-      assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
-          .isLessThanOrEqualTo(2);
+        assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
+        //  flush will make sure one more entry gets updated in the map
+        assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
 
-      XceiverClientRatis raftClient =
-          (XceiverClientRatis) blockOutputStream.getXceiverClient();
-      assertEquals(3, raftClient.getCommitInfoMap().size());
-      Pipeline pipeline = raftClient.getPipeline();
-      stopAndRemove(pipeline.getNodes().get(0));
-      stopAndRemove(pipeline.getNodes().get(1));
-      // again write data with more than max buffer limit. This will call
-      // watchForCommit again. Since the commit will happen 2 way, the
-      // commitInfoMap will get updated for servers which are alive
+        XceiverClientRatis raftClient =
+            (XceiverClientRatis) blockOutputStream.getXceiverClient();
+        assertEquals(3, raftClient.getCommitInfoMap().size());
+        Pipeline pipeline = raftClient.getPipeline();
+        stopAndRemove(pipeline.getNodes().get(0));
 
-      // 4 writeChunks = maxFlushSize + 2 putBlocks  will be discarded here
-      // once exception is hit
-      key.write(data1);
+        // again write data with more than max buffer limit. This will call
+        // watchForCommit again. Since the commit will happen 2 way, the
+        // commitInfoMap will get updated for servers which are alive
+        key.write(data1);
 
-      // As a part of handling the exception, 4 failed writeChunks  will be
-      // rewritten plus one partial chunk plus two putBlocks for flushSize
-      // and one flush for partial chunk
-      key.flush();
+        key.flush();
 
-      Throwable ioException = checkForException(
-          blockOutputStream.getIoException());
-      // Since 2 datanodes went down,
-      // a) if the pipeline gets destroyed quickly it will hit
-      //    GroupMismatchException.
-      // b) will hit close container exception if the container is closed
-      //    but pipeline is still not destroyed.
-      // c) will fail with RaftRetryFailureException if the leader election
-      //    did not finish before the request retry count finishes.
-      assertTrue(ioException instanceof RaftRetryFailureException
-          || ioException instanceof GroupMismatchException
-          || ioException instanceof ContainerNotOpenException);
+        assertEquals(2, keyOutputStream.getStreamEntries().size());
+        // now close the stream; it will update ack length after watchForCommit
+      }
       // Make sure the retryCount is reset after the exception is handled
       assertEquals(0, keyOutputStream.getRetryCount());
-      // now close the stream; it will update ack length after watchForCommit
+      // make sure the bufferPool is empty
+      assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+      assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
+      assertEquals(0, keyOutputStream.getStreamEntries().size());
+      // Written the same data twice
+      byte[] bytes = ArrayUtils.addAll(data1, data1);
+      validateData(keyName, bytes, client.getObjectStore(), VOLUME, BUCKET);
+    }
+  }
 
-      key.close();
+  @ParameterizedTest
+  @MethodSource("clientParameters")
+  void test2DatanodesFailure(boolean flushDelay, boolean enablePiggybacking) throws Exception {
+    OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay, enablePiggybacking);
+    try (OzoneClient client = newClient(cluster.getConf(), config)) {
+      String keyName = getKeyName();
+      int dataLength = MAX_FLUSH_SIZE + CHUNK_SIZE;
+      byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
+      KeyOutputStream keyOutputStream;
+      RatisBlockOutputStream blockOutputStream;
+      try (OzoneOutputStream key = createKey(client, keyName)) {
+        key.write(data1);
+        // since it's hitting the full bufferCondition, it will call watchForCommit
+        // and completes at least putBlock for first flushSize worth of data
+        keyOutputStream = assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+
+        blockOutputStream = assertInstanceOf(RatisBlockOutputStream.class,
+            keyOutputStream.getStreamEntries().get(0).getOutputStream());
+
+        // we have just written data more than flush Size(2 chunks), at this time
+        // buffer pool will have 3 buffers allocated worth of chunk size
+
+        assertEquals(4, blockOutputStream.getBufferPool().getSize());
+        // writtenDataLength as well as flushedDataLength will be updated here
+        assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+        assertEquals(MAX_FLUSH_SIZE,
+            blockOutputStream.getTotalDataFlushedLength());
+
+        // since data equal to maxBufferSize is written, this will be a blocking
+        // call and hence will wait for at least flushSize worth of data to get
+        // acked by all servers right here
+        assertThat(blockOutputStream.getTotalAckDataLength())
+            .isGreaterThanOrEqualTo(FLUSH_SIZE);
+
+        // watchForCommit will clean up at least one entry from the map where each
+        // entry corresponds to flushSize worth of data
+        assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
+            .isLessThanOrEqualTo(1);
+
+        // This will flush the data and update the flush length and the map.
+        key.flush();
+
+        // Since the data in the buffer is already flushed, flush here will have
+        // no impact on the counters and data structures
+
+        assertEquals(4, blockOutputStream.getBufferPool().getSize());
+        assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+        assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
+        // flush will make sure one more entry gets updated in the map
+        assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
+            .isLessThanOrEqualTo(2);
+
+        XceiverClientRatis raftClient =
+            (XceiverClientRatis) blockOutputStream.getXceiverClient();
+        assertEquals(3, raftClient.getCommitInfoMap().size());
+        Pipeline pipeline = raftClient.getPipeline();
+        stopAndRemove(pipeline.getNodes().get(0));
+        stopAndRemove(pipeline.getNodes().get(1));
+        // again write data with more than max buffer limit. This will call
+        // watchForCommit again. Since the commit will happen 2 way, the
+        // commitInfoMap will get updated for servers which are alive
+
+        // 4 writeChunks = maxFlushSize + 2 putBlocks will be discarded here
+        // once exception is hit
+        key.write(data1);
+
+        // As a part of handling the exception, 4 failed writeChunks will be
+        // rewritten plus one partial chunk plus two putBlocks for flushSize
+        // and one flush for partial chunk
+        key.flush();
+
+        Throwable ioException = checkForException(
+            blockOutputStream.getIoException());
+        // Since 2 datanodes went down,
+        // a) if the pipeline gets destroyed quickly it will hit
+        //    GroupMismatchException.
+        // b) will hit close container exception if the container is closed
+        //    but pipeline is still not destroyed.
+        // c) will fail with RaftRetryFailureException if the leader election
+        //    did not finish before the request retry count finishes.
+        assertTrue(ioException instanceof RaftRetryFailureException
+            || ioException instanceof GroupMismatchException
+            || ioException instanceof ContainerNotOpenException);
+        // Make sure the retryCount is reset after the exception is handled
+        assertEquals(0, keyOutputStream.getRetryCount());
+        // now close the stream; it will update ack length after watchForCommit
+
+      }
       assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
       assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
       // make sure the bufferPool is empty
@@ -372,51 +372,51 @@ void test2DatanodesFailure(boolean flushDelay, boolean enablePiggybacking) throw
   private void testWriteMoreThanMaxFlushSize(OzoneClient client)
       throws Exception {
     String keyName = getKeyName();
-    OzoneOutputStream key = createKey(client, keyName);
     int dataLength = MAX_FLUSH_SIZE + CHUNK_SIZE;
     byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
-    key.write(data1);
+    KeyOutputStream keyOutputStream;
+    RatisBlockOutputStream blockOutputStream;
+    try (OzoneOutputStream key = createKey(client, keyName)) {
+      key.write(data1);
 
-    KeyOutputStream keyOutputStream =
-        assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+      keyOutputStream = assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
 
-    assertEquals(1, keyOutputStream.getStreamEntries().size());
-    RatisBlockOutputStream blockOutputStream =
-        assertInstanceOf(RatisBlockOutputStream.class,
-            keyOutputStream.getStreamEntries().get(0).getOutputStream());
+      assertEquals(1, keyOutputStream.getStreamEntries().size());
+      blockOutputStream = assertInstanceOf(RatisBlockOutputStream.class,
+          keyOutputStream.getStreamEntries().get(0).getOutputStream());
 
-    assertThat(blockOutputStream.getBufferPool().getSize())
-        .isLessThanOrEqualTo(4);
-    assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+      assertThat(blockOutputStream.getBufferPool().getSize())
+          .isLessThanOrEqualTo(4);
+      assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
 
-    assertEquals(400, blockOutputStream.getTotalDataFlushedLength());
+      assertEquals(400, blockOutputStream.getTotalDataFlushedLength());
 
-    // This will flush the data and update the flush length and the map.
-    key.flush();
+      // This will flush the data and update the flush length and the map.
+      key.flush();
 
-    assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
+      assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
 
-    XceiverClientRatis raftClient =
-        (XceiverClientRatis) blockOutputStream.getXceiverClient();
-    assertEquals(3, raftClient.getCommitInfoMap().size());
-    // Close the containers on the Datanode and write more data
-    TestHelper.waitForContainerClose(key, cluster);
-    key.write(data1);
+      XceiverClientRatis raftClient =
+          (XceiverClientRatis) blockOutputStream.getXceiverClient();
+      assertEquals(3, raftClient.getCommitInfoMap().size());
+      // Close the containers on the Datanode and write more data
+      TestHelper.waitForContainerClose(key, cluster);
+      key.write(data1);
 
-    // As a part of handling the exception, 2 failed writeChunks will be
-    // rewritten plus 1 putBlocks for flush
-    // and one flush for partial chunk
-    key.flush();
+      // As a part of handling the exception, 2 failed writeChunks will be
+      // rewritten plus 1 putBlocks for flush
+      // and one flush for partial chunk
+      key.flush();
 
-    assertInstanceOf(ContainerNotOpenException.class,
-        checkForException(blockOutputStream.getIoException()));
-    // Make sure the retryCount is reset after the exception is handled
-    assertEquals(0, keyOutputStream.getRetryCount());
+      assertInstanceOf(ContainerNotOpenException.class,
+          checkForException(blockOutputStream.getIoException()));
+      // Make sure the retryCount is reset after the exception is handled
+      assertEquals(0, keyOutputStream.getRetryCount());
 
-    // commitInfoMap will remain intact as there is no server failure
-    assertEquals(3, raftClient.getCommitInfoMap().size());
-    // now close the stream; it will update ack length after watchForCommit
-    key.close();
+      // commitInfoMap will remain intact as there is no server failure
+      assertEquals(3, raftClient.getCommitInfoMap().size());
+      // now close the stream; it will update ack length after watchForCommit
+    }
     // make sure the bufferPool is empty
     assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
     assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
@@ -429,52 +429,52 @@ private void testWriteMoreThanMaxFlushSize(OzoneClient client)
 
   private void testExceptionDuringClose(OzoneClient client) throws Exception {
     String keyName = getKeyName();
-    OzoneOutputStream key = createKey(client, keyName);
     int dataLength = 167;
     byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
-    key.write(data1);
+    KeyOutputStream keyOutputStream;
+    RatisBlockOutputStream blockOutputStream;
+    try (OzoneOutputStream key = createKey(client, keyName)) {
+      key.write(data1);
 
-    KeyOutputStream keyOutputStream =
-        assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+      keyOutputStream = assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
 
-    assertEquals(1, keyOutputStream.getStreamEntries().size());
-    RatisBlockOutputStream blockOutputStream =
-        assertInstanceOf(RatisBlockOutputStream.class,
-            keyOutputStream.getStreamEntries().get(0).getOutputStream());
+      assertEquals(1, keyOutputStream.getStreamEntries().size());
+      blockOutputStream = assertInstanceOf(RatisBlockOutputStream.class,
+          keyOutputStream.getStreamEntries().get(0).getOutputStream());
 
-    assertThat(blockOutputStream.getBufferPool().getSize())
-        .isLessThanOrEqualTo(2);
-    assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+      assertThat(blockOutputStream.getBufferPool().getSize())
+          .isLessThanOrEqualTo(2);
+      assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
 
-    assertEquals(0, blockOutputStream.getTotalDataFlushedLength());
+      assertEquals(0, blockOutputStream.getTotalDataFlushedLength());
 
-    assertEquals(0, blockOutputStream.getTotalAckDataLength());
+      assertEquals(0, blockOutputStream.getTotalAckDataLength());
 
-    assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
-    // This will flush the data and update the flush length and the map.
-    key.flush();
-    // Since the data in the buffer is already flushed, flush here will have
-    // no impact on the counters and data structures
+      assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
+      // This will flush the data and update the flush length and the map.
+      key.flush();
+      // Since the data in the buffer is already flushed, flush here will have
+      // no impact on the counters and data structures
 
-    assertThat(blockOutputStream.getBufferPool().getSize())
-        .isLessThanOrEqualTo(2);
-    assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+      assertThat(blockOutputStream.getBufferPool().getSize())
+          .isLessThanOrEqualTo(2);
+      assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
 
-    assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
-    // flush will make sure one more entry gets updated in the map
-    assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
+      assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
+      // flush will make sure one more entry gets updated in the map
+      assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
 
-    XceiverClientRatis raftClient =
-        (XceiverClientRatis) blockOutputStream.getXceiverClient();
-    assertEquals(3, raftClient.getCommitInfoMap().size());
-    // Close the containers on the Datanode and write more data
-    TestHelper.waitForContainerClose(key, cluster);
-    key.write(data1);
+      XceiverClientRatis raftClient =
+          (XceiverClientRatis) blockOutputStream.getXceiverClient();
+      assertEquals(3, raftClient.getCommitInfoMap().size());
+      // Close the containers on the Datanode and write more data
+      TestHelper.waitForContainerClose(key, cluster);
+      key.write(data1);
 
-    // commitInfoMap will remain intact as there is no server failure
-    assertEquals(3, raftClient.getCommitInfoMap().size());
-    // now close the stream; it will hit an exception
-    key.close();
+      // commitInfoMap will remain intact as there is no server failure
+      assertEquals(3, raftClient.getCommitInfoMap().size());
+      // now close the stream; it will hit an exception
+    }
 
     assertInstanceOf(ContainerNotOpenException.class,
         checkForException(blockOutputStream.getIoException()));
@@ -493,119 +493,28 @@ private void testExceptionDuringClose(OzoneClient client) throws Exception {
   private void testWatchForCommitWithSingleNodeRatis(OzoneClient client)
       throws Exception {
     String keyName = getKeyName();
-    OzoneOutputStream key =
-        createKey(client, keyName, 0, ReplicationFactor.ONE);
     int dataLength = MAX_FLUSH_SIZE + CHUNK_SIZE;
     byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
-    key.write(data1);
-
-    KeyOutputStream keyOutputStream =
-        assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
-
-    assertEquals(1, keyOutputStream.getStreamEntries().size());
-    RatisBlockOutputStream blockOutputStream =
-        assertInstanceOf(RatisBlockOutputStream.class,
-            keyOutputStream.getStreamEntries().get(0).getOutputStream());
-
-    // we have just written data more than flush Size(2 chunks), at this time
-    // buffer pool will have up to 4 buffers allocated worth of chunk size
-
-    assertThat(blockOutputStream.getBufferPool().getSize())
-        .isLessThanOrEqualTo(4);
-    // writtenDataLength as well as flushedDataLength will be updated here
-    assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
-    assertEquals(MAX_FLUSH_SIZE, blockOutputStream.getTotalDataFlushedLength());
-
-    // since data equal to maxBufferSize is written, this will be a blocking
-    // call and hence will wait for at least flushSize worth of data to get
-    // acked by all servers right here
-    assertThat(blockOutputStream.getTotalAckDataLength())
-        .isGreaterThanOrEqualTo(FLUSH_SIZE);
-
-    // watchForCommit will clean up at least one entry from the map where each
-    // entry corresponds to flushSize worth of data
-    assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
-        .isLessThanOrEqualTo(1);
-
-    // This will flush the data and update the flush length and the map.
-    key.flush();
-
-    // Since the data in the buffer is already flushed, flush here will have
-    // no impact on the counters and data structures
-
-    assertThat(blockOutputStream.getBufferPool().getSize())
-        .isLessThanOrEqualTo(4);
-    assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
-
-    assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
-    // flush will make sure one more entry gets updated in the map
-    assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
-        .isLessThanOrEqualTo(2);
-
-    XceiverClientRatis raftClient =
-        (XceiverClientRatis) blockOutputStream.getXceiverClient();
-    assertEquals(1, raftClient.getCommitInfoMap().size());
-    // Close the containers on the Datanode and write more data
-    TestHelper.waitForContainerClose(key, cluster);
-    // 4 writeChunks = maxFlushSize + 2 putBlocks will be discarded here
-    // once exception is hit
-    key.write(data1);
-
-    // As a part of handling the exception, 4 failed writeChunks will be
-    // rewritten plus one partial chunk plus two putBlocks for flushSize
-    // and one flush for partial chunk
-    key.flush();
-
-    assertInstanceOf(ContainerNotOpenException.class,
-        checkForException(blockOutputStream.getIoException()));
-    // Make sure the retryCount is reset after the exception is handled
-    assertEquals(0, keyOutputStream.getRetryCount());
-    // commitInfoMap will remain intact as there is no server failure
-    assertEquals(1, raftClient.getCommitInfoMap().size());
-    assertEquals(2, keyOutputStream.getStreamEntries().size());
-    // now close the stream; it will update ack length after watchForCommit
-    key.close();
-    // make sure the bufferPool is empty
-    assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
-    assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
-    assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
-    assertEquals(0, keyOutputStream.getLocationInfoList().size());
-    // Written the same data twice
-    byte[] bytes = ArrayUtils.addAll(data1, data1);
-    validateData(keyName, bytes, client.getObjectStore(), VOLUME, BUCKET);
-  }
-
-  @ParameterizedTest
-  @MethodSource("clientParameters")
-  void testDatanodeFailureWithSingleNode(boolean flushDelay, boolean enablePiggybacking) throws Exception {
-    OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay, enablePiggybacking);
-    try (OzoneClient client = newClient(cluster.getConf(), config)) {
-      String keyName = getKeyName();
-      OzoneOutputStream key =
-          createKey(client, keyName, 0, ReplicationFactor.ONE);
-      int dataLength = MAX_FLUSH_SIZE + CHUNK_SIZE;
-      byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
+    KeyOutputStream keyOutputStream;
+    RatisBlockOutputStream blockOutputStream;
+    try (OzoneOutputStream key = createKey(client, keyName, 0, ReplicationFactor.ONE)) {
       key.write(data1);
-      // since it's hitting the full bufferCondition, it will call watchForCommit
-      // and completes at least putBlock for first flushSize worth of data
-      KeyOutputStream keyOutputStream =
-          assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+
+      keyOutputStream = assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
 
       assertEquals(1, keyOutputStream.getStreamEntries().size());
-      RatisBlockOutputStream blockOutputStream =
-          assertInstanceOf(RatisBlockOutputStream.class,
-              keyOutputStream.getStreamEntries().get(0).getOutputStream());
+      blockOutputStream = assertInstanceOf(RatisBlockOutputStream.class,
+          keyOutputStream.getStreamEntries().get(0).getOutputStream());
 
       // we have just written data more than flush Size(2 chunks), at this time
-      // buffer pool will have 3 buffers allocated worth of chunk size
+      // buffer pool will have up to 4 buffers allocated worth of chunk size
 
-      assertEquals(4, blockOutputStream.getBufferPool().getSize());
+      assertThat(blockOutputStream.getBufferPool().getSize())
+          .isLessThanOrEqualTo(4);
       // writtenDataLength as well as flushedDataLength will be updated here
       assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
 
-      assertEquals(MAX_FLUSH_SIZE,
-          blockOutputStream.getTotalDataFlushedLength());
+      assertEquals(MAX_FLUSH_SIZE, blockOutputStream.getTotalDataFlushedLength());
 
       // since data equal to maxBufferSize is written, this will be a blocking
       // call and hence will wait for at least flushSize worth of data to get
@@ -613,10 +522,10 @@ void testDatanodeFailureWithSingleNode(boolean flushDelay, boolean enablePiggyba
       assertThat(blockOutputStream.getTotalAckDataLength())
           .isGreaterThanOrEqualTo(FLUSH_SIZE);
 
-      // watchForCommit will clean up at least flushSize worth of data buffer
-      // where each entry corresponds to flushSize worth of data
+      // watchForCommit will clean up at least one entry from the map where each
+      // entry corresponds to flushSize worth of data
       assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
-          .isLessThanOrEqualTo(2);
+          .isLessThanOrEqualTo(1);
 
       // This will flush the data and update the flush length and the map.
       key.flush();
@@ -624,34 +533,124 @@ void testDatanodeFailureWithSingleNode(boolean flushDelay, boolean enablePiggyba
       // Since the data in the buffer is already flushed, flush here will have
       // no impact on the counters and data structures
 
-      assertEquals(4, blockOutputStream.getBufferPool().getSize());
+      assertThat(blockOutputStream.getBufferPool().getSize())
+          .isLessThanOrEqualTo(4);
       assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
 
       assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
-      // flush will make sure one more entry gets updated in the map
-      assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
+      // flush will make sure one more entry gets updated in the map
+      assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
+          .isLessThanOrEqualTo(2);
 
       XceiverClientRatis raftClient =
           (XceiverClientRatis) blockOutputStream.getXceiverClient();
       assertEquals(1, raftClient.getCommitInfoMap().size());
-      Pipeline pipeline = raftClient.getPipeline();
-      cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
-
-      // again write data with more than max buffer limit. This will call
-      // watchForCommit again. No write will happen in the current block and
-      // data will be rewritten to the next block.
-
+      // Close the containers on the Datanode and write more data
+      TestHelper.waitForContainerClose(key, cluster);
+      // 4 writeChunks = maxFlushSize + 2 putBlocks will be discarded here
+      // once exception is hit
       key.write(data1);
+
+      // As a part of handling the exception, 4 failed writeChunks will be
+      // rewritten plus one partial chunk plus two putBlocks for flushSize
+      // and one flush for partial chunk
       key.flush();
 
-      assertInstanceOf(RaftRetryFailureException.class,
+      assertInstanceOf(ContainerNotOpenException.class,
           checkForException(blockOutputStream.getIoException()));
-      assertEquals(1, raftClient.getCommitInfoMap().size());
       // Make sure the retryCount is reset after the exception is handled
       assertEquals(0, keyOutputStream.getRetryCount());
+      // commitInfoMap will remain intact as there is no server failure
+      assertEquals(1, raftClient.getCommitInfoMap().size());
       assertEquals(2, keyOutputStream.getStreamEntries().size());
       // now close the stream; it will update ack length after watchForCommit
-      key.close();
+    }
+    // make sure the bufferPool is empty
+    assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
+    assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
+    assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
+    assertEquals(0, keyOutputStream.getLocationInfoList().size());
+    // Written the same data twice
+    byte[] bytes = ArrayUtils.addAll(data1, data1);
+    validateData(keyName, bytes, client.getObjectStore(), VOLUME, BUCKET);
+  }
+
+  @ParameterizedTest
+  @MethodSource("clientParameters")
+  void testDatanodeFailureWithSingleNode(boolean flushDelay, boolean enablePiggybacking) throws Exception {
+    OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay, enablePiggybacking);
+    try (OzoneClient client = newClient(cluster.getConf(), config)) {
+      String keyName = getKeyName();
+      int dataLength = MAX_FLUSH_SIZE + CHUNK_SIZE;
+      byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
+      KeyOutputStream keyOutputStream;
+      RatisBlockOutputStream blockOutputStream;
+      Pipeline pipeline;
+      try (OzoneOutputStream key = createKey(client, keyName, 0, ReplicationFactor.ONE)) {
+        key.write(data1);
+        // since it's hitting the full bufferCondition, it will call watchForCommit
+        // and completes at least putBlock for first flushSize worth of data
+        keyOutputStream = assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+
+        assertEquals(1, keyOutputStream.getStreamEntries().size());
+        blockOutputStream = assertInstanceOf(RatisBlockOutputStream.class,
+            keyOutputStream.getStreamEntries().get(0).getOutputStream());
+
+        // we have just written data more than flush Size(2 chunks), at this time
+        // buffer pool will have 3 buffers allocated worth of chunk size
+
+        assertEquals(4, blockOutputStream.getBufferPool().getSize());
+        // writtenDataLength as well as flushedDataLength will be updated here
+        assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+        assertEquals(MAX_FLUSH_SIZE,
+            blockOutputStream.getTotalDataFlushedLength());
+
+        // since data equal to maxBufferSize is written, this will be a blocking
+        // call and hence will wait for at least flushSize worth of data to get
+        // acked by all servers right here
+        assertThat(blockOutputStream.getTotalAckDataLength())
+            .isGreaterThanOrEqualTo(FLUSH_SIZE);
+
+        // watchForCommit will clean up at least flushSize worth of data buffer
+        // where each entry corresponds to flushSize worth of data
+        assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
+            .isLessThanOrEqualTo(2);
+
+        // This will flush the data and update the flush length and the map.
+        key.flush();
+
+        // Since the data in the buffer is already flushed, flush here will have
+        // no impact on the counters and data structures
+
+        assertEquals(4, blockOutputStream.getBufferPool().getSize());
+        assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+
+        assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
+        // flush will make sure one more entry gets updated in the map
+        assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
+
+        XceiverClientRatis raftClient =
+            (XceiverClientRatis) blockOutputStream.getXceiverClient();
+        assertEquals(1, raftClient.getCommitInfoMap().size());
+        pipeline = raftClient.getPipeline();
+        cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
+
+        // again write data with more than max buffer limit. This will call
+        // watchForCommit again. No write will happen in the current block and
+        // data will be rewritten to the next block.
+
+        key.write(data1);
+        key.flush();
+
+        assertInstanceOf(RaftRetryFailureException.class,
+            checkForException(blockOutputStream.getIoException()));
+        assertEquals(1, raftClient.getCommitInfoMap().size());
+        // Make sure the retryCount is reset after the exception is handled
+        assertEquals(0, keyOutputStream.getRetryCount());
+        assertEquals(2, keyOutputStream.getStreamEntries().size());
+        // now close the stream; it will update ack length after watchForCommit
+      }
       assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
       // make sure the bufferPool is empty
       assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());
@@ -672,78 +671,77 @@ void testDatanodeFailureWithPreAllocation(boolean flushDelay, boolean enablePigg
     OzoneClientConfig config = newClientConfig(cluster.getConf(), flushDelay, enablePiggybacking);
     try (OzoneClient client = newClient(cluster.getConf(), config)) {
       String keyName = getKeyName();
-      OzoneOutputStream key =
-          createKey(client, keyName, 3 * BLOCK_SIZE,
-              ReplicationFactor.ONE);
       int dataLength = MAX_FLUSH_SIZE + CHUNK_SIZE;
       byte[] data1 = RandomUtils.secure().randomBytes(dataLength);
-      key.write(data1);
-      // since it's hitting the full bufferCondition, it will call watchForCommit
-      // and completes at least putBlock for first flushSize worth of data
-      KeyOutputStream keyOutputStream =
-          assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+      KeyOutputStream keyOutputStream;
+      RatisBlockOutputStream blockOutputStream;
+      Pipeline pipeline;
+      try (OzoneOutputStream key = createKey(client, keyName, 3 * BLOCK_SIZE, ReplicationFactor.ONE)) {
+        key.write(data1);
+        // since it's hitting the full bufferCondition, it will call watchForCommit
+        // and completes at least putBlock for first flushSize worth of data
+        keyOutputStream = assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
+
+        assertEquals(3, keyOutputStream.getStreamEntries().size());
+        blockOutputStream = assertInstanceOf(RatisBlockOutputStream.class,
+            keyOutputStream.getStreamEntries().get(0).getOutputStream());
 
-      assertEquals(3, keyOutputStream.getStreamEntries().size());
-      RatisBlockOutputStream blockOutputStream =
-          assertInstanceOf(RatisBlockOutputStream.class,
-              keyOutputStream.getStreamEntries().get(0).getOutputStream());
+        // we have just written data more than flush Size(2 chunks), at this time
+        // buffer pool will have 3 buffers allocated worth of chunk size
 
-      // we have just written data more than flush Size(2 chunks), at this time
-      // buffer pool will have 3 buffers allocated worth of chunk size
+        assertEquals(4, blockOutputStream.getBufferPool().getSize());
+        // writtenDataLength as well as flushedDataLength will be updated here
+        assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
 
-      assertEquals(4, blockOutputStream.getBufferPool().getSize());
-      // writtenDataLength as well as flushedDataLength will be updated here
-      assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+        assertEquals(MAX_FLUSH_SIZE,
+            blockOutputStream.getTotalDataFlushedLength());
 
-      assertEquals(MAX_FLUSH_SIZE,
-          blockOutputStream.getTotalDataFlushedLength());
+        // since data equal to maxBufferSize is written, this will be a blocking
+        // call and hence will wait for at least flushSize worth of data to get
+        // acked by all servers right here
+        assertThat(blockOutputStream.getTotalAckDataLength())
+            .isGreaterThanOrEqualTo(FLUSH_SIZE);
 
-      // since data equal to maxBufferSize is written, this will be a blocking
-      // call and hence will wait for at least flushSize worth of data to get
-      // acked by all servers right here
-      assertThat(blockOutputStream.getTotalAckDataLength())
-          .isGreaterThanOrEqualTo(FLUSH_SIZE);
-
-      // watchForCommit will clean up at least flushSize worth of data buffer
-      // where each entry corresponds to flushSize worth of data
-      assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
-          .isLessThanOrEqualTo(2);
+        // watchForCommit will clean up at least flushSize worth of data buffer
+        // where each entry corresponds to flushSize worth of data
+        assertThat(blockOutputStream.getCommitIndex2flushedDataMap().size())
+            .isLessThanOrEqualTo(2);
 
-      // This will flush the data and update the flush length and
-      // the map.
-      key.flush();
+        // This will flush the data and update the flush length and
+        // the map.
+        key.flush();
 
-      // Since the data in the buffer is already flushed, flush here will have
-      // no impact on the counters and data structures
+        // Since the data in the buffer is already flushed, flush here will have
+        // no impact on the counters and data structures
 
-      assertEquals(4, blockOutputStream.getBufferPool().getSize());
-      assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
+        assertEquals(4, blockOutputStream.getBufferPool().getSize());
+        assertEquals(dataLength, blockOutputStream.getWrittenDataLength());
 
-      assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
-      // flush will make sure one more entry gets updated in the map
-      assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
+        assertEquals(dataLength, blockOutputStream.getTotalDataFlushedLength());
+        // flush will make sure one more entry gets updated in the map
+        assertEquals(0, blockOutputStream.getCommitIndex2flushedDataMap().size());
 
-      XceiverClientRatis raftClient =
-          (XceiverClientRatis) blockOutputStream.getXceiverClient();
-      assertEquals(1, raftClient.getCommitInfoMap().size());
-      Pipeline pipeline = raftClient.getPipeline();
-      cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
+        XceiverClientRatis raftClient =
+            (XceiverClientRatis) blockOutputStream.getXceiverClient();
+        assertEquals(1, raftClient.getCommitInfoMap().size());
+        pipeline = raftClient.getPipeline();
+        cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
 
-      // again write data with more than max buffer limit. This will call
-      // watchForCommit again. No write will happen and
+        // again write data with more than max buffer limit. This will call
+        // watchForCommit again. No write will happen and
 
-      key.write(data1);
-      key.flush();
+        key.write(data1);
+        key.flush();
 
-      assertInstanceOf(RaftRetryFailureException.class,
-          checkForException(blockOutputStream.getIoException()));
+        assertInstanceOf(RaftRetryFailureException.class,
+            checkForException(blockOutputStream.getIoException()));
 
-      // Make sure the retryCount is reset after the exception is handled
-      assertEquals(0, keyOutputStream.getRetryCount());
-      assertEquals(1, raftClient.getCommitInfoMap().size());
+        // Make sure the retryCount is reset after the exception is handled
+        assertEquals(0, keyOutputStream.getRetryCount());
+        assertEquals(1, raftClient.getCommitInfoMap().size());
 
-      // now close the stream; it will update ack length after watchForCommit
-      key.close();
+        // now close the stream; it will update ack length after watchForCommit
+      }
 
       assertEquals(dataLength, blockOutputStream.getTotalAckDataLength());
       // make sure the bufferPool is empty
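
Note on the recurring shape of this change: each test now declares its KeyOutputStream and RatisBlockOutputStream references before a try-with-resources block that owns the OzoneOutputStream, so the stream is closed even when an assertion inside the block throws, while the post-close assertions can still inspect the captured objects. The following is a condensed sketch of that pattern, not a verbatim excerpt from the patch; it assumes the test class's existing createKey helper, client/keyName/data variables, and the JUnit/AssertJ assertions used above.

    KeyOutputStream keyOutputStream;
    RatisBlockOutputStream blockOutputStream;
    try (OzoneOutputStream key = createKey(client, keyName)) {
      key.write(data);
      keyOutputStream = assertInstanceOf(KeyOutputStream.class, key.getOutputStream());
      blockOutputStream = assertInstanceOf(RatisBlockOutputStream.class,
          keyOutputStream.getStreamEntries().get(0).getOutputStream());
      // assertions that need the stream to still be open go here
    }
    // close() has run at this point, even if an assertion above failed,
    // so the final state of the closed stream can be verified safely:
    assertEquals(0, blockOutputStream.getBufferPool().computeBufferData());

Compared with the previous explicit key.close() at the end of each test, this guarantees the stream is released on every code path, which is what HDDS-14026 set out to fix.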


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

