This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch HDDS-5713
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-5713 by this push:
new 6ab87743145 HDDS-13602. Delay delete source container replica to avoid read failure due to read thread holds the old containerData (#8965)
6ab87743145 is described below
commit 6ab87743145b5189c2f4a859d458c54e56f2a518
Author: Sammi Chen <[email protected]>
AuthorDate: Tue Dec 2 10:19:48 2025 +0800
HDDS-13602. Delay delete source container replica to avoid read failure due to read thread holds the old containerData (#8965)
---
.../diskbalancer/DiskBalancerService.java | 63 ++++++++++++++++++----
.../diskbalancer/TestDiskBalancerTask.java | 35 ++++++++++--
2 files changed, 85 insertions(+), 13 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
index 04bd959eb4d..37984848e9f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java
@@ -32,6 +32,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -85,6 +86,7 @@ public class DiskBalancerService extends BackgroundService {
LoggerFactory.getLogger(DiskBalancerService.class);
public static final String DISK_BALANCER_DIR = "diskBalancer";
+ private static long replicaDeletionDelayMills = 60 * 60 * 1000L; // 60 minutes
private OzoneContainer ozoneContainer;
private final ConfigurationSource conf;
@@ -104,6 +106,7 @@ public class DiskBalancerService extends BackgroundService {
private AtomicLong nextAvailableTime = new AtomicLong(Time.monotonicNow());
private Set<ContainerID> inProgressContainers;
+ private ConcurrentSkipListMap<Long, Container> pendingDeletionContainers = new ConcurrentSkipListMap();
private static FaultInjector injector;
/**
@@ -339,12 +342,14 @@ public BackgroundTaskQueue getTasks() {
if (this.operationalState == DiskBalancerRunningStatus.STOPPED ||
this.operationalState == DiskBalancerRunningStatus.PAUSED) {
+ cleanupPendingDeletionContainers();
return queue;
}
metrics.incrRunningLoopCount();
if (shouldDelay()) {
metrics.incrIdleLoopExceedsBandwidthCount();
+ cleanupPendingDeletionContainers();
return queue;
}
@@ -391,6 +396,7 @@ public BackgroundTaskQueue getTasks() {
}
}
metrics.incrIdleLoopNoAvailableVolumePairCount();
+ cleanupPendingDeletionContainers();
}
return queue;
@@ -554,20 +560,15 @@ public BackgroundTaskResult call() {
container.readUnlock();
}
if (moveSucceeded) {
- // Remove the old container from the KeyValueContainerUtil.
- try {
- KeyValueContainerUtil.removeContainer(
- (KeyValueContainerData) container.getContainerData(), conf);
- container.delete();
- container.getContainerData().getVolume().decrementUsedSpace(containerSize);
- } catch (IOException ex) {
- LOG.warn("Failed to move or delete old container {} after it's
marked as DELETED. " +
- "It will be handled by background scanners.", containerId,
ex);
- }
+ // Add current old container to pendingDeletionContainers.
+ pendingDeletionContainers.put(System.currentTimeMillis() + replicaDeletionDelayMills, container);
ContainerLogger.logMoveSuccess(containerId, sourceVolume,
destVolume, containerSize, Time.monotonicNow() - startTime);
}
postCall(moveSucceeded, startTime);
+
+ // pick one expired container from pendingDeletionContainers to delete
+ tryCleanupOnePendingDeletionContainer();
}
return BackgroundTaskResult.EmptyTaskResult.newResult();
}
@@ -593,6 +594,43 @@ private void postCall(boolean success, long startTime) {
}
}
+ private void deleteContainer(Container container) {
+ try {
+ KeyValueContainerData containerData = (KeyValueContainerData) container.getContainerData();
+ KeyValueContainerUtil.removeContainer(containerData, conf);
+ container.delete();
+ container.getContainerData().getVolume().decrementUsedSpace(containerData.getBytesUsed());
+ LOG.info("Deleted expired container {} after delay {} ms.",
+ containerData.getContainerID(), replicaDeletionDelayMills);
+ } catch (IOException ex) {
+ LOG.warn("Failed to delete old container {} after it's marked as
DELETED. " +
+ "It will be handled by background scanners.",
container.getContainerData().getContainerID(), ex);
+ }
+ }
+
+ private void cleanupPendingDeletionContainers() {
+ // delete all pending deletion containers before stop the service
+ boolean ret;
+ do {
+ ret = tryCleanupOnePendingDeletionContainer();
+ } while (ret);
+ }
+
+ private boolean tryCleanupOnePendingDeletionContainer() {
+ Map.Entry<Long, Container> entry = pendingDeletionContainers.pollFirstEntry();
+ if (entry != null) {
+ if (entry.getKey() <= System.currentTimeMillis()) {
+ // entry container is expired
+ deleteContainer(entry.getValue());
+ return true;
+ } else {
+ // put back the container
+ pendingDeletionContainers.put(entry.getKey(), entry.getValue());
+ }
+ }
+ return false;
+ }
+
public DiskBalancerInfo getDiskBalancerInfo() {
ImmutableList<HddsVolume> immutableVolumeSet =
DiskBalancerVolumeCalculation.getImmutableVolumeSet(volumeSet);
@@ -737,4 +775,9 @@ public void shutdown() {
public static void setInjector(FaultInjector instance) {
injector = instance;
}
+
+ @VisibleForTesting
+ public static void setReplicaDeletionDelayMills(long durationMills) {
+ replicaDeletionDelayMills = durationMills;
+ }
}
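
For readers skimming the patch: the core idea above is to use a ConcurrentSkipListMap keyed by an expiry timestamp as a lightweight delay queue. pollFirstEntry() always yields the entry with the earliest deadline, which is either deleted (if expired) or put back (if not yet due). Below is a minimal standalone sketch of that pattern; PendingDeletionQueue, schedule, tryProcessOne, and drainExpired are illustrative names for this sketch only, not Ozone APIs.

import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Consumer;

/**
 * Minimal sketch of the delayed-deletion pattern in the patch above:
 * a ConcurrentSkipListMap keyed by expiry timestamp acts as a lightweight
 * delay queue. Class and method names here are hypothetical.
 */
public final class PendingDeletionQueue<T> {
  private final ConcurrentSkipListMap<Long, T> pending = new ConcurrentSkipListMap<>();
  private final long delayMillis;

  public PendingDeletionQueue(long delayMillis) {
    this.delayMillis = delayMillis;
  }

  /** Schedule an item for processing after the configured delay. */
  public void schedule(T item) {
    long key = System.currentTimeMillis() + delayMillis;
    // Bump the key on a collision so two items scheduled in the same
    // millisecond do not overwrite each other.
    while (pending.putIfAbsent(key, item) != null) {
      key++;
    }
  }

  /**
   * Process the earliest entry if its deadline has passed. Returns true when
   * one expired entry was handled; re-inserts the entry and returns false if
   * the head of the map is not yet due (or the map is empty).
   */
  public boolean tryProcessOne(Consumer<T> action) {
    Map.Entry<Long, T> entry = pending.pollFirstEntry();
    if (entry == null) {
      return false;
    }
    if (entry.getKey() <= System.currentTimeMillis()) {
      action.accept(entry.getValue());
      return true;
    }
    pending.put(entry.getKey(), entry.getValue()); // not due yet, put it back
    return false;
  }

  /** Process every entry that is already due, stopping at the first one that is not. */
  public void drainExpired(Consumer<T> action) {
    while (tryProcessOne(action)) {
      // keep polling while the head entry keeps expiring
    }
  }
}

A java.util.concurrent.DelayQueue would be another way to express this, but it requires wrapping every element in a Delayed implementation; the sorted-map approach keeps the change local to DiskBalancerService and lets the existing background-service loop drive the cleanup.
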
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java
index 60fb52eba10..76a3992658b 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/TestDiskBalancerTask.java
@@ -19,6 +19,7 @@
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_INTERNAL_ERROR;
import static org.apache.hadoop.ozone.container.common.ContainerTestUtils.createDbInstancesForTestIfNeeded;
+import static org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerService.DISK_BALANCER_DIR;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -243,7 +244,7 @@ public void setup() throws Exception {
conf.setFromObject(diskBalancerConfiguration);
diskBalancerService = new DiskBalancerServiceTestImpl(ozoneContainer,
100, conf, 1);
-
+ DiskBalancerService.setReplicaDeletionDelayMills(0);
KeyValueContainer.setInjector(kvFaultInjector);
}
@@ -320,7 +321,7 @@ public void moveFailsAfterCopy(ContainerTestVersionInfo versionInfo)
// verify temp container directory doesn't exist before task execution
Path tempContainerDir = destVolume.getTmpDir().toPath()
- .resolve(DiskBalancerService.DISK_BALANCER_DIR).resolve(String.valueOf(CONTAINER_ID));
+ .resolve(DISK_BALANCER_DIR).resolve(String.valueOf(CONTAINER_ID));
File dir = new File(String.valueOf(tempContainerDir));
assertFalse(dir.exists(), "Temp container directory should not exist before task starts");
@@ -370,7 +371,7 @@ public void moveFailsOnAtomicMove(ContainerTestVersionInfo versionInfo)
0L : diskBalancerService.getDeltaSizes().get(sourceVolume);
String oldContainerPath = container.getContainerData().getContainerPath();
Path tempDir = destVolume.getTmpDir().toPath()
- .resolve(DiskBalancerService.DISK_BALANCER_DIR)
+ .resolve(DISK_BALANCER_DIR)
.resolve(String.valueOf(CONTAINER_ID));
assertFalse(Files.exists(tempDir), "Temp container directory should not exist");
Path destDirPath = Paths.get(
@@ -573,6 +574,34 @@ public void testDestVolumeCommittedSpaceReleased(ContainerTestVersionInfo versio
assertEquals(initialSourceDelta,
diskBalancerService.getDeltaSizes().get(sourceVolume));
}
+ @ContainerTestVersionInfo.ContainerTest
+ public void testOldReplicaDelayedDeletion(ContainerTestVersionInfo versionInfo)
+ throws IOException, InterruptedException {
+ setLayoutAndSchemaForTest(versionInfo);
+ long delay = 2000L; // 2 second delay
+ DiskBalancerService.setReplicaDeletionDelayMills(delay);
+
+ Container container = createContainer(CONTAINER_ID, sourceVolume, State.CLOSED);
+ KeyValueContainerData keyValueContainerData = (KeyValueContainerData) container.getContainerData();
+ File oldContainerDir = new File(keyValueContainerData.getContainerPath());
+ assertTrue(oldContainerDir.exists());
+
+ DiskBalancerService.DiskBalancerTask task = getTask();
+ task.call();
+ assertEquals(State.DELETED, container.getContainerState());
+ // Verify that the old container is not deleted immediately
+ assertTrue(oldContainerDir.exists());
+
+ // create another container to trigger the deletion of old replicas
+ createContainer(CONTAINER_ID + 1, sourceVolume, State.CLOSED);
+ task = getTask();
+ // Wait for the delay to pass
+ Thread.sleep(delay);
+ task.call();
+ // Verify that the old container is deleted
+ assertFalse(oldContainerDir.exists());
+ }
+
private KeyValueContainer createContainer(long containerId, HddsVolume vol,
State state)
throws IOException {
KeyValueContainerData containerData = new KeyValueContainerData(
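
The new testOldReplicaDelayedDeletion case above exercises exactly this schedule-then-expire flow. As a compressed usage sketch against the hypothetical PendingDeletionQueue from the earlier example (again illustrative, not the Ozone test API):

public final class PendingDeletionQueueDemo {
  public static void main(String[] args) throws InterruptedException {
    // 2-second delay, mirroring the delay used in the new test.
    PendingDeletionQueue<String> queue = new PendingDeletionQueue<>(2000L);
    queue.schedule("container-1");

    // Right after scheduling, the head entry is not yet due: nothing is deleted.
    System.out.println(queue.tryProcessOne(id -> System.out.println("deleted " + id))); // false

    Thread.sleep(2000L); // wait past the configured delay

    // The head entry has now expired and is handed to the delete action.
    System.out.println(queue.tryProcessOne(id -> System.out.println("deleted " + id))); // true
  }
}
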
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]