This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a9fcef48c HDDS-8913. ContainerManagerImpl: reduce processing while 
locked (#4967)
9a9fcef48c is described below

commit 9a9fcef48cb3976cbee12a2d6c002397bb258caf
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Tue Jun 27 20:16:12 2023 +0200

    HDDS-8913. ContainerManagerImpl: reduce processing while locked (#4967)
---
 .../hadoop/hdds/scm/container/ContainerID.java     |  2 +
 .../apache/hadoop/hdds/utils/CollectionUtils.java  | 52 ++++++++++++
 .../hadoop/hdds/utils/TestCollectionUtils.java     | 60 ++++++++++++++
 .../hdds/scm/container/ContainerManagerImpl.java   | 94 +++++++++++-----------
 .../scm/container/states/ContainerAttribute.java   |  3 +-
 .../scm/container/states/ContainerStateMap.java    |  3 +-
 .../scm/container/TestContainerManagerImpl.java    | 83 ++++++++++++-------
 7 files changed, 219 insertions(+), 78 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerID.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerID.java
index 7f21366435..88522f2f9f 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerID.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerID.java
@@ -39,6 +39,8 @@ public final class ContainerID implements 
Comparable<ContainerID> {
       LongCodec.get(), ContainerID::valueOf, c -> c.id,
       DelegatedCodec.CopyType.SHALLOW);
 
+  public static final ContainerID MIN = ContainerID.valueOf(0);
+
   public static Codec<ContainerID> getCodec() {
     return CODEC;
   }
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/CollectionUtils.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/CollectionUtils.java
index 5d46bcc957..d8188a9cf1 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/CollectionUtils.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/CollectionUtils.java
@@ -20,14 +20,20 @@ package org.apache.hadoop.hdds.utils;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.PriorityQueue;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
+import static java.util.Comparator.naturalOrder;
+
 /** Utility methods for Java Collections. */
 public interface CollectionUtils {
   static <KEY, VALUE> Map<KEY, VALUE> newUnmodifiableMap(
@@ -89,4 +95,50 @@ public interface CollectionUtils {
       }
     };
   }
+
+  static <T extends Comparable<T>> List<T> findTopN(Iterable<T> input, int n) {
+    return findTopN(input, n, any -> true);
+  }
+
+  static <T extends Comparable<T>> List<T> findTopN(
+      Iterable<T> input,
+      int n,
+      Predicate<? super T> filter
+  ) {
+    return findTopN(input, n, naturalOrder(), filter);
+  }
+
+  static <T> List<T> findTopN(
+      Iterable<T> input,
+      int n,
+      Comparator<T> comparator
+  ) {
+    return findTopN(input, n, comparator, any -> true);
+  }
+
+  static <T> List<T> findTopN(
+      Iterable<T> input,
+      int n,
+      Comparator<T> comparator,
+      Predicate<? super T> filter
+  ) {
+    PriorityQueue<T> heap = new PriorityQueue<>(comparator);
+
+    for (T item : input) {
+      if (filter.test(item)) {
+        heap.add(item);
+
+        if (heap.size() > n) {
+          heap.poll();
+        }
+      }
+    }
+
+    LinkedList<T> result = new LinkedList<>();
+    while (!heap.isEmpty()) {
+      result.addFirst(heap.poll());
+    }
+
+    return result;
+  }
 }
diff --git 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestCollectionUtils.java
 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestCollectionUtils.java
index 2fd773d878..b921a227db 100644
--- 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestCollectionUtils.java
+++ 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestCollectionUtils.java
@@ -21,11 +21,18 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
+import java.util.function.Predicate;
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.emptyList;
+import static java.util.Collections.reverseOrder;
 import static java.util.Collections.singletonList;
+import static java.util.Comparator.naturalOrder;
+import static java.util.stream.Collectors.toList;
 
 /** Test for {@link CollectionUtils}. */
 public class TestCollectionUtils {
@@ -85,4 +92,57 @@ public class TestCollectionUtils {
     CollectionUtils.newIterator(listOfLists).forEachRemaining(actual::add);
     Assertions.assertEquals(expected, actual);
   }
+
+  @Test
+  void topN() {
+    testTopNInts(Arrays.asList(5, 8, 10, 20, 1, 2, 3, 42));
+    testTopNStrings(Arrays.asList("abc", "QWERTY", "hello world", "mayday",
+        "a new day", "\n", "LICENSE"));
+  }
+
+  private void testTopNStrings(List<String> strings) {
+    testTopN(strings,
+        s -> s.startsWith("a"),
+        s -> s.equals(s.toLowerCase()));
+  }
+
+  private static void testTopNInts(List<Integer> ints) {
+    testTopN(ints,
+        i -> (i % 2 == 0),
+        i -> (i < 20));
+  }
+
+  @SafeVarargs
+  private static <T extends Comparable<T>> void testTopN(List<T> items,
+      Predicate<T>... predicates) {
+
+    testTopN(items, naturalOrder(), any -> true);
+    testTopN(items, reverseOrder(), any -> true);
+
+    for (Predicate<T> predicate : predicates) {
+      testTopN(items, naturalOrder(), predicate);
+      testTopN(items, reverseOrder(), predicate);
+    }
+  }
+
+  private static <T> void testTopN(List<T> items, Comparator<T> comparator,
+      Predicate<T> predicate) {
+    List<T> shuffled = new ArrayList<>(items);
+    Collections.shuffle(shuffled);
+
+    List<T> sorted = new ArrayList<>(items);
+    sorted.sort(comparator.reversed()); // descending order
+
+    for (int i = 0; i <= items.size() + 1; i++) {
+      assertTopN(items, comparator, predicate, sorted, i);
+    }
+    assertTopN(items, comparator, predicate, sorted, Integer.MAX_VALUE);
+  }
+
+  private static <T> void assertTopN(List<T> items, Comparator<T> comparator,
+      Predicate<T> filter, List<T> sorted, int limit) {
+    Assertions.assertEquals(
+        sorted.stream().filter(filter).limit(limit).collect(toList()),
+        CollectionUtils.findTopN(items, limit, comparator, filter));
+  }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
index a4d6f36c59..2a62e1c71d 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
@@ -19,19 +19,18 @@ package org.apache.hadoop.hdds.scm.container;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -55,7 +54,9 @@ import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.util.Comparator.reverseOrder;
 import static org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator.CONTAINER_ID;
+import static org.apache.hadoop.hdds.utils.CollectionUtils.findTopN;
 
 /**
  * {@link ContainerManager} implementation in SCM server.
@@ -144,34 +145,14 @@ public class ContainerManagerImpl implements 
ContainerManager {
   public List<ContainerInfo> getContainers(final ContainerID startID,
                                            final int count) {
     scmContainerManagerMetrics.incNumListContainersOps();
-    final List<ContainerID> containersIds =
-        new ArrayList<>(containerStateManager.getContainerIDs());
-    Collections.sort(containersIds);
-    List<ContainerInfo> containers;
-    lock.lock();
-    try {
-      containers = containersIds.stream()
-          .filter(id -> id.compareTo(startID) >= 0).limit(count)
-          .map(containerStateManager::getContainer)
-          .collect(Collectors.toList());
-    } finally {
-      lock.unlock();
-    }
-    return containers;
+    return toContainers(filterSortAndLimit(startID, count,
+        containerStateManager.getContainerIDs()));
   }
 
   @Override
   public List<ContainerInfo> getContainers(final LifeCycleState state) {
-    List<ContainerInfo> containers;
-    lock.lock();
-    try {
-      containers = containerStateManager.getContainerIDs(state).stream()
-          .map(containerStateManager::getContainer)
-          .filter(Objects::nonNull).collect(Collectors.toList());
-    } finally {
-      lock.unlock();
-    }
-    return containers;
+    scmContainerManagerMetrics.incNumListContainersOps();
+    return toContainers(containerStateManager.getContainerIDs(state));
   }
 
   @Override
@@ -179,20 +160,8 @@ public class ContainerManagerImpl implements 
ContainerManager {
                                            final int count,
                                            final LifeCycleState state) {
     scmContainerManagerMetrics.incNumListContainersOps();
-    final List<ContainerID> containersIds =
-        new ArrayList<>(containerStateManager.getContainerIDs(state));
-    Collections.sort(containersIds);
-    List<ContainerInfo> containers;
-    lock.lock();
-    try {
-      containers = containersIds.stream()
-          .filter(id -> id.compareTo(startID) >= 0).limit(count)
-          .map(containerStateManager::getContainer)
-          .collect(Collectors.toList());
-    } finally {
-      lock.unlock();
-    }
-    return containers;
+    return toContainers(filterSortAndLimit(startID, count,
+        containerStateManager.getContainerIDs(state)));
   }
 
   @Override
@@ -431,18 +400,24 @@ public class ContainerManagerImpl implements 
ContainerManager {
   public void deleteContainer(final ContainerID cid)
       throws IOException, TimeoutException {
     HddsProtos.ContainerID protoId = cid.getProtobuf();
+
+    final boolean found;
     lock.lock();
     try {
-      if (containerExist(cid)) {
+      found = containerExist(cid);
+      if (found) {
         containerStateManager.removeContainer(protoId);
-        scmContainerManagerMetrics.incNumSuccessfulDeleteContainers();
-      } else {
-        scmContainerManagerMetrics.incNumFailureDeleteContainers();
-        throwContainerNotFoundException(cid);
       }
     } finally {
       lock.unlock();
     }
+
+    if (found) {
+      scmContainerManagerMetrics.incNumSuccessfulDeleteContainers();
+    } else {
+      scmContainerManagerMetrics.incNumFailureDeleteContainers();
+      throwContainerNotFoundException(cid);
+    }
   }
 
   @Override
@@ -471,4 +446,33 @@ public class ContainerManagerImpl implements 
ContainerManager {
   public SCMHAManager getSCMHAManager() {
     return haManager;
   }
+
+  private static List<ContainerID> filterSortAndLimit(
+      ContainerID startID, int count, Set<ContainerID> set) {
+
+    if (ContainerID.MIN.equals(startID) && count >= set.size()) {
+      List<ContainerID> list = new ArrayList<>(set);
+      Collections.sort(list);
+      return list;
+    }
+
+    return findTopN(set, count, reverseOrder(),
+        id -> id.compareTo(startID) >= 0);
+  }
+
+  /**
+   * Returns a list of all containers identified by {@code ids}.
+   */
+  private List<ContainerInfo> toContainers(Collection<ContainerID> ids) {
+    List<ContainerInfo> containers = new ArrayList<>(ids.size());
+
+    for (ContainerID id : ids) {
+      ContainerInfo container = containerStateManager.getContainer(id);
+      if (container != null) {
+        containers.add(container);
+      }
+    }
+
+    return containers;
+  }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerAttribute.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerAttribute.java
index 1e4eee0d66..c6f15be5d2 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerAttribute.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerAttribute.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdds.scm.container.states;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSortedSet;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.slf4j.Logger;
@@ -189,7 +190,7 @@ public class ContainerAttribute<T> {
     Preconditions.checkNotNull(key);
 
     if (this.attributeMap.containsKey(key)) {
-      return Collections.unmodifiableNavigableSet(this.attributeMap.get(key));
+      return ImmutableSortedSet.copyOf(this.attributeMap.get(key));
     }
     if (LOG.isDebugEnabled()) {
       LOG.debug("No such Key. Key {}", key);
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
index 2d74160fc4..438e9709bf 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import com.google.common.base.Preconditions;
 
+import com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
@@ -295,7 +296,7 @@ public class ContainerStateMap {
   }
 
   public Set<ContainerID> getAllContainerIDs() {
-    return Collections.unmodifiableSet(containerMap.keySet());
+    return ImmutableSet.copyOf(containerMap.keySet());
   }
 
   /**
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java
index ffe725ea2b..dca8498e38 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hdds.scm.container;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.TimeoutException;
 
@@ -30,6 +32,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import 
org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
 import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
@@ -49,6 +52,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
+import static java.util.Collections.emptyList;
+import static java.util.stream.Collectors.toList;
 import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.OPEN;
 
 
@@ -118,66 +123,82 @@ public class TestContainerManagerImpl {
         RatisReplicationConfig.getInstance(
             ReplicationFactor.THREE), "admin");
     final ContainerID cid = container.containerID();
-    Assertions.assertEquals(HddsProtos.LifeCycleState.OPEN,
+    Assertions.assertEquals(LifeCycleState.OPEN,
         containerManager.getContainer(cid).getState());
     containerManager.updateContainerState(cid,
         HddsProtos.LifeCycleEvent.FINALIZE);
-    Assertions.assertEquals(HddsProtos.LifeCycleState.CLOSING,
+    Assertions.assertEquals(LifeCycleState.CLOSING,
         containerManager.getContainer(cid).getState());
     containerManager.updateContainerState(cid,
         HddsProtos.LifeCycleEvent.QUASI_CLOSE);
-    Assertions.assertEquals(HddsProtos.LifeCycleState.QUASI_CLOSED,
+    Assertions.assertEquals(LifeCycleState.QUASI_CLOSED,
         containerManager.getContainer(cid).getState());
     containerManager.updateContainerState(cid,
         HddsProtos.LifeCycleEvent.FORCE_CLOSE);
-    Assertions.assertEquals(HddsProtos.LifeCycleState.CLOSED,
+    Assertions.assertEquals(LifeCycleState.CLOSED,
         containerManager.getContainer(cid).getState());
   }
 
   @Test
   void testGetContainers() throws Exception {
-    Assertions.assertTrue(
-        containerManager.getContainers().isEmpty());
+    Assertions.assertEquals(emptyList(), containerManager.getContainers());
 
-    ContainerID[] cidArray = new ContainerID[10];
+    List<ContainerID> ids = new ArrayList<>();
     for (int i = 0; i < 10; i++) {
       ContainerInfo container = containerManager.allocateContainer(
-          RatisReplicationConfig.getInstance(
-              ReplicationFactor.THREE), "admin");
-      cidArray[i] = container.containerID();
+          RatisReplicationConfig.getInstance(ReplicationFactor.THREE), 
"admin");
+      ids.add(container.containerID());
     }
 
-    Assertions.assertEquals(10,
-        containerManager.getContainers(cidArray[0], 10).size());
-    Assertions.assertEquals(10,
-        containerManager.getContainers(cidArray[0], 100).size());
+    assertIds(ids,
+        containerManager.getContainers(ContainerID.MIN, 10));
+    assertIds(ids.subList(0, 5),
+        containerManager.getContainers(ContainerID.MIN, 5));
 
-    containerManager.updateContainerState(cidArray[0],
+    assertIds(ids, containerManager.getContainers(ids.get(0), 10));
+    assertIds(ids, containerManager.getContainers(ids.get(0), 100));
+    assertIds(ids.subList(5, ids.size()),
+        containerManager.getContainers(ids.get(5), 100));
+    assertIds(emptyList(),
+        containerManager.getContainers(ids.get(5), 100, 
LifeCycleState.CLOSED));
+
+    containerManager.updateContainerState(ids.get(0),
         HddsProtos.LifeCycleEvent.FINALIZE);
-    Assertions.assertEquals(9,
-        containerManager.getContainers(HddsProtos.LifeCycleState.OPEN).size());
-    Assertions.assertEquals(1, containerManager
-        .getContainers(HddsProtos.LifeCycleState.CLOSING).size());
-    containerManager.updateContainerState(cidArray[1],
+    assertIds(ids.subList(0, 1),
+        containerManager.getContainers(LifeCycleState.CLOSING));
+    assertIds(ids.subList(1, ids.size()),
+        containerManager.getContainers(LifeCycleState.OPEN));
+
+    containerManager.updateContainerState(ids.get(1),
         HddsProtos.LifeCycleEvent.FINALIZE);
-    Assertions.assertEquals(8,
-        containerManager.getContainers(HddsProtos.LifeCycleState.OPEN).size());
-    Assertions.assertEquals(2, containerManager
-        .getContainers(HddsProtos.LifeCycleState.CLOSING).size());
-    containerManager.updateContainerState(cidArray[1],
+    assertIds(ids.subList(0, 2),
+        containerManager.getContainers(LifeCycleState.CLOSING));
+    assertIds(ids.subList(2, ids.size()),
+        containerManager.getContainers(LifeCycleState.OPEN));
+
+    containerManager.updateContainerState(ids.get(1),
         HddsProtos.LifeCycleEvent.QUASI_CLOSE);
-    containerManager.updateContainerState(cidArray[2],
+    containerManager.updateContainerState(ids.get(2),
         HddsProtos.LifeCycleEvent.FINALIZE);
-    containerManager.updateContainerState(cidArray[2],
+    containerManager.updateContainerState(ids.get(2),
         HddsProtos.LifeCycleEvent.CLOSE);
     Assertions.assertEquals(7, containerManager.
-        getContainerStateCount(HddsProtos.LifeCycleState.OPEN));
+        getContainerStateCount(LifeCycleState.OPEN));
     Assertions.assertEquals(1, containerManager
-        .getContainerStateCount(HddsProtos.LifeCycleState.CLOSING));
+        .getContainerStateCount(LifeCycleState.CLOSING));
     Assertions.assertEquals(1, containerManager
-        .getContainerStateCount(HddsProtos.LifeCycleState.QUASI_CLOSED));
+        .getContainerStateCount(LifeCycleState.QUASI_CLOSED));
     Assertions.assertEquals(1, containerManager
-        .getContainerStateCount(HddsProtos.LifeCycleState.CLOSED));
+        .getContainerStateCount(LifeCycleState.CLOSED));
+  }
+
+  private static void assertIds(
+      List<ContainerID> expected,
+      List<ContainerInfo> containers
+  ) {
+    Assertions.assertEquals(expected, containers.stream()
+        .map(ContainerInfo::containerID)
+        .collect(toList()));
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to