This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new b428407768 HDDS-9842. Cache volume capacity and available space (#6383)
b428407768 is described below

commit b428407768dad4ddd10d265179fc24902e40ef26
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Tue Jul 9 00:07:43 2024 +0200

    HDDS-9842. Cache volume capacity and available space (#6383)
---
 .../hadoop/hdds/fs/CachingSpaceUsageSource.java    | 134 ++++++++++++++++-----
 .../hdds/fs/TestCachingSpaceUsageSource.java       |  72 ++++++++---
 2 files changed, 160 insertions(+), 46 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/fs/CachingSpaceUsageSource.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/fs/CachingSpaceUsageSource.java
index b9a2f87a03..9b9719386b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/fs/CachingSpaceUsageSource.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/fs/CachingSpaceUsageSource.java
@@ -17,10 +17,12 @@
  */
 package org.apache.hadoop.hdds.fs;
 
-import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.annotation.InterfaceStability;
+import org.apache.ratis.util.AutoCloseableLock;
+import org.apache.ratis.util.AutoCloseableReadWriteLock;
+import org.apache.ratis.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,7 +33,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
@@ -46,12 +47,16 @@ public class CachingSpaceUsageSource implements SpaceUsageSource {
       LoggerFactory.getLogger(CachingSpaceUsageSource.class);
 
   private final ScheduledExecutorService executor;
-  private final AtomicLong cachedValue = new AtomicLong();
+  private final AutoCloseableReadWriteLock lock;
+  private long cachedUsedSpace;
+  private long cachedAvailable;
+  private long cachedCapacity;
   private final Duration refresh;
   private final SpaceUsageSource source;
   private final SpaceUsagePersistence persistence;
   private boolean running;
-  private ScheduledFuture<?> scheduledFuture;
+  private ScheduledFuture<?> updateUsedSpaceFuture;
+  private ScheduledFuture<?> updateAvailableFuture;
   private final AtomicBoolean isRefreshRunning;
 
   public CachingSpaceUsageSource(SpaceUsageCheckParams params) {
@@ -60,15 +65,16 @@ public class CachingSpaceUsageSource implements SpaceUsageSource {
 
   CachingSpaceUsageSource(SpaceUsageCheckParams params,
       ScheduledExecutorService executor) {
-    Preconditions.checkArgument(params != null, "params == null");
+    Preconditions.assertNotNull(params, "params == null");
 
     refresh = params.getRefresh();
     source = params.getSource();
+    lock = new AutoCloseableReadWriteLock(source.toString());
     persistence = params.getPersistence();
     this.executor = executor;
     isRefreshRunning = new AtomicBoolean();
 
-    Preconditions.checkArgument(refresh.isZero() == (executor == null),
+    Preconditions.assertTrue(refresh.isZero() == (executor == null),
         "executor should be provided if and only if refresh is requested");
 
     loadInitialValue();
@@ -76,45 +82,81 @@ public class CachingSpaceUsageSource implements SpaceUsageSource {
 
   @Override
   public long getCapacity() {
-    return source.getCapacity();
+    try (AutoCloseableLock ignored = lock.readLock(null, null)) {
+      return cachedCapacity;
+    }
   }
 
   @Override
   public long getAvailable() {
-    return source.getAvailable();
+    try (AutoCloseableLock ignored = lock.readLock(null, null)) {
+      return cachedAvailable;
+    }
   }
 
   @Override
   public long getUsedSpace() {
-    return cachedValue.get();
+    try (AutoCloseableLock ignored = lock.readLock(null, null)) {
+      return cachedUsedSpace;
+    }
+  }
+
+  @Override
+  public SpaceUsageSource snapshot() {
+    try (AutoCloseableLock ignored = lock.readLock(null, null)) {
+      return new Fixed(cachedCapacity, cachedAvailable, cachedUsedSpace);
+    }
   }
 
   public void incrementUsedSpace(long usedSpace) {
-    cachedValue.addAndGet(usedSpace);
+    if (usedSpace == 0) {
+      return;
+    }
+    Preconditions.assertTrue(usedSpace > 0, () -> usedSpace + " < 0");
+    final long current, change;
+    try (AutoCloseableLock ignored = lock.writeLock(null, null)) {
+      current = cachedAvailable;
+      change = Math.min(current, usedSpace);
+      cachedAvailable -= change;
+      cachedUsedSpace += change;
+    }
+
+    if (change != usedSpace) {
+      LOG.warn("Attempted to decrement available space to a negative value. Current: {}, Decrement: {}, Source: {}",
+          current, usedSpace, source);
+    }
   }
 
   public void decrementUsedSpace(long reclaimedSpace) {
-    cachedValue.updateAndGet(current -> {
-      long newValue = current - reclaimedSpace;
-      if (newValue < 0) {
-        if (current > 0) {
-          LOG.warn("Attempted to decrement used space to a negative value. " +
-                  "Current: {}, Decrement: {}, Source: {}",
-              current, reclaimedSpace, source);
-        }
-        return 0;
-      } else {
-        return newValue;
-      }
-    });
+    if (reclaimedSpace == 0) {
+      return;
+    }
+    Preconditions.assertTrue(reclaimedSpace > 0, () -> reclaimedSpace + " < 0");
+    final long current, change;
+    try (AutoCloseableLock ignored = lock.writeLock(null, null)) {
+      current = cachedUsedSpace;
+      change = Math.min(current, reclaimedSpace);
+      cachedUsedSpace -= change;
+      cachedAvailable += change;
+    }
+
+    if (change != reclaimedSpace) {
+      LOG.warn("Attempted to decrement used space to a negative value. Current: {}, Decrement: {}, Source: {}",
+          current, reclaimedSpace, source);
+    }
   }
 
   public void start() {
     if (executor != null) {
-      long initialDelay = cachedValue.get() > 0 ? refresh.toMillis() : 0;
+      long initialDelay = getUsedSpace() > 0 ? refresh.toMillis() : 0;
       if (!running) {
-        scheduledFuture = executor.scheduleWithFixedDelay(
+        updateUsedSpaceFuture = executor.scheduleWithFixedDelay(
             this::refresh, initialDelay, refresh.toMillis(), MILLISECONDS);
+
+        long availableUpdateDelay = Math.min(refresh.toMillis(), Duration.ofMinutes(1).toMillis());
+        updateAvailableFuture = executor.scheduleWithFixedDelay(
+            this::updateAvailable, availableUpdateDelay, availableUpdateDelay, MILLISECONDS);
+
         running = true;
       }
     } else {
@@ -126,8 +168,13 @@ public class CachingSpaceUsageSource implements SpaceUsageSource {
     persistence.save(this); // save cached value
 
     if (executor != null) {
-      if (running && scheduledFuture != null) {
-        scheduledFuture.cancel(true);
+      if (running) {
+        if (updateUsedSpaceFuture != null) {
+          updateUsedSpaceFuture.cancel(true);
+        }
+        if (updateAvailableFuture != null) {
+          updateAvailableFuture.cancel(true);
+        }
       }
       running = false;
 
@@ -135,21 +182,48 @@ public class CachingSpaceUsageSource implements SpaceUsageSource {
     }
   }
 
+  /** Schedule immediate refresh. */
   public void refreshNow() {
-    //refresh immediately
     executor.schedule(this::refresh, 0, MILLISECONDS);
   }
 
+  /** Loads {@code usedSpace} value from persistent source, if present.
+   * Also updates {@code available} and {@code capacity} from the {@code source}. */
   private void loadInitialValue() {
     final OptionalLong initialValue = persistence.load();
-    initialValue.ifPresent(cachedValue::set);
+    updateCachedValues(initialValue.orElse(0));
+  }
+
+  /** Updates {@code available} and {@code capacity} from the {@code source}. */
+  private void updateAvailable() {
+    final long capacity = source.getCapacity();
+    final long available = source.getAvailable();
+
+    try (AutoCloseableLock ignored = lock.writeLock(null, null)) {
+      cachedAvailable = available;
+      cachedCapacity = capacity;
+    }
+  }
+
+  /** Updates {@code available} and {@code capacity} from the {@code source},
+   * sets {@code usedSpace} to the specified {@code used} value. */
+  private void updateCachedValues(long used) {
+    final long capacity = source.getCapacity();
+    final long available = source.getAvailable();
+
+    try (AutoCloseableLock ignored = lock.writeLock(null, null)) {
+      cachedAvailable = available;
+      cachedCapacity = capacity;
+      cachedUsedSpace = used;
+    }
   }
 
+  /** Refreshes all 3 values. */
   private void refresh() {
     //only one `refresh` can be running at a certain moment
     if (isRefreshRunning.compareAndSet(false, true)) {
       try {
-        cachedValue.set(source.getUsedSpace());
+        updateCachedValues(source.getUsedSpace());
       } catch (RuntimeException e) {
         LOG.warn("Error refreshing space usage for {}", source, e);
       } finally {
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/fs/TestCachingSpaceUsageSource.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/fs/TestCachingSpaceUsageSource.java
index 8523861000..b84ca130f2 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/fs/TestCachingSpaceUsageSource.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/fs/TestCachingSpaceUsageSource.java
@@ -36,19 +36,20 @@ import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyLong;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /**
  * Tests for {@link CachingSpaceUsageSource}.
  */
-public class TestCachingSpaceUsageSource {
+class TestCachingSpaceUsageSource {
 
   @TempDir
   private static File dir;
 
   @Test
-  public void providesInitialValueUntilStarted() {
+  void providesInitialValueUntilStarted() {
     final long initialValue = validInitialValue();
     SpaceUsageCheckParams params = paramsBuilder(new AtomicLong(initialValue))
         .withRefresh(Duration.ZERO)
@@ -57,10 +58,11 @@ public class TestCachingSpaceUsageSource {
     SpaceUsageSource subject = new CachingSpaceUsageSource(params);
 
     assertEquals(initialValue, subject.getUsedSpace());
+    assertAvailableWasUpdated(params.getSource(), subject);
   }
 
   @Test
-  public void ignoresMissingInitialValue() {
+  void ignoresMissingInitialValue() {
     SpaceUsageCheckParams params = paramsBuilder()
         .withRefresh(Duration.ZERO)
         .build();
@@ -68,10 +70,11 @@ public class TestCachingSpaceUsageSource {
     SpaceUsageSource subject = new CachingSpaceUsageSource(params);
 
     assertEquals(0, subject.getUsedSpace());
+    assertAvailableWasUpdated(params.getSource(), subject);
   }
 
   @Test
-  public void updatesValueFromSourceUponStartIfPeriodicRefreshNotConfigured() {
+  void updatesValueFromSourceUponStartIfPeriodicRefreshNotConfigured() {
     AtomicLong savedValue = new AtomicLong(validInitialValue());
     SpaceUsageCheckParams params = paramsBuilder(savedValue)
         .withRefresh(Duration.ZERO).build();
@@ -79,11 +82,11 @@ public class TestCachingSpaceUsageSource {
     CachingSpaceUsageSource subject = new CachingSpaceUsageSource(params);
     subject.start();
 
-    assertSubjectWasRefreshed(params.getSource().getUsedSpace(), subject);
+    assertSubjectWasRefreshed(params.getSource(), subject);
   }
 
   @Test
-  public void schedulesRefreshWithDelayIfConfigured() {
+  void schedulesRefreshWithDelayIfConfigured() {
     long initialValue = validInitialValue();
     AtomicLong savedValue = new AtomicLong(initialValue);
     SpaceUsageCheckParams params = paramsBuilder(savedValue)
@@ -96,13 +99,13 @@ public class TestCachingSpaceUsageSource {
     subject.start();
 
     verifyRefreshWasScheduled(executor, refresh.toMillis(), refresh);
-    assertSubjectWasRefreshed(params.getSource().getUsedSpace(), subject);
+    assertSubjectWasRefreshed(params.getSource(), subject);
     assertEquals(initialValue, savedValue.get(),
         "value should not have been saved to file yet");
   }
 
   @Test
-  public void schedulesImmediateRefreshIfInitialValueMissing() {
+  void schedulesImmediateRefreshIfInitialValueMissing() {
     final long initialValue = missingInitialValue();
     AtomicLong savedValue = new AtomicLong(initialValue);
     SpaceUsageCheckParams params = paramsBuilder(savedValue).build();
@@ -113,13 +116,13 @@ public class TestCachingSpaceUsageSource {
     subject.start();
 
     verifyRefreshWasScheduled(executor, 0L, params.getRefresh());
-    assertSubjectWasRefreshed(params.getSource().getUsedSpace(), subject);
+    assertSubjectWasRefreshed(params.getSource(), subject);
     assertEquals(initialValue, savedValue.get(),
         "value should not have been saved to file yet");
   }
 
   @Test
-  public void savesValueOnShutdown() {
+  void savesValueOnShutdown() {
     AtomicLong savedValue = new AtomicLong(validInitialValue());
     SpaceUsageSource source = mock(SpaceUsageSource.class);
     final long usedSpace = 4L;
@@ -138,22 +141,44 @@ public class TestCachingSpaceUsageSource {
         "value should have been saved to file");
     assertEquals(usedSpace, subject.getUsedSpace(),
         "no further updates from source expected");
-    verify(future).cancel(true);
+    verify(future, times(2)).cancel(true);
     verify(executor).shutdown();
   }
 
   @Test
-  public void testDecrementDoesNotGoNegative() {
+  void decrementUsedSpaceMoreThanCurrent() {
     SpaceUsageCheckParams params = paramsBuilder(new AtomicLong(50))
         .withRefresh(Duration.ZERO)
         .build();
     CachingSpaceUsageSource subject = new CachingSpaceUsageSource(params);
+    SpaceUsageSource original = subject.snapshot();
 
     // Try to decrement more than the current value
-    subject.decrementUsedSpace(100);
+    final long change = original.getUsedSpace() * 2;
+    subject.decrementUsedSpace(change);
 
-    // Check that the value has been set to 0
+    // should not drop below 0
     assertEquals(0, subject.getUsedSpace());
+    // available and used change by same amount (in opposite directions)
+    assertEquals(original.getAvailable() + original.getUsedSpace(), subject.getAvailable());
+  }
+
+  @Test
+  void decrementAvailableSpaceMoreThanCurrent() {
+    SpaceUsageCheckParams params = paramsBuilder(new AtomicLong(50))
+        .withRefresh(Duration.ZERO)
+        .build();
+    CachingSpaceUsageSource subject = new CachingSpaceUsageSource(params);
+    SpaceUsageSource original = subject.snapshot();
+
+    // Try to decrement more than the current value
+    final long change = original.getAvailable() * 2;
+    subject.incrementUsedSpace(change);
+
+    // should not drop below 0
+    assertEquals(0, subject.getAvailable());
+    // available and used change by same amount (in opposite directions)
+    assertEquals(original.getUsedSpace() + original.getAvailable(), subject.getUsedSpace());
   }
 
   private static long missingInitialValue() {
@@ -197,14 +222,29 @@ public class TestCachingSpaceUsageSource {
       ScheduledExecutorService executor, long expectedInitialDelay,
       Duration refresh) {
 
+    // refresh usedSpace
     verify(executor).scheduleWithFixedDelay(any(), eq(expectedInitialDelay),
         eq(refresh.toMillis()), eq(TimeUnit.MILLISECONDS));
+
+    // update available/capacity
+    final long oneMinute = Duration.ofMinutes(1).toMillis();
+    final long delay = Math.min(refresh.toMillis(), oneMinute);
+    verify(executor).scheduleWithFixedDelay(any(), eq(delay),
+        eq(delay), eq(TimeUnit.MILLISECONDS));
+  }
+
+  private static void assertAvailableWasUpdated(SpaceUsageSource source,
+      SpaceUsageSource subject) {
+
+    assertEquals(source.getCapacity(), subject.getCapacity());
+    assertEquals(source.getAvailable(), subject.getAvailable());
   }
 
-  private static void assertSubjectWasRefreshed(long expected,
+  private static void assertSubjectWasRefreshed(SpaceUsageSource source,
       SpaceUsageSource subject) {
 
-    assertEquals(expected, subject.getUsedSpace(),
+    assertAvailableWasUpdated(source, subject);
+    assertEquals(source.getUsedSpace(), subject.getUsedSpace(),
         "subject should have been refreshed");
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to