This is an automated email from the ASF dual-hosted git repository.

umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
     new 94814e5  HDDS-5425: Client side pipeline cache key should use host and port combination (#2510)
94814e5 is described below

commit 94814e5dbae3fa5052e3451f54120c3c21fc9052
Author: Uma Maheswara Rao G <[email protected]>
AuthorDate: Tue Aug 10 10:49:07 2021 -0700

    HDDS-5425: Client side pipeline cache key should use host and port combination (#2510)
---
 .../hadoop/hdds/scm/XceiverClientManager.java      | 10 +++-
 .../ozone/client/rpc/TestECKeyOutputStream.java    | 62 ++++++++++++++++++++++
 2 files changed, 71 insertions(+), 1 deletion(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
index 8d16094..cb8bef1 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.conf.Config;
 import org.apache.hadoop.hdds.conf.ConfigGroup;
 import org.apache.hadoop.hdds.conf.ConfigType;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
@@ -249,9 +250,16 @@ public class XceiverClientManager implements Closeable, XceiverClientFactory {
 
   private String getPipelineCacheKey(Pipeline pipeline, boolean forRead) {
     String key = pipeline.getId().getId().toString() + pipeline.getType();
-    if (topologyAwareRead && forRead) {
+    boolean isEC = pipeline.getReplicationConfig()
+        .getReplicationType() == HddsProtos.ReplicationType.EC;
+    if (topologyAwareRead && forRead || isEC) {
       try {
         key += pipeline.getClosestNode().getHostName();
+        if (isEC) {
+          // Currently EC uses standalone client.
+          key += pipeline.getClosestNode()
+              .getPort(DatanodeDetails.Port.Name.STANDALONE);
+        }
       } catch (IOException e) {
         LOG.error("Failed to get closest node to create pipeline cache key:" +
             e.getMessage());
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
index 8e571f8..06f0646 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
@@ -16,16 +16,21 @@
  */
 package org.apache.hadoop.ozone.client.rpc;
 
+import com.google.common.cache.Cache;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry;
 import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.container.TestHelper;
@@ -34,6 +39,10 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -110,4 +119,57 @@ public class TestECKeyOutputStream {
       Assert.assertTrue(key.getOutputStream() instanceof ECKeyOutputStream);
     }
   }
+
+  @Test
+  public void testECKeyXceiverClientShouldNotUseCachedKeysForDifferentStreams()
+      throws Exception {
+    int data = 3;
+    int parity = 2;
+    try (OzoneOutputStream key = TestHelper
+        .createKey(keyString, new ECReplicationConfig(data, parity), 1024,
+            objectStore, volumeName, bucketName)) {
+      final List<BlockOutputStreamEntry> streamEntries =
+          ((ECKeyOutputStream) key.getOutputStream()).getStreamEntries();
+      Assert.assertEquals(data + parity, streamEntries.size());
+      final Pipeline firstStreamPipeline = streamEntries.get(0).getPipeline();
+      XceiverClientSpi xceiverClientSpi =
+          ((ECKeyOutputStream) key.getOutputStream()).getXceiverClientFactory()
+              .acquireClient(firstStreamPipeline);
+      Assert.assertNotNull(xceiverClientSpi);
+      final Cache<String, XceiverClientSpi> clientCache =
+          ((XceiverClientManager) ((ECKeyOutputStream) key.getOutputStream())
+              .getXceiverClientFactory()).getClientCache();
+      final String firstCacheKey =
+          clientCache.asMap().entrySet().iterator().next().getKey();
+      List<String> prevVisitedKeys = new ArrayList<>();
+      prevVisitedKeys.add(firstCacheKey);
+      // Lets look at all underlying EC Block group streams and make sure
+      // xceiver client entry is not repeating for all.
+      for (int i = 1; i < streamEntries.size(); i++) {
+        Pipeline pipeline = streamEntries.get(i).getPipeline();
+        Assert.assertEquals(i, clientCache.asMap().size());
+        xceiverClientSpi = ((ECKeyOutputStream) key.getOutputStream())
+            .getXceiverClientFactory().acquireClient(pipeline);
+        Assert.assertNotNull(xceiverClientSpi);
+        Assert.assertEquals(i + 1, clientCache.asMap().size());
+        final String newCacheKey =
+            getNewKey(clientCache.asMap().entrySet().iterator(),
+                prevVisitedKeys);
+        prevVisitedKeys.add(newCacheKey);
+        Assert.assertNotEquals(firstCacheKey, newCacheKey);
+      }
+    }
+  }
+
+  private String getNewKey(
+      Iterator<Map.Entry<String, XceiverClientSpi>> iterator,
+      List<String> prevVisitedKeys) {
+    while (iterator.hasNext()) {
+      final String key = iterator.next().getKey();
+      if (!prevVisitedKeys.contains(key)) {
+        return key;
+      }
+    }
+    return null;
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to