This is an automated email from the ASF dual-hosted git repository.
umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
new 94814e5 HDDS-5425: Client side pipeline cache key should use host and
port combination (#2510)
94814e5 is described below
commit 94814e5dbae3fa5052e3451f54120c3c21fc9052
Author: Uma Maheswara Rao G <[email protected]>
AuthorDate: Tue Aug 10 10:49:07 2021 -0700
HDDS-5425: Client side pipeline cache key should use host and port
combination (#2510)
---
.../hadoop/hdds/scm/XceiverClientManager.java | 10 +++-
.../ozone/client/rpc/TestECKeyOutputStream.java | 62 ++++++++++++++++++++++
2 files changed, 71 insertions(+), 1 deletion(-)
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
index 8d16094..cb8bef1 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.conf.Config;
import org.apache.hadoop.hdds.conf.ConfigGroup;
import org.apache.hadoop.hdds.conf.ConfigType;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.OzoneConfigKeys;
@@ -249,9 +250,16 @@ public class XceiverClientManager implements Closeable,
XceiverClientFactory {
private String getPipelineCacheKey(Pipeline pipeline, boolean forRead) {
String key = pipeline.getId().getId().toString() + pipeline.getType();
- if (topologyAwareRead && forRead) {
+ boolean isEC = pipeline.getReplicationConfig()
+ .getReplicationType() == HddsProtos.ReplicationType.EC;
+ if (topologyAwareRead && forRead || isEC) {
try {
key += pipeline.getClosestNode().getHostName();
+ if (isEC) {
+ // Currently EC uses standalone client.
+ key += pipeline.getClosestNode()
+ .getPort(DatanodeDetails.Port.Name.STANDALONE);
+ }
} catch (IOException e) {
LOG.error("Failed to get closest node to create pipeline cache key:" +
e.getMessage());
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
index 8e571f8..06f0646 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
@@ -16,16 +16,21 @@
*/
package org.apache.hadoop.ozone.client.rpc;
+import com.google.common.cache.Cache;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry;
import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.TestHelper;
@@ -34,6 +39,10 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -110,4 +119,57 @@ public class TestECKeyOutputStream {
Assert.assertTrue(key.getOutputStream() instanceof ECKeyOutputStream);
}
}
+
+ @Test
+ public void testECKeyXceiverClientShouldNotUseCachedKeysForDifferentStreams()
+ throws Exception {
+ int data = 3;
+ int parity = 2;
+ try (OzoneOutputStream key = TestHelper
+ .createKey(keyString, new ECReplicationConfig(data, parity), 1024,
+ objectStore, volumeName, bucketName)) {
+ final List<BlockOutputStreamEntry> streamEntries =
+ ((ECKeyOutputStream) key.getOutputStream()).getStreamEntries();
+ Assert.assertEquals(data + parity, streamEntries.size());
+ final Pipeline firstStreamPipeline = streamEntries.get(0).getPipeline();
+ XceiverClientSpi xceiverClientSpi =
+ ((ECKeyOutputStream) key.getOutputStream()).getXceiverClientFactory()
+ .acquireClient(firstStreamPipeline);
+ Assert.assertNotNull(xceiverClientSpi);
+ final Cache<String, XceiverClientSpi> clientCache =
+ ((XceiverClientManager) ((ECKeyOutputStream) key.getOutputStream())
+ .getXceiverClientFactory()).getClientCache();
+ final String firstCacheKey =
+ clientCache.asMap().entrySet().iterator().next().getKey();
+ List<String> prevVisitedKeys = new ArrayList<>();
+ prevVisitedKeys.add(firstCacheKey);
+ // Let's look at all underlying EC block group streams and make sure
+ // the xceiver client cache entry is not repeated across them.
+ for (int i = 1; i < streamEntries.size(); i++) {
+ Pipeline pipeline = streamEntries.get(i).getPipeline();
+ Assert.assertEquals(i, clientCache.asMap().size());
+ xceiverClientSpi = ((ECKeyOutputStream) key.getOutputStream())
+ .getXceiverClientFactory().acquireClient(pipeline);
+ Assert.assertNotNull(xceiverClientSpi);
+ Assert.assertEquals(i + 1, clientCache.asMap().size());
+ final String newCacheKey =
+ getNewKey(clientCache.asMap().entrySet().iterator(),
+ prevVisitedKeys);
+ prevVisitedKeys.add(newCacheKey);
+ Assert.assertNotEquals(firstCacheKey, newCacheKey);
+ }
+ }
+ }
+
+ private String getNewKey(
+ Iterator<Map.Entry<String, XceiverClientSpi>> iterator,
+ List<String> prevVisitedKeys) {
+ while (iterator.hasNext()) {
+ final String key = iterator.next().getKey();
+ if (!prevVisitedKeys.contains(key)) {
+ return key;
+ }
+ }
+ return null;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]