This is an automated email from the ASF dual-hosted git repository.
xianjingfeng pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new 416ecfa0d [#1684] fix(server): use the diskSize obtained from periodic
check to determine whether is writable (#1685)
416ecfa0d is described below
commit 416ecfa0d2c0c6f12391e82110f20106a70bb005
Author: xianjingfeng <[email protected]>
AuthorDate: Sat May 11 09:56:41 2024 +0800
[#1684] fix(server): use the diskSize obtained from periodic check to
determine whether is writable (#1685)
### What changes were proposed in this pull request?
Use the service-used disk size obtained from the periodic storage check,
instead of the size tracked in the storage metadata, to determine whether the
disk can be written to.
### Why are the changes needed?
The disk size obtained from metadata is unreliable.
Fix: #1684
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Existing unit tests (UTs).
(cherry picked from commit 40bd14b9368e748b4d8ac6585c645a02cafad100)
---
.../apache/uniffle/server/LocalStorageChecker.java | 29 ++++++++++++-------
.../server/storage/LocalStorageManager.java | 2 +-
.../uniffle/server/ShuffleFlushManagerTest.java | 5 ++++
.../uniffle/storage/common/LocalStorage.java | 33 +++++++++++++---------
.../uniffle/storage/common/LocalStorageTest.java | 24 ++++++++--------
5 files changed, 56 insertions(+), 37 deletions(-)
diff --git a/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java b/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
index 0554035d6..d21e0bd4a 100644
--- a/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
+++ b/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
@@ -109,14 +109,15 @@ public class LocalStorageChecker extends Checker {
}
long total = getTotalSpace(storageInfo.storageDir);
- long free = getFreeSpace(storageInfo.storageDir);
+ long availableBytes = getFreeSpace(storageInfo.storageDir);
totalSpace.addAndGet(total);
- wholeDiskUsedSpace.addAndGet(total - free);
-
serviceUsedSpace.addAndGet(getServiceUsedSpace(storageInfo.storageDir));
-
- storageInfo.updateStorageFreeSpace(free);
- if (storageInfo.checkIsSpaceEnough(total, free)) {
+ wholeDiskUsedSpace.addAndGet(total - availableBytes);
+ long usedBytes = getServiceUsedSpace(storageInfo.storageDir);
+ serviceUsedSpace.addAndGet(usedBytes);
+ storageInfo.updateServiceUsedBytes(usedBytes);
+ storageInfo.updateStorageFreeSpace(availableBytes);
+ if (storageInfo.checkIsSpaceEnough(total, availableBytes)) {
num.incrementAndGet();
}
return null;
@@ -231,16 +232,20 @@ public class LocalStorageChecker extends Checker {
this.storage = storage;
}
- void updateStorageFreeSpace(long free) {
- storage.updateDiskFree(free);
+ void updateStorageFreeSpace(long availableBytes) {
+ storage.updateDiskAvailableBytes(availableBytes);
+ }
+
+ void updateServiceUsedBytes(long usedBytes) {
+ storage.updateServiceUsedBytes(usedBytes);
}
- boolean checkIsSpaceEnough(long total, long free) {
+ boolean checkIsSpaceEnough(long total, long availableBytes) {
if (Double.compare(0.0, total) == 0) {
this.isHealthy = false;
return false;
}
- double usagePercent = (total - free) * 100.0 / total;
+ double usagePercent = (total - availableBytes) * 100.0 / total;
if (isHealthy) {
if (Double.compare(usagePercent, diskMaxUsagePercentage) >= 0) {
isHealthy = false;
@@ -255,6 +260,10 @@ public class LocalStorageChecker extends Checker {
return isHealthy;
}
+ boolean canWrite() {
+ return storage.canWrite();
+ }
+
boolean checkStorageReadAndWrite() {
if (storage.isCorrupted()) {
return false;
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 772aae1d0..bd6ca2128 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -380,7 +380,7 @@ public class LocalStorageManager extends
SingleStorageManager {
for (LocalStorage storage : localStorages) {
String mountPoint = storage.getMountPoint();
long capacity = storage.getCapacity();
- long wroteBytes = storage.getDiskSize();
+ long wroteBytes = storage.getServiceUsedBytes();
StorageStatus status = StorageStatus.NORMAL;
if (storage.isCorrupted()) {
status = StorageStatus.UNHEALTHY;
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index 274c2cbbb..2b21445e3 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -343,6 +343,7 @@ public class ShuffleFlushManagerTest extends HadoopTestBase
{
int storageIndex1 =
storagePaths.indexOf(flushEvent.getUnderStorage().getStoragePath());
validateLocalMetadata(storageManager, storageIndex1, 1010L);
+ storageManager.getStorageChecker().checkIsHealthy();
flushEvent = createShuffleDataFlushEvent(appId, 3, 1, 1, 10, 102, null);
manager.addToFlushQueue(flushEvent);
// wait for write data
@@ -737,6 +738,7 @@ public class ShuffleFlushManagerTest extends HadoopTestBase
{
Thread.sleep(1000);
assertTrue(event.getUnderStorage() instanceof LocalStorage);
+ storageManager.getStorageChecker().checkIsHealthy();
// case2: huge event is written to cold storage directly
event = createShuffleDataFlushEvent(appId, 1, 1, 1, null, 100000);
flushManager.addToFlushQueue(event);
@@ -753,6 +755,7 @@ public class ShuffleFlushManagerTest extends HadoopTestBase
{
((HybridStorageManager)
storageManager).getWarmStorageManager().selectStorage(event));
((HybridStorageManager)
storageManager).getWarmStorageManager().updateWriteMetrics(bigEvent, 0);
+ storageManager.getStorageChecker().checkIsHealthy();
event = createShuffleDataFlushEvent(appId, 3, 1, 1, null, 100);
flushManager.addToFlushQueue(event);
Thread.sleep(1000);
@@ -793,6 +796,7 @@ public class ShuffleFlushManagerTest extends HadoopTestBase
{
assertEquals(0, event.getRetryTimes());
assertEquals(1, ShuffleServerMetrics.counterLocalFileEventFlush.get());
+ storageManager.getStorageChecker().checkIsHealthy();
// case2: huge event is written to cold storage directly
event = createShuffleDataFlushEvent(appId, 1, 1, 1, null, 100000);
flushManager.addToFlushQueue(event);
@@ -809,6 +813,7 @@ public class ShuffleFlushManagerTest extends HadoopTestBase
{
((HybridStorageManager)
storageManager).getWarmStorageManager().selectStorage(event));
((HybridStorageManager)
storageManager).getWarmStorageManager().updateWriteMetrics(bigEvent, 0);
+ storageManager.getStorageChecker().checkIsHealthy();
event = createShuffleDataFlushEvent(appId, 2, 1, 1, null, 100);
flushManager.addToFlushQueue(event);
waitForFlush(flushManager, appId, 2, 5);
diff --git a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
index 76371fdb9..756170973 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
@@ -46,7 +46,8 @@ public class LocalStorage extends AbstractStorage {
public static final String STORAGE_HOST = "local";
private final long diskCapacity;
- private volatile long diskFree;
+ private volatile long diskAvailableBytes;
+ private volatile long serviceUsedBytes;
// for test cases
private boolean enableDiskCapacityCheck = false;
@@ -81,7 +82,7 @@ public class LocalStorage extends AbstractStorage {
throw new RssException(ioe);
}
this.diskCapacity = baseFolder.getTotalSpace();
- this.diskFree = baseFolder.getUsableSpace();
+ this.diskAvailableBytes = baseFolder.getUsableSpace();
if (capacity < 0L) {
this.capacity = (long) (diskCapacity * builder.ratio);
@@ -91,7 +92,7 @@ public class LocalStorage extends AbstractStorage {
builder.ratio,
diskCapacity);
} else {
- final long freeSpace = diskFree;
+ final long freeSpace = diskAvailableBytes;
if (freeSpace < capacity) {
throw new IllegalArgumentException(
"The Disk of "
@@ -167,14 +168,14 @@ public class LocalStorage extends AbstractStorage {
if (isSpaceEnough) {
serviceUsedCapacityCheck =
- metaData.getDiskSize().doubleValue() * 100 / capacity <
highWaterMarkOfWrite;
+ (double) (serviceUsedBytes * 100) / capacity < highWaterMarkOfWrite;
diskUsedCapacityCheck =
- ((double) (diskCapacity - diskFree)) * 100 / diskCapacity <
highWaterMarkOfWrite;
+ ((double) (diskCapacity - diskAvailableBytes)) * 100 / diskCapacity
+ < highWaterMarkOfWrite;
} else {
- serviceUsedCapacityCheck =
- metaData.getDiskSize().doubleValue() * 100 / capacity <
lowWaterMarkOfWrite;
+ serviceUsedCapacityCheck = (double) (serviceUsedBytes * 100) / capacity
< lowWaterMarkOfWrite;
diskUsedCapacityCheck =
- ((double) (diskCapacity - diskFree)) * 100 / diskCapacity <
lowWaterMarkOfWrite;
+ ((double) (diskCapacity - diskAvailableBytes)) * 100 / diskCapacity
< lowWaterMarkOfWrite;
}
isSpaceEnough =
serviceUsedCapacityCheck && (enableDiskCapacityCheck ?
diskUsedCapacityCheck : true);
@@ -203,10 +204,6 @@ public class LocalStorage extends AbstractStorage {
metaData.updateShuffleLastReadTs(shuffleKey);
}
- public long getDiskSize() {
- return metaData.getDiskSize().longValue();
- }
-
@VisibleForTesting
public LocalStorageMeta getMetaData() {
return metaData;
@@ -266,8 +263,16 @@ public class LocalStorage extends AbstractStorage {
return appIds;
}
- public void updateDiskFree(long free) {
- this.diskFree = free;
+ public void updateDiskAvailableBytes(long bytes) {
+ this.diskAvailableBytes = bytes;
+ }
+
+ public void updateServiceUsedBytes(long usedBytes) {
+ this.serviceUsedBytes = usedBytes;
+ }
+
+ public long getServiceUsedBytes() {
+ return serviceUsedBytes;
}
// Only for test
diff --git a/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java b/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java
index 68b4aaf3a..4ce4d1b9d 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java
@@ -76,15 +76,15 @@ public class LocalStorageTest {
public void canWriteTest() {
LocalStorage item = createTestStorage(testBaseDir);
- item.getMetaData().updateDiskSize(20);
+ item.updateServiceUsedBytes(20);
assertTrue(item.canWrite());
- item.getMetaData().updateDiskSize(65);
+ item.updateServiceUsedBytes(item.getServiceUsedBytes() + 65);
assertTrue(item.canWrite());
- item.getMetaData().updateDiskSize(10);
+ item.updateServiceUsedBytes(item.getServiceUsedBytes() + 10);
assertFalse(item.canWrite());
- item.getMetaData().updateDiskSize(-10);
+ item.updateServiceUsedBytes(item.getServiceUsedBytes() - 10);
assertFalse(item.canWrite());
- item.getMetaData().updateDiskSize(-10);
+ item.updateServiceUsedBytes(item.getServiceUsedBytes() - 10);
assertTrue(item.canWrite());
}
@@ -178,18 +178,18 @@ public class LocalStorageTest {
.enableDiskCapacityWatermarkCheck()
.build();
- localStorage.getMetaData().updateDiskSize(20);
+ localStorage.updateServiceUsedBytes(localStorage.getServiceUsedBytes() +
20);
assertTrue(localStorage.canWrite());
- localStorage.getMetaData().updateDiskSize(65);
+ localStorage.updateServiceUsedBytes(localStorage.getServiceUsedBytes() +
65);
assertTrue(localStorage.canWrite());
final long diskCapacity = localStorage.getDiskCapacity();
- localStorage.updateDiskFree((long) (diskCapacity * (1 - 0.96)));
+ localStorage.updateDiskAvailableBytes((long) (diskCapacity * (1 - 0.96)));
assertFalse(localStorage.canWrite());
- localStorage.updateDiskFree((long) (diskCapacity * (1 - 0.60)));
+ localStorage.updateDiskAvailableBytes((long) (diskCapacity * (1 - 0.60)));
assertFalse(localStorage.canWrite());
- localStorage.getMetaData().updateDiskSize(-10);
+ localStorage.updateServiceUsedBytes(localStorage.getServiceUsedBytes() -
10);
assertTrue(localStorage.canWrite());
// capacity = diskCapacity
@@ -203,10 +203,10 @@ public class LocalStorageTest {
.enableDiskCapacityWatermarkCheck()
.build();
- localStorage.updateDiskFree((long) (diskCapacity * (1 - 0.96)));
+ localStorage.updateDiskAvailableBytes((long) (diskCapacity * (1 - 0.96)));
assertFalse(localStorage.canWrite());
- localStorage.updateDiskFree((long) (diskCapacity * (1 - 0.60)));
+ localStorage.updateDiskAvailableBytes((long) (diskCapacity * (1 - 0.60)));
assertTrue(localStorage.canWrite());
}
}