This is an automated email from the ASF dual-hosted git repository.
siyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new b428407768 HDDS-9842. Cache volume capacity and available space (#6383)
b428407768 is described below
commit b428407768dad4ddd10d265179fc24902e40ef26
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Tue Jul 9 00:07:43 2024 +0200
HDDS-9842. Cache volume capacity and available space (#6383)
---
.../hadoop/hdds/fs/CachingSpaceUsageSource.java | 134 ++++++++++++++++-----
.../hdds/fs/TestCachingSpaceUsageSource.java | 72 ++++++++---
2 files changed, 160 insertions(+), 46 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/fs/CachingSpaceUsageSource.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/fs/CachingSpaceUsageSource.java
index b9a2f87a03..9b9719386b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/fs/CachingSpaceUsageSource.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/fs/CachingSpaceUsageSource.java
@@ -17,10 +17,12 @@
*/
package org.apache.hadoop.hdds.fs;
-import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.annotation.InterfaceStability;
+import org.apache.ratis.util.AutoCloseableLock;
+import org.apache.ratis.util.AutoCloseableReadWriteLock;
+import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,7 +33,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -46,12 +47,16 @@ public class CachingSpaceUsageSource implements SpaceUsageSource {
LoggerFactory.getLogger(CachingSpaceUsageSource.class);
private final ScheduledExecutorService executor;
- private final AtomicLong cachedValue = new AtomicLong();
+ private final AutoCloseableReadWriteLock lock;
+ private long cachedUsedSpace;
+ private long cachedAvailable;
+ private long cachedCapacity;
private final Duration refresh;
private final SpaceUsageSource source;
private final SpaceUsagePersistence persistence;
private boolean running;
- private ScheduledFuture<?> scheduledFuture;
+ private ScheduledFuture<?> updateUsedSpaceFuture;
+ private ScheduledFuture<?> updateAvailableFuture;
private final AtomicBoolean isRefreshRunning;
public CachingSpaceUsageSource(SpaceUsageCheckParams params) {
@@ -60,15 +65,16 @@ public class CachingSpaceUsageSource implements SpaceUsageSource {
CachingSpaceUsageSource(SpaceUsageCheckParams params,
ScheduledExecutorService executor) {
- Preconditions.checkArgument(params != null, "params == null");
+ Preconditions.assertNotNull(params, "params == null");
refresh = params.getRefresh();
source = params.getSource();
+ lock = new AutoCloseableReadWriteLock(source.toString());
persistence = params.getPersistence();
this.executor = executor;
isRefreshRunning = new AtomicBoolean();
- Preconditions.checkArgument(refresh.isZero() == (executor == null),
+ Preconditions.assertTrue(refresh.isZero() == (executor == null),
"executor should be provided if and only if refresh is requested");
loadInitialValue();
@@ -76,45 +82,81 @@ public class CachingSpaceUsageSource implements SpaceUsageSource {
@Override
public long getCapacity() {
- return source.getCapacity();
+ try (AutoCloseableLock ignored = lock.readLock(null, null)) {
+ return cachedCapacity;
+ }
}
@Override
public long getAvailable() {
- return source.getAvailable();
+ try (AutoCloseableLock ignored = lock.readLock(null, null)) {
+ return cachedAvailable;
+ }
}
@Override
public long getUsedSpace() {
- return cachedValue.get();
+ try (AutoCloseableLock ignored = lock.readLock(null, null)) {
+ return cachedUsedSpace;
+ }
+ }
+
+ @Override
+ public SpaceUsageSource snapshot() {
+ try (AutoCloseableLock ignored = lock.readLock(null, null)) {
+ return new Fixed(cachedCapacity, cachedAvailable, cachedUsedSpace);
+ }
}
public void incrementUsedSpace(long usedSpace) {
- cachedValue.addAndGet(usedSpace);
+ if (usedSpace == 0) {
+ return;
+ }
+ Preconditions.assertTrue(usedSpace > 0, () -> usedSpace + " < 0");
+ final long current, change;
+ try (AutoCloseableLock ignored = lock.writeLock(null, null)) {
+ current = cachedAvailable;
+ change = Math.min(current, usedSpace);
+ cachedAvailable -= change;
+ cachedUsedSpace += change;
+ }
+
+ if (change != usedSpace) {
+ LOG.warn("Attempted to decrement available space to a negative value. Current: {}, Decrement: {}, Source: {}",
+ current, usedSpace, source);
+ }
}
public void decrementUsedSpace(long reclaimedSpace) {
- cachedValue.updateAndGet(current -> {
- long newValue = current - reclaimedSpace;
- if (newValue < 0) {
- if (current > 0) {
- LOG.warn("Attempted to decrement used space to a negative value. " +
- "Current: {}, Decrement: {}, Source: {}",
- current, reclaimedSpace, source);
- }
- return 0;
- } else {
- return newValue;
- }
- });
+ if (reclaimedSpace == 0) {
+ return;
+ }
+ Preconditions.assertTrue(reclaimedSpace > 0, () -> reclaimedSpace + " < 0");
+ final long current, change;
+ try (AutoCloseableLock ignored = lock.writeLock(null, null)) {
+ current = cachedUsedSpace;
+ change = Math.min(current, reclaimedSpace);
+ cachedUsedSpace -= change;
+ cachedAvailable += change;
+ }
+
+ if (change != reclaimedSpace) {
+ LOG.warn("Attempted to decrement used space to a negative value. Current: {}, Decrement: {}, Source: {}",
+ current, reclaimedSpace, source);
+ }
}
public void start() {
if (executor != null) {
- long initialDelay = cachedValue.get() > 0 ? refresh.toMillis() : 0;
+ long initialDelay = getUsedSpace() > 0 ? refresh.toMillis() : 0;
if (!running) {
- scheduledFuture = executor.scheduleWithFixedDelay(
+ updateUsedSpaceFuture = executor.scheduleWithFixedDelay(
this::refresh, initialDelay, refresh.toMillis(), MILLISECONDS);
+
+ long availableUpdateDelay = Math.min(refresh.toMillis(), Duration.ofMinutes(1).toMillis());
+ updateAvailableFuture = executor.scheduleWithFixedDelay(
+ this::updateAvailable, availableUpdateDelay, availableUpdateDelay, MILLISECONDS);
+
running = true;
}
} else {
@@ -126,8 +168,13 @@ public class CachingSpaceUsageSource implements SpaceUsageSource {
persistence.save(this); // save cached value
if (executor != null) {
- if (running && scheduledFuture != null) {
- scheduledFuture.cancel(true);
+ if (running) {
+ if (updateUsedSpaceFuture != null) {
+ updateUsedSpaceFuture.cancel(true);
+ }
+ if (updateAvailableFuture != null) {
+ updateAvailableFuture.cancel(true);
+ }
}
running = false;
@@ -135,21 +182,48 @@ public class CachingSpaceUsageSource implements SpaceUsageSource {
}
}
+ /** Schedule immediate refresh. */
public void refreshNow() {
- //refresh immediately
executor.schedule(this::refresh, 0, MILLISECONDS);
}
+ /** Loads {@code usedSpace} value from persistent source, if present.
+ * Also updates {@code available} and {@code capacity} from the {@code source}. */
private void loadInitialValue() {
final OptionalLong initialValue = persistence.load();
- initialValue.ifPresent(cachedValue::set);
+ updateCachedValues(initialValue.orElse(0));
+ }
+
+ /** Updates {@code available} and {@code capacity} from the {@code source}. */
+ private void updateAvailable() {
+ final long capacity = source.getCapacity();
+ final long available = source.getAvailable();
+
+ try (AutoCloseableLock ignored = lock.writeLock(null, null)) {
+ cachedAvailable = available;
+ cachedCapacity = capacity;
+ }
+ }
+
+ /** Updates {@code available} and {@code capacity} from the {@code source},
+ * sets {@code usedSpace} to the specified {@code used} value. */
+ private void updateCachedValues(long used) {
+ final long capacity = source.getCapacity();
+ final long available = source.getAvailable();
+
+ try (AutoCloseableLock ignored = lock.writeLock(null, null)) {
+ cachedAvailable = available;
+ cachedCapacity = capacity;
+ cachedUsedSpace = used;
+ }
}
+ /** Refreshes all 3 values. */
private void refresh() {
//only one `refresh` can be running at a certain moment
if (isRefreshRunning.compareAndSet(false, true)) {
try {
- cachedValue.set(source.getUsedSpace());
+ updateCachedValues(source.getUsedSpace());
} catch (RuntimeException e) {
LOG.warn("Error refreshing space usage for {}", source, e);
} finally {
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/fs/TestCachingSpaceUsageSource.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/fs/TestCachingSpaceUsageSource.java
index 8523861000..b84ca130f2 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/fs/TestCachingSpaceUsageSource.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/fs/TestCachingSpaceUsageSource.java
@@ -36,19 +36,20 @@ import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Tests for {@link CachingSpaceUsageSource}.
*/
-public class TestCachingSpaceUsageSource {
+class TestCachingSpaceUsageSource {
@TempDir
private static File dir;
@Test
- public void providesInitialValueUntilStarted() {
+ void providesInitialValueUntilStarted() {
final long initialValue = validInitialValue();
SpaceUsageCheckParams params = paramsBuilder(new AtomicLong(initialValue))
.withRefresh(Duration.ZERO)
@@ -57,10 +58,11 @@ public class TestCachingSpaceUsageSource {
SpaceUsageSource subject = new CachingSpaceUsageSource(params);
assertEquals(initialValue, subject.getUsedSpace());
+ assertAvailableWasUpdated(params.getSource(), subject);
}
@Test
- public void ignoresMissingInitialValue() {
+ void ignoresMissingInitialValue() {
SpaceUsageCheckParams params = paramsBuilder()
.withRefresh(Duration.ZERO)
.build();
@@ -68,10 +70,11 @@ public class TestCachingSpaceUsageSource {
SpaceUsageSource subject = new CachingSpaceUsageSource(params);
assertEquals(0, subject.getUsedSpace());
+ assertAvailableWasUpdated(params.getSource(), subject);
}
@Test
- public void updatesValueFromSourceUponStartIfPeriodicRefreshNotConfigured() {
+ void updatesValueFromSourceUponStartIfPeriodicRefreshNotConfigured() {
AtomicLong savedValue = new AtomicLong(validInitialValue());
SpaceUsageCheckParams params = paramsBuilder(savedValue)
.withRefresh(Duration.ZERO).build();
@@ -79,11 +82,11 @@ public class TestCachingSpaceUsageSource {
CachingSpaceUsageSource subject = new CachingSpaceUsageSource(params);
subject.start();
- assertSubjectWasRefreshed(params.getSource().getUsedSpace(), subject);
+ assertSubjectWasRefreshed(params.getSource(), subject);
}
@Test
- public void schedulesRefreshWithDelayIfConfigured() {
+ void schedulesRefreshWithDelayIfConfigured() {
long initialValue = validInitialValue();
AtomicLong savedValue = new AtomicLong(initialValue);
SpaceUsageCheckParams params = paramsBuilder(savedValue)
@@ -96,13 +99,13 @@ public class TestCachingSpaceUsageSource {
subject.start();
verifyRefreshWasScheduled(executor, refresh.toMillis(), refresh);
- assertSubjectWasRefreshed(params.getSource().getUsedSpace(), subject);
+ assertSubjectWasRefreshed(params.getSource(), subject);
assertEquals(initialValue, savedValue.get(),
"value should not have been saved to file yet");
}
@Test
- public void schedulesImmediateRefreshIfInitialValueMissing() {
+ void schedulesImmediateRefreshIfInitialValueMissing() {
final long initialValue = missingInitialValue();
AtomicLong savedValue = new AtomicLong(initialValue);
SpaceUsageCheckParams params = paramsBuilder(savedValue).build();
@@ -113,13 +116,13 @@ public class TestCachingSpaceUsageSource {
subject.start();
verifyRefreshWasScheduled(executor, 0L, params.getRefresh());
- assertSubjectWasRefreshed(params.getSource().getUsedSpace(), subject);
+ assertSubjectWasRefreshed(params.getSource(), subject);
assertEquals(initialValue, savedValue.get(),
"value should not have been saved to file yet");
}
@Test
- public void savesValueOnShutdown() {
+ void savesValueOnShutdown() {
AtomicLong savedValue = new AtomicLong(validInitialValue());
SpaceUsageSource source = mock(SpaceUsageSource.class);
final long usedSpace = 4L;
@@ -138,22 +141,44 @@ public class TestCachingSpaceUsageSource {
"value should have been saved to file");
assertEquals(usedSpace, subject.getUsedSpace(),
"no further updates from source expected");
- verify(future).cancel(true);
+ verify(future, times(2)).cancel(true);
verify(executor).shutdown();
}
@Test
- public void testDecrementDoesNotGoNegative() {
+ void decrementUsedSpaceMoreThanCurrent() {
SpaceUsageCheckParams params = paramsBuilder(new AtomicLong(50))
.withRefresh(Duration.ZERO)
.build();
CachingSpaceUsageSource subject = new CachingSpaceUsageSource(params);
+ SpaceUsageSource original = subject.snapshot();
// Try to decrement more than the current value
- subject.decrementUsedSpace(100);
+ final long change = original.getUsedSpace() * 2;
+ subject.decrementUsedSpace(change);
- // Check that the value has been set to 0
+ // should not drop below 0
assertEquals(0, subject.getUsedSpace());
+ // available and used change by same amount (in opposite directions)
+ assertEquals(original.getAvailable() + original.getUsedSpace(), subject.getAvailable());
+ }
+
+ @Test
+ void decrementAvailableSpaceMoreThanCurrent() {
+ SpaceUsageCheckParams params = paramsBuilder(new AtomicLong(50))
+ .withRefresh(Duration.ZERO)
+ .build();
+ CachingSpaceUsageSource subject = new CachingSpaceUsageSource(params);
+ SpaceUsageSource original = subject.snapshot();
+
+ // Try to decrement more than the current value
+ final long change = original.getAvailable() * 2;
+ subject.incrementUsedSpace(change);
+
+ // should not drop below 0
+ assertEquals(0, subject.getAvailable());
+ // available and used change by same amount (in opposite directions)
+ assertEquals(original.getUsedSpace() + original.getAvailable(), subject.getUsedSpace());
}
private static long missingInitialValue() {
@@ -197,14 +222,29 @@ public class TestCachingSpaceUsageSource {
ScheduledExecutorService executor, long expectedInitialDelay,
Duration refresh) {
+ // refresh usedSpace
verify(executor).scheduleWithFixedDelay(any(), eq(expectedInitialDelay),
eq(refresh.toMillis()), eq(TimeUnit.MILLISECONDS));
+
+ // update available/capacity
+ final long oneMinute = Duration.ofMinutes(1).toMillis();
+ final long delay = Math.min(refresh.toMillis(), oneMinute);
+ verify(executor).scheduleWithFixedDelay(any(), eq(delay),
+ eq(delay), eq(TimeUnit.MILLISECONDS));
+ }
+
+ private static void assertAvailableWasUpdated(SpaceUsageSource source,
+ SpaceUsageSource subject) {
+
+ assertEquals(source.getCapacity(), subject.getCapacity());
+ assertEquals(source.getAvailable(), subject.getAvailable());
}
- private static void assertSubjectWasRefreshed(long expected,
+ private static void assertSubjectWasRefreshed(SpaceUsageSource source,
SpaceUsageSource subject) {
- assertEquals(expected, subject.getUsedSpace(),
+ assertAvailableWasUpdated(source, subject);
+ assertEquals(source.getUsedSpace(), subject.getUsedSpace(),
"subject should have been refreshed");
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]