This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new fc8f79524f HDDS-7931. EC: ManagedChannelImpl not cleaned up properly (#4269)
fc8f79524f is described below

commit fc8f79524f95dd17e35d92d5d026982511f983ef
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Thu Feb 16 13:19:32 2023 +0100

    HDDS-7931. EC: ManagedChannelImpl not cleaned up properly (#4269)
---
 .../hadoop/hdds/scm/storage/BlockInputStream.java  |  24 +-
 .../hadoop/hdds/scm/storage/BlockOutputStream.java |   4 +-
 .../hadoop/hdds/scm/storage/ChunkInputStream.java  |   9 +-
 .../hdds/scm/storage/DummyChunkInputStream.java    |   2 +-
 .../hdds/scm/storage/TestBlockInputStream.java     |   2 +-
 .../ECReconstructionCoordinator.java               |  52 +--
 .../ozone/client/io/ECBlockOutputStreamEntry.java  |  23 +-
 .../hadoop/ozone/client/io/ECKeyOutputStream.java  |  45 +--
 .../hadoop/ozone/client/io/KeyOutputStream.java    |   2 +-
 .../hadoop/fs/ozone/TestOzoneFileSystem.java       |   8 +-
 .../hadoop/fs/ozone/TestRootedOzoneFileSystem.java |   6 +-
 .../hdds/scm/storage/TestContainerCommandsEC.java  | 395 +++++++++++----------
 .../ozone/client/rpc/TestBlockOutputStream.java    |   2 +
 .../rpc/TestBlockOutputStreamFlushDelay.java       |   2 +
 .../rpc/TestBlockOutputStreamWithFailures.java     |   2 +
 ...estBlockOutputStreamWithFailuresFlushDelay.java |   2 +
 .../client/rpc/TestOzoneRpcClientAbstract.java     |  44 ++-
 .../client/rpc/read/TestChunkInputStream.java      | 109 +++---
 .../ozone/client/rpc/read/TestInputStreamBase.java |   2 +
 .../apache/hadoop/ozone/container/TestHelper.java  |  20 +-
 20 files changed, 409 insertions(+), 346 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index a0d848e251..f01afd76dd 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -226,7 +226,6 @@ public class BlockInputStream extends BlockExtendedInputStream {
           .build();
     }
     acquireClient();
-    boolean success = false;
     List<ChunkInfo> chunks;
     try {
       if (LOG.isDebugEnabled()) {
@@ -247,18 +246,17 @@ public class BlockInputStream extends BlockExtendedInputStream {
           .getBlock(xceiverClient, blkIDBuilder.build(), token);
 
       chunks = response.getBlockData().getChunksList();
-      success = true;
     } finally {
-      if (!success) {
-        xceiverClientFactory.releaseClientForReadData(xceiverClient, false);
-      }
+      releaseClient();
     }
 
     return chunks;
   }
 
   protected void acquireClient() throws IOException {
-    xceiverClient = xceiverClientFactory.acquireClientForReadData(pipeline);
+    if (xceiverClientFactory != null && xceiverClient == null) {
+      xceiverClient = xceiverClientFactory.acquireClientForReadData(pipeline);
+    }
   }
 
   /**
@@ -320,7 +318,7 @@ public class BlockInputStream extends BlockExtendedInputStream {
           if (isConnectivityIssue(ex)) {
             handleReadError(ex);
           } else {
-            current.releaseClient(false);
+            current.releaseClient();
           }
           continue;
         } else {
@@ -435,7 +433,7 @@ public class BlockInputStream extends BlockExtendedInputStream {
 
   @Override
   public synchronized void close() {
-    releaseClient(true);
+    releaseClient();
     xceiverClientFactory = null;
 
     final List<ChunkInputStream> inputStreams = this.chunkStreams;
@@ -446,9 +444,9 @@ public class BlockInputStream extends BlockExtendedInputStream {
     }
   }
 
-  private void releaseClient(boolean invalidateClient) {
+  private void releaseClient() {
     if (xceiverClientFactory != null && xceiverClient != null) {
-      xceiverClientFactory.releaseClient(xceiverClient, invalidateClient);
+      xceiverClientFactory.releaseClientForReadData(xceiverClient, false);
       xceiverClient = null;
     }
   }
@@ -487,7 +485,7 @@ public class BlockInputStream extends BlockExtendedInputStream {
   @Override
   public synchronized void unbuffer() {
     storePosition();
-    releaseClient(true);
+    releaseClient();
 
     final List<ChunkInputStream> inputStreams = this.chunkStreams;
     if (inputStreams != null) {
@@ -514,11 +512,11 @@ public class BlockInputStream extends BlockExtendedInputStream {
   }
 
   private void handleReadError(IOException cause) throws IOException {
-    releaseClient(false);
+    releaseClient();
     final List<ChunkInputStream> inputStreams = this.chunkStreams;
     if (inputStreams != null) {
       for (ChunkInputStream is : inputStreams) {
-        is.releaseClient(false);
+        is.releaseClient();
       }
     }
 
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index f72d537ba9..66c73cac33 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -554,7 +554,7 @@ public class BlockOutputStream extends OutputStream {
       throw e;
     } finally {
       if (close) {
-        cleanup(true);
+        cleanup(false);
       }
     }
   }
@@ -599,7 +599,7 @@ public class BlockOutputStream extends OutputStream {
         // Preconditions.checkArgument(buffer.position() == 0);
         // bufferPool.checkBufferPoolEmpty();
       } else {
-        cleanup(true);
+        cleanup(false);
       }
     }
   }
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
index 554d605b3f..e30df34b85 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
@@ -282,13 +282,12 @@ public class ChunkInputStream extends InputStream
   @Override
   public synchronized void close() {
     releaseBuffers();
-    releaseClient(true);
+    releaseClient();
   }
 
-  protected synchronized void releaseClient(boolean invalidateClient) {
+  protected synchronized void releaseClient() {
     if (xceiverClientFactory != null && xceiverClient != null) {
-      xceiverClientFactory.releaseClientForReadData(
-          xceiverClient, invalidateClient);
+      xceiverClientFactory.releaseClientForReadData(xceiverClient, false);
       xceiverClient = null;
     }
   }
@@ -739,7 +738,7 @@ public class ChunkInputStream extends InputStream
   public synchronized void unbuffer() {
     storePosition();
     releaseBuffers();
-    releaseClient(true);
+    releaseClient();
   }
 
   @VisibleForTesting
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java
index cfbba0df24..78d0c05bfe 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyChunkInputStream.java
@@ -82,7 +82,7 @@ public class DummyChunkInputStream extends ChunkInputStream {
   }
 
   @Override
-  protected void releaseClient(boolean invalidateClient) {
+  protected void releaseClient() {
     // no-op
   }
 
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
index ef270731d5..4943d6ce1a 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
@@ -423,7 +423,7 @@ public class TestBlockInputStream {
       Assert.assertEquals(len, bytesRead);
       verify(refreshPipeline).apply(blockID);
       verify(clientFactory).acquireClientForReadData(pipeline);
-      verify(clientFactory).releaseClient(client, true);
+      verify(clientFactory).releaseClientForReadData(client, false);
     } finally {
       subject.close();
     }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
index 4820fbcecb..943e19a719 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
@@ -271,35 +271,35 @@ public class ECReconstructionCoordinator implements Closeable {
               (int) (configuration.getStreamBufferMaxSize() / configuration
                   .getStreamBufferSize()),
               ByteStringConversion.createByteBufferConversion(false));
-      for (int i = 0; i < toReconstructIndexes.size(); i++) {
-        int replicaIndex = toReconstructIndexes.get(i);
-        DatanodeDetails datanodeDetails =
-            targetMap.get(replicaIndex);
-        targetBlockStreams[i] = getECBlockOutputstream(blockLocationInfo,
-                datanodeDetails, repConfig, replicaIndex, bufferPool,
-                configuration);
-        bufs[i] = byteBufferPool.getBuffer(false, repConfig.getEcChunkSize());
-        // Make sure it's clean. Don't want to reuse the erroneously returned
-        // buffers from the pool.
-        bufs[i].clear();
-      }
-
-      sis.setRecoveryIndexes(toReconstructIndexes.stream().map(i -> (i - 1))
-          .collect(Collectors.toSet()));
-      long length = safeBlockGroupLength;
-      while (length > 0) {
-        int readLen = sis.recoverChunks(bufs);
-        // TODO: can be submitted in parallel
-        for (int i = 0; i < bufs.length; i++) {
-          CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
-              future = targetBlockStreams[i].write(bufs[i]);
-          checkFailures(targetBlockStreams[i], future);
+      try {
+        for (int i = 0; i < toReconstructIndexes.size(); i++) {
+          int replicaIndex = toReconstructIndexes.get(i);
+          DatanodeDetails datanodeDetails =
+              targetMap.get(replicaIndex);
+          targetBlockStreams[i] = getECBlockOutputstream(blockLocationInfo,
+              datanodeDetails, repConfig, replicaIndex, bufferPool,
+              configuration);
+          bufs[i] = byteBufferPool.getBuffer(false, repConfig.getEcChunkSize());
+          // Make sure it's clean. Don't want to reuse the erroneously returned
+          // buffers from the pool.
           bufs[i].clear();
         }
-        length -= readLen;
-      }
 
-      try {
+        sis.setRecoveryIndexes(toReconstructIndexes.stream().map(i -> (i - 1))
+            .collect(Collectors.toSet()));
+        long length = safeBlockGroupLength;
+        while (length > 0) {
+          int readLen = sis.recoverChunks(bufs);
+          // TODO: can be submitted in parallel
+          for (int i = 0; i < bufs.length; i++) {
+            CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+                future = targetBlockStreams[i].write(bufs[i]);
+            checkFailures(targetBlockStreams[i], future);
+            bufs[i].clear();
+          }
+          length -= readLen;
+        }
+
         for (ECBlockOutputStream targetStream : targetBlockStreams) {
           targetStream.executePutBlock(true, true,
               blockLocationInfo.getLength(), blockDataGroup);
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
index 5b9170774e..8a8b32faaa 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntry.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.security.token.Token;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.slf4j.Logger;
@@ -103,6 +104,14 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry {
     }
   }
 
+  @Override
+  void cleanup(boolean invalidateClient) {
+    if (isInitialized()) {
+      IOUtils.close(LOG, blockOutputStreams);
+      blockOutputStreams = null;
+    }
+  }
+
   @Override
   public OutputStream getOutputStream() {
     if (!isInitialized()) {
@@ -132,7 +141,7 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry {
   }
 
   public void markFailed(Exception e) {
-    if (blockOutputStreams[currentStreamIdx] != null) {
+    if (isInitialized() && blockOutputStreams[currentStreamIdx] != null) {
       blockOutputStreams[currentStreamIdx].setIoException(e);
     }
   }
@@ -379,13 +388,17 @@ public class ECBlockOutputStreamEntry extends BlockOutputStreamEntry {
   }
 
   private Stream<ECBlockOutputStream> blockStreams() {
-    return Arrays.stream(blockOutputStreams).filter(Objects::nonNull);
+    return isInitialized()
+        ? Arrays.stream(blockOutputStreams).filter(Objects::nonNull)
+        : Stream.empty();
   }
 
   private Stream<ECBlockOutputStream> dataStreams() {
-    return Arrays.stream(blockOutputStreams)
-        .limit(replicationConfig.getData())
-        .filter(Objects::nonNull);
+    return isInitialized()
+        ? Arrays.stream(blockOutputStreams)
+            .limit(replicationConfig.getData())
+            .filter(Objects::nonNull)
+        : Stream.empty();
   }
 
   public ByteString calculateChecksum() throws IOException {
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index a19a8f762c..11fe92e5ed 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -83,7 +83,8 @@ public final class ECKeyOutputStream extends KeyOutputStream {
   public static final Logger LOG =
       LoggerFactory.getLogger(KeyOutputStream.class);
 
-  private boolean closed;
+  private volatile boolean closed;
+  private volatile boolean closing;
   // how much of data is actually written yet to underlying stream
   private long offset;
   // how much data has been ingested into the stream
@@ -408,6 +409,9 @@ public final class ECKeyOutputStream extends KeyOutputStream {
   private void writeToOutputStream(ECBlockOutputStreamEntry current,
       byte[] b, int writeLen, int off, boolean isParity)
       throws IOException {
+    if (closing) {
+      throw new IOException("Stream is closing, avoid re-opening streams");
+    }
     try {
       if (!isParity) {
         // In case if exception while writing, this length will be updated back
@@ -438,8 +442,7 @@ public final class ECKeyOutputStream extends KeyOutputStream {
   }
 
   private void markStreamClosed() {
-    blockOutputStreamEntryPool.cleanup();
-    closed = true;
+    closing = true;
   }
 
   private void markStreamAsFailed(Exception e) {
@@ -487,23 +490,23 @@ public final class ECKeyOutputStream extends KeyOutputStream {
     }
     closed = true;
     try {
-      // If stripe buffer is not empty, encode and flush the stripe.
-      if (ecChunkBufferCache.getFirstDataCell().position() > 0) {
-        generateParityCells();
-        addStripeToQueue(ecChunkBufferCache);
-      }
-      // Send EOF mark to flush thread.
-      addStripeToQueue(new EOFDummyStripe());
+      if (!closing) {
+        // If stripe buffer is not empty, encode and flush the stripe.
+        if (ecChunkBufferCache.getFirstDataCell().position() > 0) {
+          generateParityCells();
+          addStripeToQueue(ecChunkBufferCache);
+        }
+        // Send EOF mark to flush thread.
+        addStripeToQueue(new EOFDummyStripe());
 
-      // Wait for all the stripes to be written.
-      flushFuture.get();
-      flushExecutor.shutdownNow();
+        // Wait for all the stripes to be written.
+        flushFuture.get();
 
-      closeCurrentStreamEntry();
-      Preconditions.checkArgument(writeOffset == offset,
-          "Expected writeOffset= " + writeOffset
-              + " Expected offset=" + offset);
-      blockOutputStreamEntryPool.commitKey(offset);
+        Preconditions.checkArgument(writeOffset == offset,
+            "Expected writeOffset= " + writeOffset
+                + " Expected offset=" + offset);
+        blockOutputStreamEntryPool.commitKey(offset);
+      }
     } catch (ExecutionException e) {
       Throwable cause = e.getCause();
       if (cause instanceof IOException) {
@@ -516,6 +519,8 @@ public final class ECKeyOutputStream extends KeyOutputStream {
     } catch (InterruptedException e) {
       throw new IOException("Flushing thread was interrupted", e);
     } finally {
+      flushExecutor.shutdownNow();
+      closeCurrentStreamEntry();
       blockOutputStreamEntryPool.cleanup();
     }
   }
@@ -548,7 +553,7 @@ public final class ECKeyOutputStream extends KeyOutputStream {
   private boolean flushStripeFromQueue() throws IOException {
     try {
       ECChunkBuffers stripe = ecStripeQueue.take();
-      while (!(stripe instanceof EOFDummyStripe)) {
+      while (!closing && !(stripe instanceof EOFDummyStripe)) {
         if (stripe instanceof CheckpointDummyStripe) {
           flushCheckpoint.set(((CheckpointDummyStripe) stripe).version);
         } else {
@@ -639,7 +644,7 @@ public final class ECKeyOutputStream extends KeyOutputStream {
    * @throws IOException if the connection is closed.
    */
   private void checkNotClosed() throws IOException {
-    if (closed) {
+    if (closing || closed) {
       throw new IOException(
           ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Key: "
               + blockOutputStreamEntryPool.getKeyName());
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 0a64b9392d..00970788ea 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -335,7 +335,7 @@ public class KeyOutputStream extends OutputStream implements Syncable {
       excludeList.addPipeline(pipelineId);
     }
     // just clean up the current stream.
-    streamEntry.cleanup(!retryFailure);
+    streamEntry.cleanup(retryFailure);
 
     // discard all subsequent blocks the containers and pipelines which
     // are in the exclude list so that, the very next retry should never
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
index 10f326362c..fb7c04df59 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.TestDataUtil;
 import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneKeyDetails;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OzonePrefixPathImpl;
@@ -150,6 +151,7 @@ public class TestOzoneFileSystem {
   private static boolean omRatisEnabled;
 
   private static MiniOzoneCluster cluster;
+  private static OzoneClient client;
   private static OzoneManagerProtocol writeClient;
   private static FileSystem fs;
   private static OzoneFileSystem o3fs;
@@ -177,7 +179,8 @@ public class TestOzoneFileSystem {
             .build();
     cluster.waitForClusterToBeReady();
 
-    writeClient = cluster.getRpcClient().getObjectStore()
+    client = cluster.getRpcClient();
+    writeClient = client.getObjectStore()
         .getClientProxy().getOzoneManagerClient();
     // create a volume and a bucket to be used by OzoneFileSystem
     ozoneBucket = TestDataUtil.createVolumeAndBucket(cluster, bucketLayout);
@@ -199,6 +202,7 @@ public class TestOzoneFileSystem {
 
   @AfterClass
   public static void teardown() {
+    IOUtils.closeQuietly(client);
     if (cluster != null) {
       cluster.shutdown();
     }
@@ -1443,7 +1447,7 @@ public class TestOzoneFileSystem {
     if (isDirectory) {
       key = key + "/";
     }
-    return cluster.getClient().getObjectStore().getVolume(volumeName)
+    return client.getObjectStore().getVolume(volumeName)
         .getBucket(bucketName).getKey(key);
   }
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
index 4aefd48fb2..482eb1d47c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestRootedOzoneFileSystem.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.ozone.TestDataUtil;
 import org.apache.hadoop.ozone.client.BucketArgs;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneKeyDetails;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.VolumeArgs;
@@ -135,6 +136,7 @@ public class TestRootedOzoneFileSystem {
       LoggerFactory.getLogger(TestRootedOzoneFileSystem.class);
 
   private static final float TRASH_INTERVAL = 0.05f; // 3 seconds
+  private static OzoneClient client;
 
   @Parameterized.Parameters
   public static Collection<Object[]> data() {
@@ -166,6 +168,7 @@ public class TestRootedOzoneFileSystem {
 
   @Parameterized.AfterParam
   public static void teardownParam() {
+    IOUtils.closeQuietly(client);
     // Tear down the cluster after EACH set of parameters
     if (cluster != null) {
       cluster.shutdown();
@@ -257,7 +260,8 @@ public class TestRootedOzoneFileSystem {
         .setNumDatanodes(5)
         .build();
     cluster.waitForClusterToBeReady();
-    objectStore = cluster.getClient().getObjectStore();
+    client = cluster.getClient();
+    objectStore = client.getObjectStore();
 
     rootPath = String.format("%s://%s/",
         OzoneConsts.OZONE_OFS_URI_SCHEME, conf.get(OZONE_OM_ADDRESS_KEY));
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
index 68dfd6fbb7..96c601cbd0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
@@ -106,6 +106,7 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_TOKEN_ENABLED
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto.READ;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto.WRITE;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
+import static org.apache.hadoop.ozone.container.ContainerTestHelper.newWriteChunkRequestBuilder;
 
 /**
  * This class tests container commands on EC containers.
@@ -252,88 +253,89 @@ public class TestContainerCommandsEC {
 
   @Test
   public void testCreateRecoveryContainer() throws Exception {
-    XceiverClientManager xceiverClientManager =
-        new XceiverClientManager(config);
-    ECReplicationConfig replicationConfig = new ECReplicationConfig(3, 2);
-    Pipeline newPipeline =
-        scm.getPipelineManager().createPipeline(replicationConfig);
-    scm.getPipelineManager().activatePipeline(newPipeline.getId());
-    final ContainerInfo container =
-        scm.getContainerManager().allocateContainer(replicationConfig, "test");
-    Token<ContainerTokenIdentifier> cToken = containerTokenGenerator
-        .generateToken(ANY_USER, container.containerID());
-    scm.getContainerManager().getContainerStateManager()
-        .addContainer(container.getProtobuf());
-
-    XceiverClientSpi dnClient = xceiverClientManager.acquireClient(
-        createSingleNodePipeline(newPipeline, newPipeline.getNodes().get(0),
-            2));
-    try {
-      // To create the actual situation, container would have been in closed
-      // state at SCM.
+    try (XceiverClientManager xceiverClientManager =
+        new XceiverClientManager(config)) {
+      ECReplicationConfig replicationConfig = new ECReplicationConfig(3, 2);
+      Pipeline newPipeline =
+          scm.getPipelineManager().createPipeline(replicationConfig);
+      scm.getPipelineManager().activatePipeline(newPipeline.getId());
+      final ContainerInfo container = scm.getContainerManager()
+          .allocateContainer(replicationConfig, "test");
+      Token<ContainerTokenIdentifier> cToken = containerTokenGenerator
+          .generateToken(ANY_USER, container.containerID());
       scm.getContainerManager().getContainerStateManager()
-          .updateContainerState(container.containerID().getProtobuf(),
-              HddsProtos.LifeCycleEvent.FINALIZE);
-      scm.getContainerManager().getContainerStateManager()
-          .updateContainerState(container.containerID().getProtobuf(),
-              HddsProtos.LifeCycleEvent.CLOSE);
-
-      //Create the recovering container in DN.
-      String encodedToken = cToken.encodeToUrlString();
-      ContainerProtocolCalls.createRecoveringContainer(dnClient,
-          container.containerID().getProtobuf().getId(),
-          encodedToken, 4);
-
-      BlockID blockID = ContainerTestHelper
-          .getTestBlockID(container.containerID().getProtobuf().getId());
-      Token<? extends TokenIdentifier> blockToken =
-          blockTokenGenerator.generateToken(ANY_USER, blockID,
-              EnumSet.of(READ, WRITE), Long.MAX_VALUE);
-      byte[] data = "TestData".getBytes(UTF_8);
-      ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
-          ContainerTestHelper.newWriteChunkRequestBuilder(newPipeline, blockID,
-              ChunkBuffer.wrap(ByteBuffer.wrap(data)), 0)
-              .setEncodedToken(blockToken.encodeToUrlString())
-              .build();
-      dnClient.sendCommand(writeChunkRequest);
-
-      // Now, explicitly make a putKey request for the block.
-      ContainerProtos.ContainerCommandRequestProto putKeyRequest =
-          ContainerTestHelper.getPutBlockRequest(newPipeline,
-              writeChunkRequest.getWriteChunk());
-      dnClient.sendCommand(putKeyRequest);
-
-      ContainerProtos.ReadContainerResponseProto readContainerResponseProto =
-          ContainerProtocolCalls.readContainer(dnClient,
-              container.containerID().getProtobuf().getId(), encodedToken);
-      Assert.assertEquals(ContainerProtos.ContainerDataProto.State.RECOVERING,
-          readContainerResponseProto.getContainerData().getState());
-      // Container at SCM should be still in closed state.
-      Assert.assertEquals(HddsProtos.LifeCycleState.CLOSED,
-          scm.getContainerManager().getContainerStateManager()
-              .getContainer(container.containerID()).getState());
-      // close container call
-      ContainerProtocolCalls.closeContainer(dnClient,
-          container.containerID().getProtobuf().getId(), encodedToken);
-      // Make sure we have the container and readable.
-      readContainerResponseProto = ContainerProtocolCalls
-          .readContainer(dnClient,
-              container.containerID().getProtobuf().getId(), encodedToken);
-      Assert.assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED,
-          readContainerResponseProto.getContainerData().getState());
-      ContainerProtos.ReadChunkResponseProto readChunkResponseProto =
-          ContainerProtocolCalls.readChunk(dnClient,
-              writeChunkRequest.getWriteChunk().getChunkData(), blockID, null,
-              blockToken);
-      ByteBuffer[] readOnlyByteBuffersArray = BufferUtils
-          .getReadOnlyByteBuffersArray(
-              readChunkResponseProto.getDataBuffers().getBuffersList());
-      Assert.assertEquals(readOnlyByteBuffersArray[0].limit(), data.length);
-      byte[] readBuff = new byte[readOnlyByteBuffersArray[0].limit()];
-      readOnlyByteBuffersArray[0].get(readBuff, 0, readBuff.length);
-      Assert.assertArrayEquals(data, readBuff);
-    } finally {
-      xceiverClientManager.releaseClient(dnClient, false);
+          .addContainer(container.getProtobuf());
+
+      XceiverClientSpi dnClient = xceiverClientManager.acquireClient(
+          createSingleNodePipeline(newPipeline, newPipeline.getNodes().get(0),
+              2));
+      try {
+        // To create the actual situation, container would have been in closed
+        // state at SCM.
+        scm.getContainerManager().getContainerStateManager()
+            .updateContainerState(container.containerID().getProtobuf(),
+                HddsProtos.LifeCycleEvent.FINALIZE);
+        scm.getContainerManager().getContainerStateManager()
+            .updateContainerState(container.containerID().getProtobuf(),
+                HddsProtos.LifeCycleEvent.CLOSE);
+
+        //Create the recovering container in DN.
+        String encodedToken = cToken.encodeToUrlString();
+        ContainerProtocolCalls.createRecoveringContainer(dnClient,
+            container.containerID().getProtobuf().getId(),
+            encodedToken, 4);
+
+        BlockID blockID = ContainerTestHelper
+            .getTestBlockID(container.containerID().getProtobuf().getId());
+        Token<? extends TokenIdentifier> blockToken =
+            blockTokenGenerator.generateToken(ANY_USER, blockID,
+                EnumSet.of(READ, WRITE), Long.MAX_VALUE);
+        byte[] data = "TestData".getBytes(UTF_8);
+        ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
+            newWriteChunkRequestBuilder(newPipeline, blockID,
+                    ChunkBuffer.wrap(ByteBuffer.wrap(data)), 0)
+                .setEncodedToken(blockToken.encodeToUrlString())
+                .build();
+        dnClient.sendCommand(writeChunkRequest);
+
+        // Now, explicitly make a putKey request for the block.
+        ContainerProtos.ContainerCommandRequestProto putKeyRequest =
+            ContainerTestHelper.getPutBlockRequest(newPipeline,
+                writeChunkRequest.getWriteChunk());
+        dnClient.sendCommand(putKeyRequest);
+
+        ContainerProtos.ReadContainerResponseProto readContainerResponseProto =
+            ContainerProtocolCalls.readContainer(dnClient,
+                container.containerID().getProtobuf().getId(), encodedToken);
+        Assert.assertEquals(ContainerProtos.ContainerDataProto.State.RECOVERING,
+            readContainerResponseProto.getContainerData().getState());
+        // Container at SCM should be still in closed state.
+        Assert.assertEquals(HddsProtos.LifeCycleState.CLOSED,
+            scm.getContainerManager().getContainerStateManager()
+                .getContainer(container.containerID()).getState());
+        // close container call
+        ContainerProtocolCalls.closeContainer(dnClient,
+            container.containerID().getProtobuf().getId(), encodedToken);
+        // Make sure we have the container and readable.
+        readContainerResponseProto = ContainerProtocolCalls
+            .readContainer(dnClient,
+                container.containerID().getProtobuf().getId(), encodedToken);
+        Assert.assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED,
+            readContainerResponseProto.getContainerData().getState());
+        ContainerProtos.ReadChunkResponseProto readChunkResponseProto =
+            ContainerProtocolCalls.readChunk(dnClient,
+                writeChunkRequest.getWriteChunk().getChunkData(), blockID, null,
+                blockToken);
+        ByteBuffer[] readOnlyByteBuffersArray = BufferUtils
+            .getReadOnlyByteBuffersArray(
+                readChunkResponseProto.getDataBuffers().getBuffersList());
+        Assert.assertEquals(readOnlyByteBuffersArray[0].limit(), data.length);
+        byte[] readBuff = new byte[readOnlyByteBuffersArray[0].limit()];
+        readOnlyByteBuffersArray[0].get(readBuff, 0, readBuff.length);
+        Assert.assertArrayEquals(data, readBuff);
+      } finally {
+        xceiverClientManager.releaseClient(dnClient, false);
+      }
     }
   }
 
@@ -394,116 +396,136 @@ public class TestContainerCommandsEC {
     objectStore.getVolume(volumeName).createBucket(bucketName);
     OzoneVolume volume = objectStore.getVolume(volumeName);
     OzoneBucket bucket = volume.getBucket(bucketName);
-    XceiverClientManager xceiverClientManager =
-        new XceiverClientManager(config);
     createKeyAndWriteData(keyString, bucket, numInputChunks);
-    ECReconstructionCoordinator coordinator =
-        new ECReconstructionCoordinator(config, certClient,
-            null, ECReconstructionMetrics.create());
-
-    ECReconstructionMetrics metrics = coordinator.getECReconstructionMetrics();
-    OzoneKeyDetails key = bucket.getKey(keyString);
-    long conID = key.getOzoneKeyLocations().get(0).getContainerID();
-    Token<ContainerTokenIdentifier> cToken = containerTokenGenerator
-        .generateToken(ANY_USER, new ContainerID(conID));
-
-    //Close the container first.
-    closeContainer(conID);
-
-    Pipeline containerPipeline = scm.getPipelineManager().getPipeline(
-        scm.getContainerManager().getContainer(ContainerID.valueOf(conID))
-            .getPipelineID());
-
-    SortedMap<Integer, DatanodeDetails> sourceNodeMap = new TreeMap<>();
 
-    List<DatanodeDetails> nodeSet = containerPipeline.getNodes();
-    List<Pipeline> containerToDeletePipeline = new ArrayList<>();
-    for (DatanodeDetails srcDn : nodeSet) {
-      int replIndex = containerPipeline.getReplicaIndex(srcDn);
-      if (missingIndexes.contains(replIndex)) {
-        containerToDeletePipeline
-            .add(createSingleNodePipeline(containerPipeline, srcDn, replIndex));
-        continue;
+    try (
+        XceiverClientManager xceiverClientManager =
+            new XceiverClientManager(config);
+        ECReconstructionCoordinator coordinator =
+            new ECReconstructionCoordinator(config, certClient,
+                 null, ECReconstructionMetrics.create())) {
+
+      ECReconstructionMetrics metrics =
+          coordinator.getECReconstructionMetrics();
+      OzoneKeyDetails key = bucket.getKey(keyString);
+      long conID = key.getOzoneKeyLocations().get(0).getContainerID();
+      Token<ContainerTokenIdentifier> cToken = containerTokenGenerator
+          .generateToken(ANY_USER, new ContainerID(conID));
+
+      //Close the container first.
+      closeContainer(conID);
+
+      Pipeline containerPipeline = scm.getPipelineManager().getPipeline(
+          scm.getContainerManager().getContainer(ContainerID.valueOf(conID))
+              .getPipelineID());
+
+      SortedMap<Integer, DatanodeDetails> sourceNodeMap = new TreeMap<>();
+
+      List<DatanodeDetails> nodeSet = containerPipeline.getNodes();
+      List<Pipeline> containerToDeletePipeline = new ArrayList<>();
+      for (DatanodeDetails srcDn : nodeSet) {
+        int replIndex = containerPipeline.getReplicaIndex(srcDn);
+        if (missingIndexes.contains(replIndex)) {
+          containerToDeletePipeline.add(
+              createSingleNodePipeline(containerPipeline, srcDn, replIndex));
+          continue;
+        }
+        sourceNodeMap.put(replIndex, srcDn);
       }
-      sourceNodeMap.put(replIndex, srcDn);
-    }
 
-    //Find nodes outside of pipeline
-    List<DatanodeDetails> clusterDnsList =
-        cluster.getHddsDatanodes().stream().map(k -> k.getDatanodeDetails())
-            .collect(Collectors.toList());
-    List<DatanodeDetails> targetNodes = new ArrayList<>();
-    for (DatanodeDetails clusterDN : clusterDnsList) {
-      if (!nodeSet.contains(clusterDN)) {
-        targetNodes.add(clusterDN);
-        if (targetNodes.size() == missingIndexes.size()) {
-          break;
+      //Find nodes outside of pipeline
+      List<DatanodeDetails> clusterDnsList =
+          cluster.getHddsDatanodes().stream().map(k -> k.getDatanodeDetails())
+              .collect(Collectors.toList());
+      List<DatanodeDetails> targetNodes = new ArrayList<>();
+      for (DatanodeDetails clusterDN : clusterDnsList) {
+        if (!nodeSet.contains(clusterDN)) {
+          targetNodes.add(clusterDN);
+          if (targetNodes.size() == missingIndexes.size()) {
+            break;
+          }
         }
       }
-    }
 
-    Assert.assertEquals(missingIndexes.size(), targetNodes.size());
-
-    List<org.apache.hadoop.ozone.container.common.helpers.BlockData[]>
-        blockDataArrList = new ArrayList<>();
-    for (int j = 0; j < containerToDeletePipeline.size(); j++) {
-      org.apache.hadoop.ozone.container.common.helpers.BlockData[] blockData =
-          new ECContainerOperationClient(new OzoneConfiguration(), certClient)
-              .listBlock(conID, containerToDeletePipeline.get(j).getFirstNode(),
-                  (ECReplicationConfig) containerToDeletePipeline.get(j)
-                      .getReplicationConfig(), cToken);
-      blockDataArrList.add(blockData);
-      // Delete the first index container
-      ContainerProtocolCalls.deleteContainer(
-          xceiverClientManager.acquireClient(containerToDeletePipeline.get(j)),
-          conID, true, cToken.encodeToUrlString());
-    }
+      Assert.assertEquals(missingIndexes.size(), targetNodes.size());
+
+      List<org.apache.hadoop.ozone.container.common.helpers.BlockData[]>
+          blockDataArrList = new ArrayList<>();
+      try (ECContainerOperationClient ecContainerOperationClient =
+               new ECContainerOperationClient(config, certClient)) {
+        for (int j = 0; j < containerToDeletePipeline.size(); j++) {
+          Pipeline p = containerToDeletePipeline.get(j);
+          org.apache.hadoop.ozone.container.common.helpers.BlockData[]
+              blockData = ecContainerOperationClient.listBlock(
+                  conID, p.getFirstNode(),
+                  (ECReplicationConfig) p.getReplicationConfig(),
+                  cToken);
+          blockDataArrList.add(blockData);
+          // Delete the first index container
+          XceiverClientSpi client = xceiverClientManager.acquireClient(
+              p);
+          try {
+            ContainerProtocolCalls.deleteContainer(
+                client,
+                conID, true, cToken.encodeToUrlString());
+          } finally {
+            xceiverClientManager.releaseClient(client, false);
+          }
+        }
 
-    //Give the new target to reconstruct the container
-    SortedMap<Integer, DatanodeDetails> targetNodeMap = new TreeMap<>();
-    for (int k = 0; k < missingIndexes.size(); k++) {
-      targetNodeMap.put(missingIndexes.get(k), targetNodes.get(k));
-    }
+        //Give the new target to reconstruct the container
+        SortedMap<Integer, DatanodeDetails> targetNodeMap = new TreeMap<>();
+        for (int k = 0; k < missingIndexes.size(); k++) {
+          targetNodeMap.put(missingIndexes.get(k), targetNodes.get(k));
+        }
 
-    coordinator.reconstructECContainerGroup(conID,
-        (ECReplicationConfig) containerPipeline.getReplicationConfig(),
-        sourceNodeMap, targetNodeMap);
-
-    // Assert the original container metadata with the new recovered container.
-    Iterator<Map.Entry<Integer, DatanodeDetails>> iterator =
-        targetNodeMap.entrySet().iterator();
-    int i = 0;
-    while (iterator.hasNext()) {
-      Map.Entry<Integer, DatanodeDetails> next = iterator.next();
-      DatanodeDetails targetDN = next.getValue();
-      Map<DatanodeDetails, Integer> indexes = new HashMap<>();
-      indexes.put(targetNodeMap.entrySet().iterator().next().getValue(),
-          targetNodeMap.entrySet().iterator().next().getKey());
-      Pipeline newTargetPipeline =
-          Pipeline.newBuilder().setId(PipelineID.randomId())
+        coordinator.reconstructECContainerGroup(conID,
+            (ECReplicationConfig) containerPipeline.getReplicationConfig(),
+            sourceNodeMap, targetNodeMap);
+
+        // Assert the original container metadata with the new recovered one
+        Iterator<Map.Entry<Integer, DatanodeDetails>> iterator =
+            targetNodeMap.entrySet().iterator();
+        int i = 0;
+        while (iterator.hasNext()) {
+          Map.Entry<Integer, DatanodeDetails> next = iterator.next();
+          DatanodeDetails targetDN = next.getValue();
+          Map<DatanodeDetails, Integer> indexes = new HashMap<>();
+          indexes.put(targetNodeMap.entrySet().iterator().next().getValue(),
+              targetNodeMap.entrySet().iterator().next().getKey());
+          Pipeline newTargetPipeline = Pipeline.newBuilder()
+              .setId(PipelineID.randomId())
               .setReplicationConfig(containerPipeline.getReplicationConfig())
               .setReplicaIndexes(indexes)
               .setState(Pipeline.PipelineState.CLOSED)
               .setNodes(ImmutableList.of(targetDN)).build();
 
-      org.apache.hadoop.ozone.container.common.helpers.BlockData[]
-          reconstructedBlockData =
-          new ECContainerOperationClient(new OzoneConfiguration(), certClient)
-              .listBlock(conID, newTargetPipeline.getFirstNode(),
-                  (ECReplicationConfig) newTargetPipeline
-                      .getReplicationConfig(), cToken);
-      Assert.assertEquals(blockDataArrList.get(i).length,
-          reconstructedBlockData.length);
-      checkBlockData(blockDataArrList.get(i), reconstructedBlockData);
-      ContainerProtos.ReadContainerResponseProto readContainerResponseProto =
-          ContainerProtocolCalls.readContainer(
-              xceiverClientManager.acquireClient(newTargetPipeline), conID,
-              cToken.encodeToUrlString());
-      Assert.assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED,
-          readContainerResponseProto.getContainerData().getState());
-      i++;
+          org.apache.hadoop.ozone.container.common.helpers.BlockData[]
+              reconstructedBlockData =
+              ecContainerOperationClient
+                  .listBlock(conID, newTargetPipeline.getFirstNode(),
+                      (ECReplicationConfig) newTargetPipeline
+                          .getReplicationConfig(), cToken);
+          Assert.assertEquals(blockDataArrList.get(i).length,
+              reconstructedBlockData.length);
+          checkBlockData(blockDataArrList.get(i), reconstructedBlockData);
+          XceiverClientSpi client = xceiverClientManager.acquireClient(
+              newTargetPipeline);
+          try {
+            ContainerProtos.ReadContainerResponseProto readContainerResponse =
+                ContainerProtocolCalls.readContainer(
+                    client, conID,
+                    cToken.encodeToUrlString());
+            Assert.assertEquals(ContainerProtos.ContainerDataProto.State.CLOSED,
+                readContainerResponse.getContainerData().getState());
+          } finally {
+            xceiverClientManager.releaseClient(client, false);
+          }
+          i++;
+        }
+        Assertions.assertEquals(metrics.getReconstructionTotal(), 1L);
+      }
     }
-    Assertions.assertEquals(metrics.getReconstructionTotal(), 1L);
   }
 
   private void createKeyAndWriteData(String keyString, OzoneBucket bucket,
@@ -574,21 +596,22 @@ public class TestContainerCommandsEC {
     targetNodeMap.put(3, invalidTargetNode);
 
     Assert.assertThrows(IOException.class, () -> {
-      ECReconstructionCoordinator coordinator =
+      try (ECReconstructionCoordinator coordinator =
           new ECReconstructionCoordinator(config, certClient,
-              null, ECReconstructionMetrics.create());
-      coordinator.reconstructECContainerGroup(conID,
-          (ECReplicationConfig) containerPipeline.getReplicationConfig(),
-          sourceNodeMap, targetNodeMap);
+              null, ECReconstructionMetrics.create())) {
+        coordinator.reconstructECContainerGroup(conID,
+            (ECReplicationConfig) containerPipeline.getReplicationConfig(),
+            sourceNodeMap, targetNodeMap);
+      }
     });
     final DatanodeDetails targetDNToCheckContainerCLeaned = goodTargetNode;
     StorageContainerException ex =
         Assert.assertThrows(StorageContainerException.class, () -> {
-          ECContainerOperationClient client =
-              new ECContainerOperationClient(new OzoneConfiguration(),
-                  certClient);
-          client.listBlock(conID, targetDNToCheckContainerCLeaned,
-              new ECReplicationConfig(3, 2), cToken);
+          try (ECContainerOperationClient client =
+              new ECContainerOperationClient(config, certClient)) {
+            client.listBlock(conID, targetDNToCheckContainerCLeaned,
+                new ECReplicationConfig(3, 2), cToken);
+          }
         });
     Assert.assertEquals("ContainerID 1 does not exist", ex.getMessage());
   }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
index f99ea72ff7..35cab2689f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
 import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.RatisBlockOutputStream;
+import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.client.ObjectStore;
@@ -123,6 +124,7 @@ public class TestBlockOutputStream {
    */
   @AfterAll
   public static void shutdown() {
+    IOUtils.close(null, client);
     if (cluster != null) {
       cluster.shutdown();
     }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamFlushDelay.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamFlushDelay.java
index 8ca49631e9..2a7b4bcf43 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamFlushDelay.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamFlushDelay.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
 import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.RatisBlockOutputStream;
+import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.client.ObjectStore;
@@ -115,6 +116,7 @@ public class TestBlockOutputStreamFlushDelay {
    */
   @AfterAll
   public static void shutdown() {
+    IOUtils.close(null, client);
     if (cluster != null) {
       cluster.shutdown();
     }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
index af128dc527..688fe57e86 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenExcep
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.RatisBlockOutputStream;
+import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.client.ObjectStore;
@@ -150,6 +151,7 @@ public class TestBlockOutputStreamWithFailures {
    */
   @AfterEach
   public void shutdown() {
+    IOUtils.close(null, client);
     if (cluster != null) {
       cluster.shutdown();
     }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailuresFlushDelay.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailuresFlushDelay.java
index 2aa35e01e4..b7c2f4433f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailuresFlushDelay.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailuresFlushDelay.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenExcep
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
 import org.apache.hadoop.hdds.scm.storage.RatisBlockOutputStream;
+import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.client.ObjectStore;
@@ -150,6 +151,7 @@ public class TestBlockOutputStreamWithFailuresFlushDelay {
    */
   @AfterEach
   public void shutdown() {
+    IOUtils.close(null, client);
     if (cluster != null) {
       cluster.shutdown();
     }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index e9f3a9130f..69f3c92311 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -1207,25 +1207,29 @@ public abstract class TestOzoneRpcClientAbstract {
     Assert.assertEquals(0L, getBucketUsedNamespace(volumeName, bucketName));
 
     RpcClient client = new RpcClient(cluster.getConf(), null);
-    String directoryName1 = UUID.randomUUID().toString();
-    String directoryName2 = UUID.randomUUID().toString();
-
-    client.createDirectory(volumeName, bucketName, directoryName1);
-    Assert.assertEquals(1L, getBucketUsedNamespace(volumeName, bucketName));
-    // Test create a directory twice will not increase usedNamespace twice
-    client.createDirectory(volumeName, bucketName, directoryName2);
-    Assert.assertEquals(2L, getBucketUsedNamespace(volumeName, bucketName));
-    client.deleteKey(volumeName, bucketName,
-        OzoneFSUtils.addTrailingSlashIfNeeded(directoryName1), false);
-    Assert.assertEquals(1L, getBucketUsedNamespace(volumeName, bucketName));
-    client.deleteKey(volumeName, bucketName,
-        OzoneFSUtils.addTrailingSlashIfNeeded(directoryName2), false);
-    Assert.assertEquals(0L, getBucketUsedNamespace(volumeName, bucketName));
-
-    String multiComponentsDir = "dir1/dir2/dir3/dir4";
-    client.createDirectory(volumeName, bucketName, multiComponentsDir);
-    Assert.assertEquals(OzoneFSUtils.getFileCount(multiComponentsDir),
-        getBucketUsedNamespace(volumeName, bucketName));
+    try {
+      String directoryName1 = UUID.randomUUID().toString();
+      String directoryName2 = UUID.randomUUID().toString();
+
+      client.createDirectory(volumeName, bucketName, directoryName1);
+      Assert.assertEquals(1L, getBucketUsedNamespace(volumeName, bucketName));
+      // Test create a directory twice will not increase usedNamespace twice
+      client.createDirectory(volumeName, bucketName, directoryName2);
+      Assert.assertEquals(2L, getBucketUsedNamespace(volumeName, bucketName));
+      client.deleteKey(volumeName, bucketName,
+          OzoneFSUtils.addTrailingSlashIfNeeded(directoryName1), false);
+      Assert.assertEquals(1L, getBucketUsedNamespace(volumeName, bucketName));
+      client.deleteKey(volumeName, bucketName,
+          OzoneFSUtils.addTrailingSlashIfNeeded(directoryName2), false);
+      Assert.assertEquals(0L, getBucketUsedNamespace(volumeName, bucketName));
+
+      String multiComponentsDir = "dir1/dir2/dir3/dir4";
+      client.createDirectory(volumeName, bucketName, multiComponentsDir);
+      Assert.assertEquals(OzoneFSUtils.getFileCount(multiComponentsDir),
+          getBucketUsedNamespace(volumeName, bucketName));
+    } finally {
+      client.close();
+    }
   }
 
   @ParameterizedTest
@@ -1674,6 +1678,8 @@ public abstract class TestOzoneRpcClientAbstract {
       RpcClient client = new RpcClient(configuration, null);
       try (InputStream is = client.getKey(volumeName, bucketName, keyName)) {
         is.read(new byte[100]);
+      } finally {
+        client.close();
       }
       if (verifyChecksum) {
         fail("Reading corrupted data should fail, as verify checksum is " +
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestChunkInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestChunkInputStream.java
index 13c8a5911b..cc675ecce0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestChunkInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestChunkInputStream.java
@@ -57,60 +57,61 @@ public class TestChunkInputStream extends TestInputStreamBase {
     int dataLength = (2 * BLOCK_SIZE) + (CHUNK_SIZE);
     byte[] inputData = writeRandomBytes(keyName, dataLength);
 
-    KeyInputStream keyInputStream = getKeyInputStream(keyName);
-
-    BlockInputStream block0Stream =
-        (BlockInputStream)keyInputStream.getPartStreams().get(0);
-    block0Stream.initialize();
-
-    ChunkInputStream chunk0Stream = block0Stream.getChunkStreams().get(0);
-
-    // To read 1 byte of chunk data, ChunkInputStream should get one full
-    // checksum boundary worth of data from Container and store it in buffers.
-    chunk0Stream.read(new byte[1]);
-    checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 1, 0,
-        BYTES_PER_CHECKSUM);
-
-    // Read > checksum boundary of data from chunk0
-    int readDataLen = BYTES_PER_CHECKSUM + (BYTES_PER_CHECKSUM / 2);
-    byte[] readData = readDataFromChunk(chunk0Stream, 0, readDataLen);
-    validateData(inputData, 0, readData);
-
-    // The first checksum boundary size of data was already existing in the
-    // ChunkStream buffers. Once that data is read, the next checksum
-    // boundary size of data will be fetched again to read the remaining data.
-    // Hence there should be 1 checksum boundary size of data stored in the
-    // ChunkStreams buffers at the end of the read.
-    checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 1, 0,
-        BYTES_PER_CHECKSUM);
-
-    // Seek to a position in the third checksum boundary (so that current
-    // buffers do not have the seeked position) and read > BYTES_PER_CHECKSUM
-    // bytes of data. This should result in 2 * BYTES_PER_CHECKSUM amount of
-    // data being read into the buffers. There should be 2 buffers in the
-    // stream but the the first buffer should be released after it is read
-    // and the second buffer should have BYTES_PER_CHECKSUM capacity.
-    readDataLen = BYTES_PER_CHECKSUM + (BYTES_PER_CHECKSUM / 2);
-    int offset = 2 * BYTES_PER_CHECKSUM + 1;
-    readData = readDataFromChunk(chunk0Stream, offset, readDataLen);
-    validateData(inputData, offset, readData);
-    checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 2, 1,
-        BYTES_PER_CHECKSUM);
-
-
-    // Read the full chunk data - 1 and verify that all chunk data is read into
-    // buffers. We read CHUNK_SIZE - 1 as otherwise all the buffers will be
-    // released once all chunk data is read.
-    readData = readDataFromChunk(chunk0Stream, 0, CHUNK_SIZE - 1);
-    validateData(inputData, 0, readData);
-    int expectedNumBuffers = CHUNK_SIZE / BYTES_PER_CHECKSUM;
-    checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(),
-        expectedNumBuffers, expectedNumBuffers - 1, BYTES_PER_CHECKSUM);
-
-    // Read the last byte of chunk and verify that the buffers are released.
-    chunk0Stream.read(new byte[1]);
-    Assert.assertNull("ChunkInputStream did not release buffers after " +
-        "reaching EOF.", chunk0Stream.getCachedBuffers());
+    try (KeyInputStream keyInputStream = getKeyInputStream(keyName)) {
+
+      BlockInputStream block0Stream =
+          (BlockInputStream)keyInputStream.getPartStreams().get(0);
+      block0Stream.initialize();
+
+      ChunkInputStream chunk0Stream = block0Stream.getChunkStreams().get(0);
+
+      // To read 1 byte of chunk data, ChunkInputStream should get one full
+      // checksum boundary worth of data from Container and store it in buffers.
+      chunk0Stream.read(new byte[1]);
+      checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 1, 0,
+          BYTES_PER_CHECKSUM);
+
+      // Read > checksum boundary of data from chunk0
+      int readDataLen = BYTES_PER_CHECKSUM + (BYTES_PER_CHECKSUM / 2);
+      byte[] readData = readDataFromChunk(chunk0Stream, 0, readDataLen);
+      validateData(inputData, 0, readData);
+
+      // The first checksum boundary size of data was already existing in the
+      // ChunkStream buffers. Once that data is read, the next checksum
+      // boundary size of data will be fetched again to read the remaining data.
+      // Hence there should be 1 checksum boundary size of data stored in the
+      // ChunkStreams buffers at the end of the read.
+      checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 1, 0,
+          BYTES_PER_CHECKSUM);
+
+      // Seek to a position in the third checksum boundary (so that current
+      // buffers do not have the seeked position) and read > BYTES_PER_CHECKSUM
+      // bytes of data. This should result in 2 * BYTES_PER_CHECKSUM amount of
+      // data being read into the buffers. There should be 2 buffers in the
+      // stream but the the first buffer should be released after it is read
+      // and the second buffer should have BYTES_PER_CHECKSUM capacity.
+      readDataLen = BYTES_PER_CHECKSUM + (BYTES_PER_CHECKSUM / 2);
+      int offset = 2 * BYTES_PER_CHECKSUM + 1;
+      readData = readDataFromChunk(chunk0Stream, offset, readDataLen);
+      validateData(inputData, offset, readData);
+      checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(), 2, 1,
+          BYTES_PER_CHECKSUM);
+
+
+      // Read the full chunk data -1 and verify that all chunk data is read into
+      // buffers. We read CHUNK_SIZE - 1 as otherwise all the buffers will be
+      // released once all chunk data is read.
+      readData = readDataFromChunk(chunk0Stream, 0, CHUNK_SIZE - 1);
+      validateData(inputData, 0, readData);
+      int expectedNumBuffers = CHUNK_SIZE / BYTES_PER_CHECKSUM;
+      checkBufferSizeAndCapacity(chunk0Stream.getCachedBuffers(),
+          expectedNumBuffers, expectedNumBuffers - 1, BYTES_PER_CHECKSUM);
+
+      // Read the last byte of chunk and verify that the buffers are released.
+      chunk0Stream.read(new byte[1]);
+      Assert.assertNull("ChunkInputStream did not release buffers after " +
+          "reaching EOF.", chunk0Stream.getCachedBuffers());
+    }
   }
 
   private void testCloseReleasesBuffers() throws Exception {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
index 11dab74bba..325ea9ba54 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration;
+import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.client.ObjectStore;
@@ -140,6 +141,7 @@ public abstract class TestInputStreamBase {
    */
   @After
   public void shutdown() {
+    IOUtils.close(null, client);
     if (cluster != null) {
       cluster.shutdown();
     }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
index 14cd1b66f4..2a7423b15d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
@@ -177,17 +177,17 @@ public final class TestHelper {
   public static void validateData(String keyName, byte[] data,
       ObjectStore objectStore, String volumeName, String bucketName)
       throws Exception {
-    byte[] readData = new byte[data.length];
-    OzoneInputStream is =
+    try (OzoneInputStream is =
         objectStore.getVolume(volumeName).getBucket(bucketName)
-            .readKey(keyName);
-    is.read(readData);
-    MessageDigest sha1 = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
-    sha1.update(data);
-    MessageDigest sha2 = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
-    sha2.update(readData);
-    Assert.assertTrue(Arrays.equals(sha1.digest(), sha2.digest()));
-    is.close();
+            .readKey(keyName)) {
+      byte[] readData = new byte[data.length];
+      is.read(readData);
+      MessageDigest sha1 = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
+      sha1.update(data);
+      MessageDigest sha2 = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
+      sha2.update(readData);
+      Assert.assertTrue(Arrays.equals(sha1.digest(), sha2.digest()));
+    }
   }
 
   public static void waitForContainerClose(OzoneOutputStream outputStream,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to