This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch HDDS-5713
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-5713 by this push:
new 232d133e3b HDDS-13055. [DiskBalancer] Optimize
DefaultContainerChoosingPolicy performance (#8512)
232d133e3b is described below
commit 232d133e3be61db15612f3cbee328423f441b10d
Author: Gargi Jaiswal <[email protected]>
AuthorDate: Fri Jun 13 08:20:01 2025 +0530
HDDS-13055. [DiskBalancer] Optimize DefaultContainerChoosingPolicy
performance (#8512)
Co-authored-by: Gargi Jaiswal <[email protected]>
Co-authored-by: Sammi Chen <[email protected]>
---
.../policy/ContainerChoosingPolicy.java | 5 +-
.../policy/DefaultContainerChoosingPolicy.java | 24 +-
.../scm/node/TestContainerChoosingPolicy.java | 269 +++++++++++++++++++++
3 files changed, 295 insertions(+), 3 deletions(-)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/ContainerChoosingPolicy.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/ContainerChoosingPolicy.java
index 5f9702297a..c3e89ab93d 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/ContainerChoosingPolicy.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/ContainerChoosingPolicy.java
@@ -28,7 +28,10 @@
public interface ContainerChoosingPolicy {
/**
* Choose a container for balancing.
- *
+ * @param ozoneContainer the OzoneContainer instance to get all containers
of a particular volume.
+ * @param volume the HddsVolume instance to choose containers from.
* @param inProgressContainerIDs containerIDs present in this set should be
+ * avoided as these containers are already under move by diskBalancer.
* @return a Container
*/
ContainerData chooseContainer(OzoneContainer ozoneContainer,
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultContainerChoosingPolicy.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultContainerChoosingPolicy.java
index df35bf2a08..f75b94ad16 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultContainerChoosingPolicy.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultContainerChoosingPolicy.java
@@ -17,8 +17,13 @@
package org.apache.hadoop.ozone.container.diskbalancer.policy;
+import static java.util.concurrent.TimeUnit.HOURS;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import java.util.Iterator;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
@@ -33,11 +38,22 @@ public class DefaultContainerChoosingPolicy implements
ContainerChoosingPolicy {
public static final Logger LOG = LoggerFactory.getLogger(
DefaultContainerChoosingPolicy.class);
+ private static final ThreadLocal<Cache<HddsVolume, Iterator<Container<?>>>>
CACHE =
+ ThreadLocal.withInitial(
+ () -> CacheBuilder.newBuilder().recordStats().expireAfterAccess(1,
HOURS).build());
+
@Override
public ContainerData chooseContainer(OzoneContainer ozoneContainer,
HddsVolume hddsVolume, Set<Long> inProgressContainerIDs) {
- Iterator<Container<?>> itr = ozoneContainer.getController()
- .getContainers(hddsVolume);
+ Iterator<Container<?>> itr;
+ try {
+ itr = CACHE.get().get(hddsVolume,
+ () -> ozoneContainer.getController().getContainers(hddsVolume));
+ } catch (ExecutionException e) {
+ LOG.warn("Failed to get container iterator for volume {}", hddsVolume,
e);
+ return null;
+ }
+
while (itr.hasNext()) {
ContainerData containerData = itr.next().getContainerData();
if (!inProgressContainerIDs.contains(
@@ -45,6 +61,10 @@ public ContainerData chooseContainer(OzoneContainer
ozoneContainer,
return containerData;
}
}
+
+ if (!itr.hasNext()) {
+ CACHE.get().invalidate(hddsVolume);
+ }
return null;
}
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerChoosingPolicy.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerChoosingPolicy.java
new file mode 100644
index 0000000000..68b5aa4e94
--- /dev/null
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerChoosingPolicy.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.scm.node;
+
+import static
org.apache.hadoop.ozone.container.common.impl.ContainerImplTestUtils.newContainerSet;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.fs.MockSpaceUsageCheckFactory;
+import org.apache.hadoop.hdds.fs.MockSpaceUsageSource;
+import org.apache.hadoop.hdds.fs.SpaceUsageCheckFactory;
+import org.apache.hadoop.hdds.fs.SpaceUsagePersistence;
+import org.apache.hadoop.hdds.fs.SpaceUsageSource;
+import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.impl.ContainerDataScanOrder;
+import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import
org.apache.hadoop.ozone.container.diskbalancer.policy.ContainerChoosingPolicy;
+import
org.apache.hadoop.ozone.container.diskbalancer.policy.DefaultContainerChoosingPolicy;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
+
+/**
+ * This class tests the ContainerChoosingPolicy.
+ */
+public class TestContainerChoosingPolicy {
+
+ private static final int NUM_VOLUMES = 20;
+ private static final int NUM_CONTAINERS = 100000;
+ private static final int NUM_THREADS = 10;
+ private static final int NUM_ITERATIONS = 10000;
+ private static final int MAX_IN_PROGRESS = 100;
+
+ private static final OzoneConfiguration CONF = new OzoneConfiguration();
+
+ @TempDir
+ private Path baseDir;
+
+ private List<HddsVolume> volumes;
+ private ContainerSet containerSet;
+ private OzoneContainer ozoneContainer;
+ private ContainerChoosingPolicy containerChoosingPolicy;
+ private ExecutorService executor;
+ private ContainerController containerController;
+
+ // Simulate containers currently being balanced (in progress)
+ private Set<Long> inProgressContainerIDs = ConcurrentHashMap.newKeySet();
+
+ @BeforeEach
+ public void setup() throws Exception {
+ containerSet = newContainerSet();
+ createVolumes();
+ createContainers();
+ ozoneContainer = mock(OzoneContainer.class);
+ containerController = new ContainerController(containerSet, null);
+ when(ozoneContainer.getController()).thenReturn(containerController);
+ when(ozoneContainer.getContainerSet()).thenReturn(containerSet);
+ containerChoosingPolicy = new DefaultContainerChoosingPolicy();
+ executor = Executors.newFixedThreadPool(NUM_THREADS);
+ }
+
+ @AfterEach
+ public void cleanUp() {
+ volumes.forEach(HddsVolume::shutdown);
+
+ // Shutdown executor service
+ if (executor != null && !executor.isShutdown()) {
+ executor.shutdownNow();
+ }
+
+ // Clear in-progress container IDs
+ inProgressContainerIDs.clear();
+
+ // Clear ContainerSet
+ containerSet = null;
+ }
+
+ @Test
+ @Timeout(300)
+ public void testConcurrentVolumeChoosingPerformance() throws Exception {
+ testPolicyPerformance("ContainerChoosingPolicy", containerChoosingPolicy);
+ }
+
+ @Test
+ public void testContainerDeletionAfterIteratorGeneration() throws Exception {
+ HddsVolume volume = volumes.get(0);
+ List<Container<?>> containerList =
ozoneContainer.getContainerSet().getContainerMap().values().stream()
+ .filter(x ->
volume.getStorageID().equals(x.getContainerData().getVolume().getStorageID()))
+ .filter(x -> x.getContainerData().isClosed())
+ .sorted(ContainerDataScanOrder.INSTANCE)
+ .collect(Collectors.toList());
+ inProgressContainerIDs.clear();
+ ContainerData container =
containerChoosingPolicy.chooseContainer(ozoneContainer, volume,
inProgressContainerIDs);
+ assertEquals(containerList.get(0).getContainerData().getContainerID(),
container.getContainerID());
+
ozoneContainer.getContainerSet().removeContainer(containerList.get(1).getContainerData().getContainerID());
+ inProgressContainerIDs.add(container.getContainerID());
+ container = containerChoosingPolicy.chooseContainer(ozoneContainer,
volume, inProgressContainerIDs);
+ assertEquals(containerList.get(1).getContainerData().getContainerID(),
container.getContainerID());
+ }
+
+ /**
+ * SuccessCount: Number of successful container choices from the policy.
+ * FailureCount: Failures due to any exceptions thrown during container
choice.
+ */
+ private void testPolicyPerformance(String policyName,
ContainerChoosingPolicy policy) throws Exception {
+ CountDownLatch latch = new CountDownLatch(NUM_THREADS);
+ AtomicInteger containerChosenCount = new AtomicInteger(0);
+ AtomicInteger containerNotChosenCount = new AtomicInteger(0);
+ AtomicInteger failureCount = new AtomicInteger(0);
+ AtomicLong totalTimeNanos = new AtomicLong(0);
+
+ Random rand = new Random();
+
+ for (int i = 0; i < NUM_THREADS; i++) {
+ executor.submit(() -> {
+ try {
+ long threadStart = System.nanoTime();
+ int containerChosen = 0;
+ int containerNotChosen = 0;
+ int failures = 0;
+ // Choose a random volume
+ HddsVolume volume = volumes.get(rand.nextInt(NUM_VOLUMES));
+
+ for (int j = 0; j < NUM_ITERATIONS; j++) {
+ try {
+ ContainerData c = policy.chooseContainer(ozoneContainer, volume,
inProgressContainerIDs);
+ if (c == null) {
+ containerNotChosen++;
+ } else {
+ containerChosen++;
+ if (inProgressContainerIDs.size() < MAX_IN_PROGRESS) {
+ inProgressContainerIDs.add(c.getContainerID());
+ }
+ }
+ } catch (Exception e) {
+ failures++;
+ }
+ }
+
+ long threadEnd = System.nanoTime();
+ totalTimeNanos.addAndGet(threadEnd - threadStart);
+ containerChosenCount.addAndGet(containerChosen);
+ containerNotChosenCount.addAndGet(containerNotChosen);
+ failureCount.addAndGet(failures);
+ } finally {
+ latch.countDown();
+ }
+ });
+ }
+
+ // Wait max 5 minutes for test completion
+ assertTrue(latch.await(5, TimeUnit.MINUTES), "Test timed out");
+
+ long totalOperations = (long) NUM_THREADS * NUM_ITERATIONS;
+ double avgTimePerOp = (double) totalTimeNanos.get() / totalOperations;
+ double opsPerSec = totalOperations / (totalTimeNanos.get() /
1_000_000_000.0);
+
+ System.out.println("Performance results for " + policyName);
+ System.out.println("Total volumes: " + NUM_VOLUMES);
+ System.out.println("Total containers: " + NUM_CONTAINERS);
+ System.out.println("Total threads: " + NUM_THREADS);
+ System.out.println("Total operations: " + totalOperations);
+ System.out.println("Container Chosen operations: " +
containerChosenCount.get());
+ System.out.println("Container Not Chosen operations: " +
containerNotChosenCount.get());
+ System.out.println("Failed operations: " + failureCount.get());
+ System.out.println("Total time (ms): " + totalTimeNanos.get() / 1_000_000);
+ System.out.println("Average time per operation (ns): " + avgTimePerOp);
+ System.out.println("Operations per second: " + opsPerSec);
+ }
+
+ public void createVolumes() throws IOException {
+ // Create volumes with mocked space usage
+ volumes = new ArrayList<>();
+ for (int i = 0; i < NUM_VOLUMES; i++) {
+ String volumePath = baseDir.resolve("disk" + i).toString();
+ SpaceUsageSource source = MockSpaceUsageSource.fixed(1000000000,
1000000000 - i * 50000);
+ SpaceUsageCheckFactory factory = MockSpaceUsageCheckFactory.of(
+ source, Duration.ZERO, SpaceUsagePersistence.None.INSTANCE);
+ HddsVolume volume = new HddsVolume.Builder(volumePath)
+ .conf(CONF)
+ .usageCheckFactory(factory)
+ .build();
+ volumes.add(volume);
+ }
+ }
+
+ public void createContainers() {
+ List<Long> closedContainerIDs = new ArrayList<>();
+ Random random = new Random();
+ long startTime = System.currentTimeMillis();
+
+ for (int i = 0; i < NUM_CONTAINERS; i++) {
+ boolean isOpen = i < 10; // First 10 containers are open
+ int volumeIndex = i % NUM_VOLUMES; // Distribute containers across
volumes
+ HddsVolume volume = volumes.get(volumeIndex);
+
+ KeyValueContainerData containerData = new KeyValueContainerData(
+ i, ContainerLayoutVersion.FILE_PER_BLOCK,
ContainerTestHelper.CONTAINER_MAX_SIZE,
+ UUID.randomUUID().toString(), UUID.randomUUID().toString());
+
+ containerData.setState(isOpen ? ContainerDataProto.State.OPEN :
ContainerDataProto.State.CLOSED);
+ containerData.setVolume(volume);
+ KeyValueContainer container = new KeyValueContainer(containerData, CONF);
+
+ try {
+ containerSet.addContainer(container); // Add container to ContainerSet
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to add container to ContainerSet",
e);
+ }
+
+ // Collect IDs of closed containers
+ if (!isOpen) {
+ closedContainerIDs.add((long) i);
+ }
+ }
+
+ // Randomly select NUM_THREADS closed containers to be in-progress
+ Collections.shuffle(closedContainerIDs, random);
+ inProgressContainerIDs.addAll(closedContainerIDs.subList(0, NUM_THREADS));
+ System.out.println("Created " + NUM_CONTAINERS + " containers in " +
+ (System.currentTimeMillis() - startTime) + " ms");
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]