This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch HDDS-5713
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-5713 by this push:
     new 232d133e3b HDDS-13055. [DiskBalancer] Optimize 
DefaultContainerChoosingPolicy performance (#8512)
232d133e3b is described below

commit 232d133e3be61db15612f3cbee328423f441b10d
Author: Gargi Jaiswal <[email protected]>
AuthorDate: Fri Jun 13 08:20:01 2025 +0530

    HDDS-13055. [DiskBalancer] Optimize DefaultContainerChoosingPolicy 
performance (#8512)
    
    Co-authored-by: Gargi Jaiswal <[email protected]>
    Co-authored-by: Sammi Chen <[email protected]>
---
 .../policy/ContainerChoosingPolicy.java            |   5 +-
 .../policy/DefaultContainerChoosingPolicy.java     |  24 +-
 .../scm/node/TestContainerChoosingPolicy.java      | 269 +++++++++++++++++++++
 3 files changed, 295 insertions(+), 3 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/ContainerChoosingPolicy.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/ContainerChoosingPolicy.java
index 5f9702297a..c3e89ab93d 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/ContainerChoosingPolicy.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/ContainerChoosingPolicy.java
@@ -28,7 +28,10 @@
 public interface ContainerChoosingPolicy {
   /**
    * Choose a container for balancing.
-   *
+   * @param ozoneContainer the OzoneContainer instance to get all containers 
of a particular volume.
+   * @param volume the HddsVolume instance to choose containers from.
+   * @param inProgressContainerIDs containerIDs present in this set should be
+   *     avoided, as these containers are already being moved by the DiskBalancer.
    * @return a Container
    */
   ContainerData chooseContainer(OzoneContainer ozoneContainer,
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultContainerChoosingPolicy.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultContainerChoosingPolicy.java
index df35bf2a08..f75b94ad16 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultContainerChoosingPolicy.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultContainerChoosingPolicy.java
@@ -17,8 +17,13 @@
 
 package org.apache.hadoop.ozone.container.diskbalancer.policy;
 
+import static java.util.concurrent.TimeUnit.HOURS;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import java.util.Iterator;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 import org.apache.hadoop.ozone.container.common.impl.ContainerData;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
@@ -33,11 +38,22 @@ public class DefaultContainerChoosingPolicy implements 
ContainerChoosingPolicy {
   public static final Logger LOG = LoggerFactory.getLogger(
       DefaultContainerChoosingPolicy.class);
 
+  private static final ThreadLocal<Cache<HddsVolume, Iterator<Container<?>>>> 
CACHE =
+      ThreadLocal.withInitial(
+          () -> CacheBuilder.newBuilder().recordStats().expireAfterAccess(1, 
HOURS).build());
+
   @Override
   public ContainerData chooseContainer(OzoneContainer ozoneContainer,
       HddsVolume hddsVolume, Set<Long> inProgressContainerIDs) {
-    Iterator<Container<?>> itr = ozoneContainer.getController()
-        .getContainers(hddsVolume);
+    Iterator<Container<?>> itr;
+    try {
+      itr = CACHE.get().get(hddsVolume,
+          () -> ozoneContainer.getController().getContainers(hddsVolume));
+    } catch (ExecutionException e) {
+      LOG.warn("Failed to get container iterator for volume {}", hddsVolume, 
e);
+      return null;
+    }
+
     while (itr.hasNext()) {
       ContainerData containerData = itr.next().getContainerData();
       if (!inProgressContainerIDs.contains(
@@ -45,6 +61,10 @@ public ContainerData chooseContainer(OzoneContainer 
ozoneContainer,
         return containerData;
       }
     }
+
+    if (!itr.hasNext()) {
+      CACHE.get().invalidate(hddsVolume);
+    }
     return null;
   }
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerChoosingPolicy.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerChoosingPolicy.java
new file mode 100644
index 0000000000..68b5aa4e94
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerChoosingPolicy.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.scm.node;
+
+import static 
org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils.newContainerSet;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.fs.MockSpaceUsageCheckFactory;
+import org.apache.hadoop.hdds.fs.MockSpaceUsageSource;
+import org.apache.hadoop.hdds.fs.SpaceUsageCheckFactory;
+import org.apache.hadoop.hdds.fs.SpaceUsagePersistence;
+import org.apache.hadoop.hdds.fs.SpaceUsageSource;
+import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.impl.ContainerDataScanOrder;
+import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import 
org.apache.hadoop.ozone.container.diskbalancer.policy.ContainerChoosingPolicy;
+import 
org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultContainerChoosingPolicy;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
+
+/**
+ * This class tests the ContainerChoosingPolicy.
+ */
+public class TestContainerChoosingPolicy {
+
+  private static final int NUM_VOLUMES = 20;
+  private static final int NUM_CONTAINERS = 100000;
+  private static final int NUM_THREADS = 10;
+  private static final int NUM_ITERATIONS = 10000;
+  private static final int MAX_IN_PROGRESS = 100;
+
+  private static final OzoneConfiguration CONF = new OzoneConfiguration();
+
+  @TempDir
+  private Path baseDir;
+
+  private List<HddsVolume> volumes;
+  private ContainerSet containerSet;
+  private OzoneContainer ozoneContainer;
+  private ContainerChoosingPolicy containerChoosingPolicy;
+  private ExecutorService executor;
+  private ContainerController containerController;
+
+  // Simulate containers currently being balanced (in progress)
+  private Set<Long> inProgressContainerIDs = ConcurrentHashMap.newKeySet();
+
+  @BeforeEach
+  public void setup() throws Exception {
+    containerSet = newContainerSet();
+    createVolumes();
+    createContainers();
+    ozoneContainer = mock(OzoneContainer.class);
+    containerController = new ContainerController(containerSet, null);
+    when(ozoneContainer.getController()).thenReturn(containerController);
+    when(ozoneContainer.getContainerSet()).thenReturn(containerSet);
+    containerChoosingPolicy = new DefaultContainerChoosingPolicy();
+    executor = Executors.newFixedThreadPool(NUM_THREADS);
+  }
+
+  @AfterEach
+  public void cleanUp() {
+    volumes.forEach(HddsVolume::shutdown);
+
+    // Shutdown executor service
+    if (executor != null && !executor.isShutdown()) {
+      executor.shutdownNow();
+    }
+
+    // Clear in-progress container IDs
+    inProgressContainerIDs.clear();
+
+    // Clear ContainerSet
+    containerSet = null;
+  }
+
+  @Test
+  @Timeout(300)
+  public void testConcurrentVolumeChoosingPerformance() throws Exception {
+    testPolicyPerformance("ContainerChoosingPolicy", containerChoosingPolicy);
+  }
+
+  @Test
+  public void testContainerDeletionAfterIteratorGeneration() throws Exception {
+    HddsVolume volume = volumes.get(0);
+    List<Container<?>> containerList = 
ozoneContainer.getContainerSet().getContainerMap().values().stream()
+        .filter(x -> 
volume.getStorageID().equals(x.getContainerData().getVolume().getStorageID()))
+        .filter(x -> x.getContainerData().isClosed())
+        .sorted(ContainerDataScanOrder.INSTANCE)
+        .collect(Collectors.toList());
+    inProgressContainerIDs.clear();
+    ContainerData container = 
containerChoosingPolicy.chooseContainer(ozoneContainer, volume, 
inProgressContainerIDs);
+    assertEquals(containerList.get(0).getContainerData().getContainerID(), 
container.getContainerID());
+    
ozoneContainer.getContainerSet().removeContainer(containerList.get(1).getContainerData().getContainerID());
+    inProgressContainerIDs.add(container.getContainerID());
+    container = containerChoosingPolicy.chooseContainer(ozoneContainer, 
volume, inProgressContainerIDs);
+    assertEquals(containerList.get(1).getContainerData().getContainerID(), 
container.getContainerID());
+  }
+
+  /**
+   * Chosen / not-chosen counts: calls where the policy returned a container / returned null.
+   * Failure count: calls where an exception was thrown during container choice.
+   */
+  private void testPolicyPerformance(String policyName, 
ContainerChoosingPolicy policy) throws Exception {
+    CountDownLatch latch = new CountDownLatch(NUM_THREADS);
+    AtomicInteger containerChosenCount = new AtomicInteger(0);
+    AtomicInteger containerNotChosenCount = new AtomicInteger(0);
+    AtomicInteger failureCount = new AtomicInteger(0);
+    AtomicLong totalTimeNanos = new AtomicLong(0);
+
+    Random rand = new Random();
+
+    for (int i = 0; i < NUM_THREADS; i++) {
+      executor.submit(() -> {
+        try {
+          long threadStart = System.nanoTime();
+          int containerChosen = 0;
+          int containerNotChosen = 0;
+          int failures = 0;
+          // Choose a random volume
+          HddsVolume volume = volumes.get(rand.nextInt(NUM_VOLUMES));
+
+          for (int j = 0; j < NUM_ITERATIONS; j++) {
+            try {
+              ContainerData c = policy.chooseContainer(ozoneContainer, volume, 
inProgressContainerIDs);
+              if (c == null) {
+                containerNotChosen++;
+              } else {
+                containerChosen++;
+                if (inProgressContainerIDs.size() < MAX_IN_PROGRESS) {
+                  inProgressContainerIDs.add(c.getContainerID());
+                }
+              }
+            } catch (Exception e) {
+              failures++;
+            }
+          }
+
+          long threadEnd = System.nanoTime();
+          totalTimeNanos.addAndGet(threadEnd - threadStart);
+          containerChosenCount.addAndGet(containerChosen);
+          containerNotChosenCount.addAndGet(containerNotChosen);
+          failureCount.addAndGet(failures);
+        } finally {
+          latch.countDown();
+        }
+      });
+    }
+
+    // Wait max 5 minutes for test completion
+    assertTrue(latch.await(5, TimeUnit.MINUTES), "Test timed out");
+
+    long totalOperations = (long) NUM_THREADS * NUM_ITERATIONS;
+    double avgTimePerOp = (double) totalTimeNanos.get() / totalOperations;
+    double opsPerSec = totalOperations / (totalTimeNanos.get() / 
1_000_000_000.0);
+
+    System.out.println("Performance results for " + policyName);
+    System.out.println("Total volumes: " + NUM_VOLUMES);
+    System.out.println("Total containers: " + NUM_CONTAINERS);
+    System.out.println("Total threads: " + NUM_THREADS);
+    System.out.println("Total operations: " + totalOperations);
+    System.out.println("Container Chosen operations: " + 
containerChosenCount.get());
+    System.out.println("Container Not Chosen operations: " + 
containerNotChosenCount.get());
+    System.out.println("Failed operations: " + failureCount.get());
+    System.out.println("Total time (ms): " + totalTimeNanos.get() / 1_000_000);
+    System.out.println("Average time per operation (ns): " + avgTimePerOp);
+    System.out.println("Operations per second: " + opsPerSec);
+  }
+
+  public void createVolumes() throws IOException {
+    // Create volumes with mocked space usage
+    volumes = new ArrayList<>();
+    for (int i = 0; i < NUM_VOLUMES; i++) {
+      String volumePath = baseDir.resolve("disk" + i).toString();
+      SpaceUsageSource source = MockSpaceUsageSource.fixed(1000000000, 
1000000000 - i * 50000);
+      SpaceUsageCheckFactory factory = MockSpaceUsageCheckFactory.of(
+          source, Duration.ZERO, SpaceUsagePersistence.None.INSTANCE);
+      HddsVolume volume = new HddsVolume.Builder(volumePath)
+          .conf(CONF)
+          .usageCheckFactory(factory)
+          .build();
+      volumes.add(volume);
+    }
+  }
+
+  public void createContainers() {
+    List<Long> closedContainerIDs = new ArrayList<>();
+    Random random = new Random();
+    long startTime = System.currentTimeMillis();
+
+    for (int i = 0; i < NUM_CONTAINERS; i++) {
+      boolean isOpen = i < 10; // First 10 containers are open
+      int volumeIndex = i % NUM_VOLUMES; // Distribute containers across 
volumes
+      HddsVolume volume = volumes.get(volumeIndex);
+
+      KeyValueContainerData containerData = new KeyValueContainerData(
+          i, ContainerLayoutVersion.FILE_PER_BLOCK, 
ContainerTestHelper.CONTAINER_MAX_SIZE,
+          UUID.randomUUID().toString(), UUID.randomUUID().toString());
+
+      containerData.setState(isOpen ? ContainerDataProto.State.OPEN : 
ContainerDataProto.State.CLOSED);
+      containerData.setVolume(volume);
+      KeyValueContainer container = new KeyValueContainer(containerData, CONF);
+
+      try {
+        containerSet.addContainer(container); // Add container to ContainerSet
+      } catch (Exception e) {
+        throw new RuntimeException("Failed to add container to ContainerSet", 
e);
+      }
+
+      // Collect IDs of closed containers
+      if (!isOpen) {
+        closedContainerIDs.add((long) i);
+      }
+    }
+
+    // Randomly select NUM_THREADS closed containers to be in-progress
+    Collections.shuffle(closedContainerIDs, random);
+    inProgressContainerIDs.addAll(closedContainerIDs.subList(0, NUM_THREADS));
+    System.out.println("Created " +  NUM_CONTAINERS + " containers in " +
+        (System.currentTimeMillis() - startTime) + " ms");
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to