Repository: hadoop
Updated Branches:
  refs/heads/trunk f63ee083d -> e12edb3d8


HDDS-555. RandomKeyGenerator runs not closing the XceiverClient properly. 
Contributed by Mukul Kumar Singh.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e12edb3d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e12edb3d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e12edb3d

Branch: refs/heads/trunk
Commit: e12edb3d8f05ff6206c6de1b81ecfa1a03abb193
Parents: f63ee08
Author: Xiaoyu Yao <[email protected]>
Authored: Fri Oct 12 10:36:30 2018 -0700
Committer: Xiaoyu Yao <[email protected]>
Committed: Fri Oct 12 10:36:57 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hdds/scm/XceiverClientManager.java   | 17 +++++------
 .../scm/client/ContainerOperationClient.java    | 15 ++++------
 .../ozone/client/io/ChunkGroupInputStream.java  |  2 +-
 .../ozone/client/io/ChunkGroupOutputStream.java |  3 +-
 .../ozone/scm/TestContainerSmallFile.java       |  9 ++----
 .../TestGetCommittedBlockLengthAndPutKey.java   | 12 ++++----
 .../ozone/scm/TestXceiverClientManager.java     | 30 ++++++++------------
 7 files changed, 37 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e12edb3d/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
index 9762406..d542abc 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
@@ -27,6 +27,7 @@ import com.google.common.cache.RemovalNotification;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -58,7 +59,7 @@ public class XceiverClientManager implements Closeable {
 
   //TODO : change this to SCM configuration class
   private final Configuration conf;
-  private final Cache<Long, XceiverClientSpi> clientCache;
+  private final Cache<PipelineID, XceiverClientSpi> clientCache;
   private final boolean useRatis;
 
   private static XceiverClientMetrics metrics;
@@ -82,10 +83,10 @@ public class XceiverClientManager implements Closeable {
         .expireAfterAccess(staleThresholdMs, TimeUnit.MILLISECONDS)
         .maximumSize(maxSize)
         .removalListener(
-            new RemovalListener<Long, XceiverClientSpi>() {
+            new RemovalListener<PipelineID, XceiverClientSpi>() {
             @Override
             public void onRemoval(
-                RemovalNotification<Long, XceiverClientSpi>
+                RemovalNotification<PipelineID, XceiverClientSpi>
                   removalNotification) {
               synchronized (clientCache) {
                 // Mark the entry as evicted
@@ -97,7 +98,7 @@ public class XceiverClientManager implements Closeable {
   }
 
   @VisibleForTesting
-  public Cache<Long, XceiverClientSpi> getClientCache() {
+  public Cache<PipelineID, XceiverClientSpi> getClientCache() {
     return clientCache;
   }
 
@@ -112,14 +113,14 @@ public class XceiverClientManager implements Closeable {
    * @return XceiverClientSpi connected to a container
    * @throws IOException if a XceiverClientSpi cannot be acquired
    */
-  public XceiverClientSpi acquireClient(Pipeline pipeline, long containerID)
+  public XceiverClientSpi acquireClient(Pipeline pipeline)
       throws IOException {
     Preconditions.checkNotNull(pipeline);
     Preconditions.checkArgument(pipeline.getMachines() != null);
     Preconditions.checkArgument(!pipeline.getMachines().isEmpty());
 
     synchronized (clientCache) {
-      XceiverClientSpi info = getClient(pipeline, containerID);
+      XceiverClientSpi info = getClient(pipeline);
       info.incrementReference();
       return info;
     }
@@ -137,10 +138,10 @@ public class XceiverClientManager implements Closeable {
     }
   }
 
-  private XceiverClientSpi getClient(Pipeline pipeline, long containerID)
+  private XceiverClientSpi getClient(Pipeline pipeline)
       throws IOException {
     try {
-      return clientCache.get(containerID,
+      return clientCache.get(pipeline.getId(),
           new Callable<XceiverClientSpi>() {
           @Override
           public XceiverClientSpi call() throws Exception {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e12edb3d/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
index fed589c..c2bfb42 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
@@ -96,8 +96,7 @@ public class ContainerOperationClient implements ScmClient {
               xceiverClientManager.getType(),
               xceiverClientManager.getFactor(), owner);
       Pipeline pipeline = containerWithPipeline.getPipeline();
-      client = xceiverClientManager.acquireClient(pipeline,
-          containerWithPipeline.getContainerInfo().getContainerID());
+      client = xceiverClientManager.acquireClient(pipeline);
 
       // Allocated State means that SCM has allocated this pipeline in its
       // namespace. The client needs to create the pipeline on the machines
@@ -207,8 +206,7 @@ public class ContainerOperationClient implements ScmClient {
           storageContainerLocationClient.allocateContainer(type, factor,
               owner);
       Pipeline pipeline = containerWithPipeline.getPipeline();
-      client = xceiverClientManager.acquireClient(pipeline,
-          containerWithPipeline.getContainerInfo().getContainerID());
+      client = xceiverClientManager.acquireClient(pipeline);
 
       // Allocated State means that SCM has allocated this pipeline in its
       // namespace. The client needs to create the pipeline on the machines
@@ -217,8 +215,7 @@ public class ContainerOperationClient implements ScmClient {
         createPipeline(client, pipeline);
       }
       // connect to pipeline leader and allocate container on leader datanode.
-      client = xceiverClientManager.acquireClient(pipeline,
-          containerWithPipeline.getContainerInfo().getContainerID());
+      client = xceiverClientManager.acquireClient(pipeline);
       createContainer(client,
           containerWithPipeline.getContainerInfo().getContainerID());
       return containerWithPipeline;
@@ -279,7 +276,7 @@ public class ContainerOperationClient implements ScmClient {
       boolean force) throws IOException {
     XceiverClientSpi client = null;
     try {
-      client = xceiverClientManager.acquireClient(pipeline, containerId);
+      client = xceiverClientManager.acquireClient(pipeline);
       String traceID = UUID.randomUUID().toString();
       ContainerProtocolCalls
           .deleteContainer(client, containerId, force, traceID);
@@ -334,7 +331,7 @@ public class ContainerOperationClient implements ScmClient {
       Pipeline pipeline) throws IOException {
     XceiverClientSpi client = null;
     try {
-      client = xceiverClientManager.acquireClient(pipeline, containerID);
+      client = xceiverClientManager.acquireClient(pipeline);
       String traceID = UUID.randomUUID().toString();
       ReadContainerResponseProto response =
           ContainerProtocolCalls.readContainer(client, containerID, traceID);
@@ -421,7 +418,7 @@ public class ContainerOperationClient implements ScmClient {
       For now, take the #2 way.
        */
       // Actually close the container on Datanode
-      client = xceiverClientManager.acquireClient(pipeline, containerId);
+      client = xceiverClientManager.acquireClient(pipeline);
       String traceID = UUID.randomUUID().toString();
 
       storageContainerLocationClient.notifyObjectStageChange(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e12edb3d/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
index 2f17035..7791613 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupInputStream.java
@@ -273,7 +273,7 @@ public class ChunkGroupInputStream extends InputStream implements Seekable {
       ContainerWithPipeline containerWithPipeline =
           storageContainerLocationClient.getContainerWithPipeline(containerID);
       XceiverClientSpi xceiverClient = xceiverClientManager
-          .acquireClient(containerWithPipeline.getPipeline(), containerID);
+          .acquireClient(containerWithPipeline.getPipeline());
       boolean success = false;
       containerKey = omKeyLocationInfo.getLocalID();
       try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e12edb3d/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
index de666ce..0a38a5a 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
@@ -189,8 +189,7 @@ public class ChunkGroupOutputStream extends OutputStream {
     ContainerInfo container = containerWithPipeline.getContainerInfo();
 
     XceiverClientSpi xceiverClient =
-        xceiverClientManager.acquireClient(containerWithPipeline.getPipeline(),
-            container.getContainerID());
+        xceiverClientManager.acquireClient(containerWithPipeline.getPipeline());
     // create container if needed
     if (subKeyInfo.getShouldCreateContainer()) {
       try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e12edb3d/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java
index 84a4028..90dc2c4 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java
@@ -86,8 +86,7 @@ public class TestContainerSmallFile {
             xceiverClientManager.getType(),
             HddsProtos.ReplicationFactor.ONE, containerOwner);
     XceiverClientSpi client = xceiverClientManager
-        .acquireClient(container.getPipeline(),
-            container.getContainerInfo().getContainerID());
+        .acquireClient(container.getPipeline());
     ContainerProtocolCalls.createContainer(client,
         container.getContainerInfo().getContainerID(), traceID);
 
@@ -110,8 +109,7 @@ public class TestContainerSmallFile {
             xceiverClientManager.getType(),
             HddsProtos.ReplicationFactor.ONE, containerOwner);
     XceiverClientSpi client = xceiverClientManager
-        .acquireClient(container.getPipeline(),
-            container.getContainerInfo().getContainerID());
+        .acquireClient(container.getPipeline());
     ContainerProtocolCalls.createContainer(client,
         container.getContainerInfo().getContainerID(), traceID);
 
@@ -135,8 +133,7 @@ public class TestContainerSmallFile {
             xceiverClientManager.getType(),
             HddsProtos.ReplicationFactor.ONE, containerOwner);
     XceiverClientSpi client = xceiverClientManager
-        .acquireClient(container.getPipeline(),
-            container.getContainerInfo().getContainerID());
+        .acquireClient(container.getPipeline());
     ContainerProtocolCalls.createContainer(client,
         container.getContainerInfo().getContainerID(), traceID);
     BlockID blockID = ContainerTestHelper.getTestBlockID(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e12edb3d/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java
index 42047aa..03a0b8a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java
@@ -91,8 +91,7 @@ public class TestGetCommittedBlockLengthAndPutKey {
             HddsProtos.ReplicationFactor.ONE, containerOwner);
     long containerID = container.getContainerInfo().getContainerID();
     Pipeline pipeline = container.getPipeline();
-    XceiverClientSpi client =
-        xceiverClientManager.acquireClient(pipeline, containerID);
+    XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
     //create the container
     ContainerProtocolCalls.createContainer(client, containerID, traceID);
 
@@ -128,7 +127,7 @@ public class TestGetCommittedBlockLengthAndPutKey {
     long containerID = container.getContainerInfo().getContainerID();
     Pipeline pipeline = container.getPipeline();
     XceiverClientSpi client =
-        xceiverClientManager.acquireClient(pipeline, containerID);
+        xceiverClientManager.acquireClient(pipeline);
     // create the container
     ContainerProtocolCalls.createContainer(client, containerID, traceID);
 
@@ -162,7 +161,7 @@ public class TestGetCommittedBlockLengthAndPutKey {
             HddsProtos.ReplicationFactor.ONE, containerOwner);
     long containerID = container.getContainerInfo().getContainerID();
     XceiverClientSpi client = xceiverClientManager
-        .acquireClient(container.getPipeline(), containerID);
+        .acquireClient(container.getPipeline());
     ContainerProtocolCalls.createContainer(client, containerID, traceID);
 
     BlockID blockID = ContainerTestHelper.getTestBlockID(containerID);
@@ -187,7 +186,7 @@ public class TestGetCommittedBlockLengthAndPutKey {
             HddsProtos.ReplicationFactor.ONE, containerOwner);
     long containerID = container.getContainerInfo().getContainerID();
     XceiverClientSpi client = xceiverClientManager
-        .acquireClient(container.getPipeline(), containerID);
+        .acquireClient(container.getPipeline());
     ContainerProtocolCalls
         .createContainer(client, containerID, traceID);
 
@@ -223,8 +222,7 @@ public class TestGetCommittedBlockLengthAndPutKey {
             HddsProtos.ReplicationFactor.ONE, containerOwner);
     long containerID = container.getContainerInfo().getContainerID();
     Pipeline pipeline = container.getPipeline();
-    XceiverClientSpi client =
-        xceiverClientManager.acquireClient(pipeline, containerID);
+    XceiverClientSpi client = xceiverClientManager.acquireClient(pipeline);
     //create the container
     ContainerProtocolCalls.createContainer(client, containerID, traceID);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e12edb3d/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
index 0d363de..da445bf 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.scm;
 import com.google.common.cache.Cache;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -81,21 +82,18 @@ public class TestXceiverClientManager {
         .allocateContainer(clientManager.getType(), clientManager.getFactor(),
             containerOwner);
     XceiverClientSpi client1 = clientManager
-        .acquireClient(container1.getPipeline(),
-            container1.getContainerInfo().getContainerID());
+        .acquireClient(container1.getPipeline());
     Assert.assertEquals(1, client1.getRefcount());
 
     ContainerWithPipeline container2 = storageContainerLocationClient
         .allocateContainer(clientManager.getType(), clientManager.getFactor(),
             containerOwner);
     XceiverClientSpi client2 = clientManager
-        .acquireClient(container2.getPipeline(),
-            container2.getContainerInfo().getContainerID());
+        .acquireClient(container2.getPipeline());
     Assert.assertEquals(1, client2.getRefcount());
 
     XceiverClientSpi client3 = clientManager
-        .acquireClient(container1.getPipeline(),
-            container1.getContainerInfo().getContainerID());
+        .acquireClient(container1.getPipeline());
     Assert.assertEquals(2, client3.getRefcount());
     Assert.assertEquals(2, client1.getRefcount());
     Assert.assertEquals(client1, client3);
@@ -109,7 +107,7 @@ public class TestXceiverClientManager {
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1);
     XceiverClientManager clientManager = new XceiverClientManager(conf);
-    Cache<Long, XceiverClientSpi> cache =
+    Cache<PipelineID, XceiverClientSpi> cache =
         clientManager.getClientCache();
 
     ContainerWithPipeline container1 =
@@ -117,8 +115,7 @@ public class TestXceiverClientManager {
             clientManager.getType(), HddsProtos.ReplicationFactor.ONE,
             containerOwner);
     XceiverClientSpi client1 = clientManager
-        .acquireClient(container1.getPipeline(),
-            container1.getContainerInfo().getContainerID());
+        .acquireClient(container1.getPipeline());
     Assert.assertEquals(1, client1.getRefcount());
     Assert.assertEquals(container1.getPipeline(),
         client1.getPipeline());
@@ -128,14 +125,13 @@ public class TestXceiverClientManager {
             clientManager.getType(),
             HddsProtos.ReplicationFactor.ONE, containerOwner);
     XceiverClientSpi client2 = clientManager
-        .acquireClient(container2.getPipeline(),
-            container2.getContainerInfo().getContainerID());
+        .acquireClient(container2.getPipeline());
     Assert.assertEquals(1, client2.getRefcount());
     Assert.assertNotEquals(client1, client2);
 
     // least recent container (i.e containerName1) is evicted
     XceiverClientSpi nonExistent1 = cache
-        .getIfPresent(container1.getContainerInfo().getContainerID());
+        .getIfPresent(container1.getContainerInfo().getPipelineID());
     Assert.assertEquals(null, nonExistent1);
     // However container call should succeed because of refcount on the client.
     String traceID1 = "trace" + RandomStringUtils.randomNumeric(4);
@@ -164,7 +160,7 @@ public class TestXceiverClientManager {
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1);
     XceiverClientManager clientManager = new XceiverClientManager(conf);
-    Cache<Long, XceiverClientSpi> cache =
+    Cache<PipelineID, XceiverClientSpi> cache =
         clientManager.getClientCache();
 
     ContainerWithPipeline container1 =
@@ -172,8 +168,7 @@ public class TestXceiverClientManager {
             clientManager.getType(),
             clientManager.getFactor(), containerOwner);
     XceiverClientSpi client1 = clientManager
-        .acquireClient(container1.getPipeline(),
-            container1.getContainerInfo().getContainerID());
+        .acquireClient(container1.getPipeline());
     Assert.assertEquals(1, client1.getRefcount());
 
     clientManager.releaseClient(client1);
@@ -183,14 +178,13 @@ public class TestXceiverClientManager {
         .allocateContainer(clientManager.getType(), clientManager.getFactor(),
             containerOwner);
     XceiverClientSpi client2 = clientManager
-        .acquireClient(container2.getPipeline(),
-            container2.getContainerInfo().getContainerID());
+        .acquireClient(container2.getPipeline());
     Assert.assertEquals(1, client2.getRefcount());
     Assert.assertNotEquals(client1, client2);
 
     // now client 1 should be evicted
     XceiverClientSpi nonExistent = cache
-        .getIfPresent(container1.getContainerInfo().getContainerID());
+        .getIfPresent(container1.getContainerInfo().getPipelineID());
     Assert.assertEquals(null, nonExistent);
 
     // Any container operation should now fail


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to