This is an automated email from the ASF dual-hosted git repository.

umamahesh pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
     new aaf6f17  HDDS-6021: EC: Client side exclude nodes list should expire 
after certain time period or based on the list size. (#2973)
aaf6f17 is described below

commit aaf6f1704504732c19cb1bfba29c1341873dc8ad
Author: Uma Maheswara Rao G <[email protected]>
AuthorDate: Tue Jan 18 10:10:50 2022 -0800

    HDDS-6021: EC: Client side exclude nodes list should expire after certain 
time period or based on the list size. (#2973)
---
 .../apache/hadoop/hdds/scm/OzoneClientConfig.java  | 12 ++++
 .../scm/container/common/helpers/ExcludeList.java  | 48 ++++++++++++----
 .../container/common/helpers/TestExcludeList.java  | 67 ++++++++++++++++++++++
 .../client/io/BlockOutputStreamEntryPool.java      |  6 +-
 .../client/io/ECBlockOutputStreamEntryPool.java    | 10 ++++
 5 files changed, 132 insertions(+), 11 deletions(-)

diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
index 8861f4b..0d59e8f 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
@@ -131,6 +131,14 @@ public class OzoneClientConfig {
       tags = ConfigTag.CLIENT)
   private int maxECStripeWriteRetries = 10;
 
+  @Config(key = "exclude.nodes.expiry.time",
+      defaultValue = "600000",
+      description = "Time after which an excluded node is reconsidered for" +
+          " writes in EC. If the value is zero, the node is excluded for the" +
+          " life of the client",
+      tags = ConfigTag.CLIENT)
+  private long excludeNodesExpiryTime = 10 * 60 * 1000;
+
   @PostConstruct
   private void validate() {
     Preconditions.checkState(streamBufferSize > 0);
@@ -235,6 +243,10 @@ public class OzoneClientConfig {
     return this.maxECStripeWriteRetries;
   }
 
+  public long getExcludeNodesExpiryTime() {
+    return excludeNodesExpiryTime;
+  }
+
   public int getBufferIncrement() {
     return bufferIncrement;
   }
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ExcludeList.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ExcludeList.java
index 824a1f5..026b313 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ExcludeList.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/ExcludeList.java
@@ -17,16 +17,20 @@
 
 package org.apache.hadoop.hdds.scm.container.common.helpers;
 
-
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.ozone.common.MonotonicClock;
 
+import java.time.ZoneOffset;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * This class contains set of dns and containers which ozone client provides
@@ -34,15 +38,24 @@ import java.util.UUID;
  */
 public class ExcludeList {
 
-  private final Set<DatanodeDetails> datanodes;
+  private final Map<DatanodeDetails, Long> datanodes;
   private final Set<ContainerID> containerIds;
   private final Set<PipelineID> pipelineIds;
+  private long expiryTime = 0;
+  private java.time.Clock clock;
 
 
   public ExcludeList() {
-    datanodes = new HashSet<>();
+    datanodes = new ConcurrentHashMap<>();
     containerIds = new HashSet<>();
     pipelineIds = new HashSet<>();
+    clock = new MonotonicClock(ZoneOffset.UTC);
+  }
+
+  public ExcludeList(long autoExpiryTime, java.time.Clock clock) {
+    this();
+    this.expiryTime = autoExpiryTime;
+    this.clock = clock;
   }
 
   public Set<ContainerID> getContainerIds() {
@@ -50,7 +63,23 @@ public class ExcludeList {
   }
 
   public Set<DatanodeDetails> getDatanodes() {
-    return datanodes;
+    Set<DatanodeDetails> dns = new HashSet<>();
+    if (expiryTime > 0) {
+      Iterator<Map.Entry<DatanodeDetails, Long>> iterator =
+          datanodes.entrySet().iterator();
+      while (iterator.hasNext()) {
+        Map.Entry<DatanodeDetails, Long> entry = iterator.next();
+        Long storedExpiryTime = entry.getValue();
+        if (clock.millis() > storedExpiryTime) {
+          iterator.remove(); // removing
+        } else {
+          dns.add(entry.getKey());
+        }
+      }
+    } else {
+      dns = datanodes.keySet();
+    }
+    return dns;
   }
 
   public void addDatanodes(Collection<DatanodeDetails> dns) {
@@ -58,7 +87,7 @@ public class ExcludeList {
   }
 
   public void addDatanode(DatanodeDetails dn) {
-    datanodes.add(dn);
+    datanodes.put(dn, clock.millis() + expiryTime);
   }
 
   public void addConatinerId(ContainerID containerId) {
@@ -78,9 +107,7 @@ public class ExcludeList {
         HddsProtos.ExcludeListProto.newBuilder();
     containerIds
         .forEach(id -> builder.addContainerIds(id.getId()));
-    datanodes.forEach(dn -> {
-      builder.addDatanodes(dn.getUuidString());
-    });
+    getDatanodes().forEach(dn -> builder.addDatanodes(dn.getUuidString()));
     pipelineIds.forEach(pipelineID -> {
       builder.addPipelineIds(pipelineID.getProtobuf());
     });
@@ -105,7 +132,7 @@ public class ExcludeList {
   }
 
   public boolean isEmpty() {
-    return datanodes.isEmpty() && containerIds.isEmpty() && pipelineIds
+    return getDatanodes().isEmpty() && containerIds.isEmpty() && pipelineIds
         .isEmpty();
   }
 
@@ -118,9 +145,10 @@ public class ExcludeList {
   @Override
   public String toString() {
     return "ExcludeList {" +
-        "datanodes = " + datanodes +
+        "datanodes = " + getDatanodes() +
         ", containerIds = " + containerIds +
         ", pipelineIds = " + pipelineIds +
         '}';
   }
+
 }
diff --git 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/container/common/helpers/TestExcludeList.java
 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/container/common/helpers/TestExcludeList.java
new file mode 100644
index 0000000..d3fb49b
--- /dev/null
+++ 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/container/common/helpers/TestExcludeList.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.common.helpers;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.ozone.test.TestClock;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.util.UUID;
+
+/**
+ * Tests the exclude nodes list behavior at client.
+ */
+public class TestExcludeList {
+  private TestClock clock = new TestClock(Instant.now(), ZoneOffset.UTC);
+
+  @Test
+  public void excludeNodesShouldBeCleanedBasedOnGivenTime() {
+    ExcludeList list = new ExcludeList(10, clock);
+    list.addDatanode(DatanodeDetails.newBuilder().setUuid(UUID.randomUUID())
+        .setIpAddress("127.0.0.1").setHostName("localhost").addPort(
+            DatanodeDetails.newPort(DatanodeDetails.Port.Name.STANDALONE, 
2001))
+        .build());
+    Assert.assertTrue(list.getDatanodes().size() == 1);
+    clock.fastForward(11);
+    Assert.assertTrue(list.getDatanodes().size() == 0);
+    list.addDatanode(DatanodeDetails.newBuilder().setUuid(UUID.randomUUID())
+        .setIpAddress("127.0.0.2").setHostName("localhost").addPort(
+            DatanodeDetails.newPort(DatanodeDetails.Port.Name.STANDALONE, 
2001))
+        .build());
+    list.addDatanode(DatanodeDetails.newBuilder().setUuid(UUID.randomUUID())
+        .setIpAddress("127.0.0.3").setHostName("localhost").addPort(
+            DatanodeDetails.newPort(DatanodeDetails.Port.Name.STANDALONE, 
2001))
+        .build());
+    Assert.assertTrue(list.getDatanodes().size() == 2);
+  }
+
+  @Test
+  public void excludeNodeShouldNotBeCleanedIfExpiryTimeIsZero() {
+    ExcludeList list = new ExcludeList(0, clock);
+    list.addDatanode(DatanodeDetails.newBuilder().setUuid(UUID.randomUUID())
+        .setIpAddress("127.0.0.1").setHostName("localhost").addPort(
+            DatanodeDetails.newPort(DatanodeDetails.Port.Name.STANDALONE, 
2001))
+        .build());
+    Assert.assertTrue(list.getDatanodes().size() == 1);
+    clock.fastForward(1);
+    Assert.assertTrue(list.getDatanodes().size() == 1);
+  }
+}
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index 912ba3a..1c09823 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -102,7 +102,7 @@ public class BlockOutputStreamEntryPool {
         .setMultipartUploadPartNumber(partNumber).build();
     this.requestID = requestId;
     this.openID = openID;
-    this.excludeList = new ExcludeList();
+    this.excludeList = createExcludeList();
 
     this.bufferPool =
         new BufferPool(config.getStreamBufferSize(),
@@ -112,6 +112,10 @@ public class BlockOutputStreamEntryPool {
                 .createByteBufferConversion(unsafeByteBufferConversion));
   }
 
+  ExcludeList createExcludeList() {
+    return new ExcludeList();
+  }
+
   /**
    * A constructor for testing purpose only.
    *
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
index e379956..4651896 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockOutputStreamEntryPool.java
@@ -21,10 +21,14 @@ import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.ozone.common.MonotonicClock;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 
+import java.time.ZoneOffset;
+
 /**
  * {@link BlockOutputStreamEntryPool} is responsible to manage OM communication
  * regarding writing a block to Ozone in a non-EC write case.
@@ -58,6 +62,12 @@ public class ECBlockOutputStreamEntryPool extends 
BlockOutputStreamEntryPool {
   }
 
   @Override
+  ExcludeList createExcludeList() {
+    return new ExcludeList(getConfig().getExcludeNodesExpiryTime(),
+        new MonotonicClock(ZoneOffset.UTC));
+  }
+
+  @Override
   BlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) {
     return
         new ECBlockOutputStreamEntry.Builder()

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to