This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new e3361d6d3 [#901] feat(server): respect disk capacity watermark rather
than uniffle capacity (#1337)
e3361d6d3 is described below
commit e3361d6d3c069eec10c56954b714ec674e341166
Author: Junfan Zhang <[email protected]>
AuthorDate: Sat Dec 2 20:25:09 2023 +0800
[#901] feat(server): respect disk capacity watermark rather than uniffle
capacity (#1337)
### What changes were proposed in this pull request?
Respect the whole-disk capacity watermark, rather than only the capacity used by Uniffle, when deciding whether a local disk can be written.
### Why are the changes needed?
Fix: #901
### Does this PR introduce _any_ user-facing change?
Yes.
A new config option, `rss.server.disk-capacity.watermark.check.enabled` (default: `false`), is introduced.
### How was this patch tested?
Unit tests (`StorageCheckerTest`, `LocalStorageTest`).
---
docs/server_guide.md | 1 +
.../apache/uniffle/server/LocalStorageChecker.java | 25 ++++++-----
.../apache/uniffle/server/ShuffleServerConf.java | 10 +++++
.../server/storage/LocalStorageManager.java | 11 +++--
.../apache/uniffle/server/StorageCheckerTest.java | 6 ++-
.../uniffle/storage/common/LocalStorage.java | 51 +++++++++++++++++++---
.../uniffle/storage/common/LocalStorageTest.java | 44 +++++++++++++++++++
7 files changed, 128 insertions(+), 20 deletions(-)
diff --git a/docs/server_guide.md b/docs/server_guide.md
index f4b5efb82..68c97bc2d 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -100,6 +100,7 @@ This document will introduce how to deploy Uniffle shuffle
servers.
| rss.server.max.concurrency.limit.of.per-partition.write | -
| The limit for max
concurrency per-partition write specified by client, this won't be enabled by
default.
[...]
| rss.metrics.reporter.class | -
| The class of metrics
reporter.
[...]
| rss.server.hyrbrid.storage.manager.selector.class |
org.apache.uniffle.server.storage.hybrid.DefaultStorageManagerSelector | The
manager selector strategy for `MEMORY_LOCALFILE_HDFS`. Default value is
`DefaultStorageManagerSelector`, and another
`HugePartitionSensitiveStorageManagerSelector` will flush only huge partition's
data to cold storage.
[...]
+| rss.server.disk-capacity.watermark.check.enabled | false
| When the shuffle server is co-located with
other services, a high/low watermark check based only on the space used by
Uniffle is inaccurate. Enabling this option additionally checks whole-disk
usage against the same high/low watermark values. Disabled by default.
[...]
### Advanced Configurations
|Property Name|Default| Description
|
diff --git
a/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
b/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
index 8f30663ab..ac30baff9 100644
--- a/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
+++ b/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
@@ -93,11 +93,15 @@ public class LocalStorageChecker extends Checker {
return;
}
- totalSpace.addAndGet(getTotalSpace(storageInfo.storageDir));
-
wholeDiskUsedSpace.addAndGet(getWholeDiskUsedSpace(storageInfo.storageDir));
+ long total = getTotalSpace(storageInfo.storageDir);
+ long free = getFreeSpace(storageInfo.storageDir);
+
+ totalSpace.addAndGet(total);
+ wholeDiskUsedSpace.addAndGet(total - free);
serviceUsedSpace.addAndGet(getServiceUsedSpace(storageInfo.storageDir));
- if (storageInfo.checkIsSpaceEnough()) {
+ storageInfo.updateStorageFreeSpace(free);
+ if (storageInfo.checkIsSpaceEnough(total, free)) {
num.incrementAndGet();
}
cdl.countDown();
@@ -144,10 +148,8 @@ public class LocalStorageChecker extends Checker {
return file.getTotalSpace();
}
- // Only for testing
- @VisibleForTesting
- long getWholeDiskUsedSpace(File file) {
- return file.getTotalSpace() - file.getUsableSpace();
+ long getFreeSpace(File file) {
+ return file.getUsableSpace();
}
protected static long getServiceUsedSpace(File storageDir) {
@@ -190,13 +192,16 @@ public class LocalStorageChecker extends Checker {
this.storage = storage;
}
- boolean checkIsSpaceEnough() {
+ void updateStorageFreeSpace(long free) {
+ storage.updateDiskFree(free);
+ }
- if (Double.compare(0.0, getTotalSpace(storageDir)) == 0) {
+ boolean checkIsSpaceEnough(long total, long free) {
+ if (Double.compare(0.0, total) == 0) {
this.isHealthy = false;
return false;
}
- double usagePercent = getWholeDiskUsedSpace(storageDir) * 100.0 /
getTotalSpace(storageDir);
+ double usagePercent = (total - free) * 100.0 / total;
if (isHealthy) {
if (Double.compare(usagePercent, diskMaxUsagePercentage) >= 0) {
isHealthy = false;
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index 5237da481..fe45c2360 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -189,6 +189,16 @@ public class ShuffleServerConf extends RssBaseConf {
.defaultValue(85.0)
.withDescription("If disk usage is smaller than this value, disk can
been written again");
+ public static final ConfigOption<Boolean>
DISK_CAPACITY_WATERMARK_CHECK_ENABLED =
+ ConfigOptions.key("rss.server.disk-capacity.watermark.check.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "If it is co-located with other services, the high-low watermark
check "
+ + "based on the uniffle used is not correct. Due to this,
the whole disk capacity "
+ + "watermark check is necessary, which will reuse the
current watermark value. "
+ + "It will be disabled by default.");
+
public static final ConfigOption<Long> PENDING_EVENT_TIMEOUT_SEC =
ConfigOptions.key("rss.server.pending.event.timeout.sec")
.longType()
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index ea47ae1ad..229b309a1 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -72,6 +72,7 @@ import
org.apache.uniffle.storage.request.CreateShuffleDeleteHandlerRequest;
import org.apache.uniffle.storage.util.ShuffleStorageUtils;
import org.apache.uniffle.storage.util.StorageType;
+import static
org.apache.uniffle.server.ShuffleServerConf.DISK_CAPACITY_WATERMARK_CHECK_ENABLED;
import static
org.apache.uniffle.server.ShuffleServerConf.LOCAL_STORAGE_INITIALIZE_MAX_FAIL_NUMBER;
public class LocalStorageManager extends SingleStorageManager {
@@ -113,6 +114,7 @@ public class LocalStorageManager extends
SingleStorageManager {
}
ExecutorService executorService =
ThreadUtils.getDaemonCachedThreadPool("LocalStorage-check");
LocalStorage[] localStorageArray = new
LocalStorage[storageBasePaths.size()];
+ boolean isDiskCapacityWatermarkCheckEnabled =
conf.get(DISK_CAPACITY_WATERMARK_CHECK_ENABLED);
for (int i = 0; i < storageBasePaths.size(); i++) {
final int idx = i;
String storagePath = storageBasePaths.get(i);
@@ -120,15 +122,18 @@ public class LocalStorageManager extends
SingleStorageManager {
() -> {
try {
StorageMedia storageType =
getStorageTypeForBasePath(storagePath);
- localStorageArray[idx] =
+ LocalStorage.Builder builder =
LocalStorage.newBuilder()
.basePath(storagePath)
.capacity(capacity)
.ratio(ratio)
.lowWaterMarkOfWrite(lowWaterMarkOfWrite)
.highWaterMarkOfWrite(highWaterMarkOfWrite)
- .localStorageMedia(storageType)
- .build();
+ .localStorageMedia(storageType);
+ if (isDiskCapacityWatermarkCheckEnabled) {
+ builder.enableDiskCapacityWatermarkCheck();
+ }
+ localStorageArray[idx] = builder.build();
successCount.incrementAndGet();
} catch (Exception e) {
LOG.error("LocalStorage init failed!", e);
diff --git
a/server/src/test/java/org/apache/uniffle/server/StorageCheckerTest.java
b/server/src/test/java/org/apache/uniffle/server/StorageCheckerTest.java
index cf543b693..4892a4343 100644
--- a/server/src/test/java/org/apache/uniffle/server/StorageCheckerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/StorageCheckerTest.java
@@ -130,9 +130,13 @@ public class StorageCheckerTest {
return 1000;
}
+ @Override
+ long getFreeSpace(File file) {
+ return getTotalSpace(file) - getWholeDiskUsedSpace(file);
+ }
+
// we mock this method, and will return different values according
// to call times.
- @Override
long getWholeDiskUsedSpace(File file) {
long result = 0;
switch (file.getName()) {
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
index a58b449e1..76371fdb9 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
@@ -45,6 +45,11 @@ public class LocalStorage extends AbstractStorage {
private static final Logger LOG =
LoggerFactory.getLogger(LocalStorage.class);
public static final String STORAGE_HOST = "local";
+ private final long diskCapacity;
+ private volatile long diskFree;
+ // for test cases
+ private boolean enableDiskCapacityCheck = false;
+
private long capacity;
private final String basePath;
private final String mountPoint;
@@ -61,6 +66,7 @@ public class LocalStorage extends AbstractStorage {
this.lowWaterMarkOfWrite = builder.lowWaterMarkOfWrite;
this.capacity = builder.capacity;
this.media = builder.media;
+ this.enableDiskCapacityCheck = builder.enableDiskCapacityWatermarkCheck;
File baseFolder = new File(basePath);
try {
@@ -74,16 +80,18 @@ public class LocalStorage extends AbstractStorage {
LOG.warn("Init base directory " + basePath + " fail, the disk should be
corrupted", ioe);
throw new RssException(ioe);
}
+ this.diskCapacity = baseFolder.getTotalSpace();
+ this.diskFree = baseFolder.getUsableSpace();
+
if (capacity < 0L) {
- long totalSpace = baseFolder.getTotalSpace();
- this.capacity = (long) (totalSpace * builder.ratio);
+ this.capacity = (long) (diskCapacity * builder.ratio);
LOG.info(
"The `rss.server.disk.capacity` is not specified nor negative, the "
+ "ratio(`rss.server.disk.capacity.ratio`:{}) * disk space({})
is used, ",
builder.ratio,
- totalSpace);
+ diskCapacity);
} else {
- long freeSpace = baseFolder.getFreeSpace();
+ final long freeSpace = diskFree;
if (freeSpace < capacity) {
throw new IllegalArgumentException(
"The Disk of "
@@ -142,13 +150,34 @@ public class LocalStorage extends AbstractStorage {
basePath);
}
+ // only for tests.
+ @VisibleForTesting
+ public void enableDiskCapacityCheck() {
+ this.enableDiskCapacityCheck = true;
+ }
+
+ public long getDiskCapacity() {
+ return diskCapacity;
+ }
+
@Override
public boolean canWrite() {
+ boolean serviceUsedCapacityCheck;
+ boolean diskUsedCapacityCheck;
+
if (isSpaceEnough) {
- isSpaceEnough = metaData.getDiskSize().doubleValue() * 100 / capacity <
highWaterMarkOfWrite;
+ serviceUsedCapacityCheck =
+ metaData.getDiskSize().doubleValue() * 100 / capacity <
highWaterMarkOfWrite;
+ diskUsedCapacityCheck =
+ ((double) (diskCapacity - diskFree)) * 100 / diskCapacity <
highWaterMarkOfWrite;
} else {
- isSpaceEnough = metaData.getDiskSize().doubleValue() * 100 / capacity <
lowWaterMarkOfWrite;
+ serviceUsedCapacityCheck =
+ metaData.getDiskSize().doubleValue() * 100 / capacity <
lowWaterMarkOfWrite;
+ diskUsedCapacityCheck =
+ ((double) (diskCapacity - diskFree)) * 100 / diskCapacity <
lowWaterMarkOfWrite;
}
+ isSpaceEnough =
+ serviceUsedCapacityCheck && (enableDiskCapacityCheck ?
diskUsedCapacityCheck : true);
return isSpaceEnough && !isCorrupted;
}
@@ -237,6 +266,10 @@ public class LocalStorage extends AbstractStorage {
return appIds;
}
+ public void updateDiskFree(long free) {
+ this.diskFree = free;
+ }
+
// Only for test
@VisibleForTesting
public void markSpaceFull() {
@@ -250,6 +283,7 @@ public class LocalStorage extends AbstractStorage {
private double highWaterMarkOfWrite;
private String basePath;
private StorageMedia media;
+ private boolean enableDiskCapacityWatermarkCheck;
private Builder() {}
@@ -283,6 +317,11 @@ public class LocalStorage extends AbstractStorage {
return this;
}
+ public Builder enableDiskCapacityWatermarkCheck() {
+ this.enableDiskCapacityWatermarkCheck = true;
+ return this;
+ }
+
public LocalStorage build() {
return new LocalStorage(this);
}
diff --git
a/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java
b/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java
index 3dcffda19..68b4aaf3a 100644
---
a/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java
+++
b/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java
@@ -165,4 +165,48 @@ public class LocalStorageTest {
item.getOrCreateWriteHandler(request);
assertTrue(item.containsWriteHandler(appId, 0, 1));
}
+
+ @Test
+ public void canWriteTestWithDiskCapacityCheck() {
+ // capacity < diskCapacity
+ LocalStorage localStorage =
+ LocalStorage.newBuilder()
+ .basePath(testBaseDir.getAbsolutePath())
+ .highWaterMarkOfWrite(95)
+ .lowWaterMarkOfWrite(80)
+ .capacity(100)
+ .enableDiskCapacityWatermarkCheck()
+ .build();
+
+ localStorage.getMetaData().updateDiskSize(20);
+ assertTrue(localStorage.canWrite());
+ localStorage.getMetaData().updateDiskSize(65);
+ assertTrue(localStorage.canWrite());
+
+ final long diskCapacity = localStorage.getDiskCapacity();
+ localStorage.updateDiskFree((long) (diskCapacity * (1 - 0.96)));
+ assertFalse(localStorage.canWrite());
+
+ localStorage.updateDiskFree((long) (diskCapacity * (1 - 0.60)));
+ assertFalse(localStorage.canWrite());
+ localStorage.getMetaData().updateDiskSize(-10);
+ assertTrue(localStorage.canWrite());
+
+ // capacity = diskCapacity
+ localStorage =
+ LocalStorage.newBuilder()
+ .basePath(testBaseDir.getAbsolutePath())
+ .highWaterMarkOfWrite(95)
+ .lowWaterMarkOfWrite(80)
+ .capacity(-1)
+ .ratio(1)
+ .enableDiskCapacityWatermarkCheck()
+ .build();
+
+ localStorage.updateDiskFree((long) (diskCapacity * (1 - 0.96)));
+ assertFalse(localStorage.canWrite());
+
+ localStorage.updateDiskFree((long) (diskCapacity * (1 - 0.60)));
+ assertTrue(localStorage.canWrite());
+ }
}