This is an automated email from the ASF dual-hosted git repository.

xianjingfeng pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 416ecfa0d [#1684] fix(server): use the diskSize obtained from periodic 
check to determine whether is writable (#1685)
416ecfa0d is described below

commit 416ecfa0d2c0c6f12391e82110f20106a70bb005
Author: xianjingfeng <[email protected]>
AuthorDate: Sat May 11 09:56:41 2024 +0800

    [#1684] fix(server): use the diskSize obtained from periodic check to 
determine whether is writable (#1685)
    
    ### What changes were proposed in this pull request?
    Use the disk size obtained from the periodic check to determine whether the 
disk can be written to.
    
    ### Why are the changes needed?
    The disk size obtained from metadata is unreliable.
    Fix: #1684
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    Existing unit tests (UTs).
    
    (cherry picked from commit 40bd14b9368e748b4d8ac6585c645a02cafad100)
---
 .../apache/uniffle/server/LocalStorageChecker.java | 29 ++++++++++++-------
 .../server/storage/LocalStorageManager.java        |  2 +-
 .../uniffle/server/ShuffleFlushManagerTest.java    |  5 ++++
 .../uniffle/storage/common/LocalStorage.java       | 33 +++++++++++++---------
 .../uniffle/storage/common/LocalStorageTest.java   | 24 ++++++++--------
 5 files changed, 56 insertions(+), 37 deletions(-)

diff --git 
a/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java 
b/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
index 0554035d6..d21e0bd4a 100644
--- a/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
+++ b/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
@@ -109,14 +109,15 @@ public class LocalStorageChecker extends Checker {
                 }
 
                 long total = getTotalSpace(storageInfo.storageDir);
-                long free = getFreeSpace(storageInfo.storageDir);
+                long availableBytes = getFreeSpace(storageInfo.storageDir);
 
                 totalSpace.addAndGet(total);
-                wholeDiskUsedSpace.addAndGet(total - free);
-                
serviceUsedSpace.addAndGet(getServiceUsedSpace(storageInfo.storageDir));
-
-                storageInfo.updateStorageFreeSpace(free);
-                if (storageInfo.checkIsSpaceEnough(total, free)) {
+                wholeDiskUsedSpace.addAndGet(total - availableBytes);
+                long usedBytes = getServiceUsedSpace(storageInfo.storageDir);
+                serviceUsedSpace.addAndGet(usedBytes);
+                storageInfo.updateServiceUsedBytes(usedBytes);
+                storageInfo.updateStorageFreeSpace(availableBytes);
+                if (storageInfo.checkIsSpaceEnough(total, availableBytes)) {
                   num.incrementAndGet();
                 }
                 return null;
@@ -231,16 +232,20 @@ public class LocalStorageChecker extends Checker {
       this.storage = storage;
     }
 
-    void updateStorageFreeSpace(long free) {
-      storage.updateDiskFree(free);
+    void updateStorageFreeSpace(long availableBytes) {
+      storage.updateDiskAvailableBytes(availableBytes);
+    }
+
+    void updateServiceUsedBytes(long usedBytes) {
+      storage.updateServiceUsedBytes(usedBytes);
     }
 
-    boolean checkIsSpaceEnough(long total, long free) {
+    boolean checkIsSpaceEnough(long total, long availableBytes) {
       if (Double.compare(0.0, total) == 0) {
         this.isHealthy = false;
         return false;
       }
-      double usagePercent = (total - free) * 100.0 / total;
+      double usagePercent = (total - availableBytes) * 100.0 / total;
       if (isHealthy) {
         if (Double.compare(usagePercent, diskMaxUsagePercentage) >= 0) {
           isHealthy = false;
@@ -255,6 +260,10 @@ public class LocalStorageChecker extends Checker {
       return isHealthy;
     }
 
+    boolean canWrite() {
+      return storage.canWrite();
+    }
+
     boolean checkStorageReadAndWrite() {
       if (storage.isCorrupted()) {
         return false;
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 772aae1d0..bd6ca2128 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -380,7 +380,7 @@ public class LocalStorageManager extends 
SingleStorageManager {
     for (LocalStorage storage : localStorages) {
       String mountPoint = storage.getMountPoint();
       long capacity = storage.getCapacity();
-      long wroteBytes = storage.getDiskSize();
+      long wroteBytes = storage.getServiceUsedBytes();
       StorageStatus status = StorageStatus.NORMAL;
       if (storage.isCorrupted()) {
         status = StorageStatus.UNHEALTHY;
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index 274c2cbbb..2b21445e3 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -343,6 +343,7 @@ public class ShuffleFlushManagerTest extends HadoopTestBase 
{
     int storageIndex1 = 
storagePaths.indexOf(flushEvent.getUnderStorage().getStoragePath());
     validateLocalMetadata(storageManager, storageIndex1, 1010L);
 
+    storageManager.getStorageChecker().checkIsHealthy();
     flushEvent = createShuffleDataFlushEvent(appId, 3, 1, 1, 10, 102, null);
     manager.addToFlushQueue(flushEvent);
     // wait for write data
@@ -737,6 +738,7 @@ public class ShuffleFlushManagerTest extends HadoopTestBase 
{
     Thread.sleep(1000);
     assertTrue(event.getUnderStorage() instanceof LocalStorage);
 
+    storageManager.getStorageChecker().checkIsHealthy();
     // case2: huge event is written to cold storage directly
     event = createShuffleDataFlushEvent(appId, 1, 1, 1, null, 100000);
     flushManager.addToFlushQueue(event);
@@ -753,6 +755,7 @@ public class ShuffleFlushManagerTest extends HadoopTestBase 
{
         ((HybridStorageManager) 
storageManager).getWarmStorageManager().selectStorage(event));
     ((HybridStorageManager) 
storageManager).getWarmStorageManager().updateWriteMetrics(bigEvent, 0);
 
+    storageManager.getStorageChecker().checkIsHealthy();
     event = createShuffleDataFlushEvent(appId, 3, 1, 1, null, 100);
     flushManager.addToFlushQueue(event);
     Thread.sleep(1000);
@@ -793,6 +796,7 @@ public class ShuffleFlushManagerTest extends HadoopTestBase 
{
     assertEquals(0, event.getRetryTimes());
     assertEquals(1, ShuffleServerMetrics.counterLocalFileEventFlush.get());
 
+    storageManager.getStorageChecker().checkIsHealthy();
     // case2: huge event is written to cold storage directly
     event = createShuffleDataFlushEvent(appId, 1, 1, 1, null, 100000);
     flushManager.addToFlushQueue(event);
@@ -809,6 +813,7 @@ public class ShuffleFlushManagerTest extends HadoopTestBase 
{
         ((HybridStorageManager) 
storageManager).getWarmStorageManager().selectStorage(event));
     ((HybridStorageManager) 
storageManager).getWarmStorageManager().updateWriteMetrics(bigEvent, 0);
 
+    storageManager.getStorageChecker().checkIsHealthy();
     event = createShuffleDataFlushEvent(appId, 2, 1, 1, null, 100);
     flushManager.addToFlushQueue(event);
     waitForFlush(flushManager, appId, 2, 5);
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java 
b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
index 76371fdb9..756170973 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
@@ -46,7 +46,8 @@ public class LocalStorage extends AbstractStorage {
   public static final String STORAGE_HOST = "local";
 
   private final long diskCapacity;
-  private volatile long diskFree;
+  private volatile long diskAvailableBytes;
+  private volatile long serviceUsedBytes;
   // for test cases
   private boolean enableDiskCapacityCheck = false;
 
@@ -81,7 +82,7 @@ public class LocalStorage extends AbstractStorage {
       throw new RssException(ioe);
     }
     this.diskCapacity = baseFolder.getTotalSpace();
-    this.diskFree = baseFolder.getUsableSpace();
+    this.diskAvailableBytes = baseFolder.getUsableSpace();
 
     if (capacity < 0L) {
       this.capacity = (long) (diskCapacity * builder.ratio);
@@ -91,7 +92,7 @@ public class LocalStorage extends AbstractStorage {
           builder.ratio,
           diskCapacity);
     } else {
-      final long freeSpace = diskFree;
+      final long freeSpace = diskAvailableBytes;
       if (freeSpace < capacity) {
         throw new IllegalArgumentException(
             "The Disk of "
@@ -167,14 +168,14 @@ public class LocalStorage extends AbstractStorage {
 
     if (isSpaceEnough) {
       serviceUsedCapacityCheck =
-          metaData.getDiskSize().doubleValue() * 100 / capacity < 
highWaterMarkOfWrite;
+          (double) (serviceUsedBytes * 100) / capacity < highWaterMarkOfWrite;
       diskUsedCapacityCheck =
-          ((double) (diskCapacity - diskFree)) * 100 / diskCapacity < 
highWaterMarkOfWrite;
+          ((double) (diskCapacity - diskAvailableBytes)) * 100 / diskCapacity
+              < highWaterMarkOfWrite;
     } else {
-      serviceUsedCapacityCheck =
-          metaData.getDiskSize().doubleValue() * 100 / capacity < 
lowWaterMarkOfWrite;
+      serviceUsedCapacityCheck = (double) (serviceUsedBytes * 100) / capacity 
< lowWaterMarkOfWrite;
       diskUsedCapacityCheck =
-          ((double) (diskCapacity - diskFree)) * 100 / diskCapacity < 
lowWaterMarkOfWrite;
+          ((double) (diskCapacity - diskAvailableBytes)) * 100 / diskCapacity 
< lowWaterMarkOfWrite;
     }
     isSpaceEnough =
         serviceUsedCapacityCheck && (enableDiskCapacityCheck ? 
diskUsedCapacityCheck : true);
@@ -203,10 +204,6 @@ public class LocalStorage extends AbstractStorage {
     metaData.updateShuffleLastReadTs(shuffleKey);
   }
 
-  public long getDiskSize() {
-    return metaData.getDiskSize().longValue();
-  }
-
   @VisibleForTesting
   public LocalStorageMeta getMetaData() {
     return metaData;
@@ -266,8 +263,16 @@ public class LocalStorage extends AbstractStorage {
     return appIds;
   }
 
-  public void updateDiskFree(long free) {
-    this.diskFree = free;
+  public void updateDiskAvailableBytes(long bytes) {
+    this.diskAvailableBytes = bytes;
+  }
+
+  public void updateServiceUsedBytes(long usedBytes) {
+    this.serviceUsedBytes = usedBytes;
+  }
+
+  public long getServiceUsedBytes() {
+    return serviceUsedBytes;
   }
 
   // Only for test
diff --git 
a/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java 
b/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java
index 68b4aaf3a..4ce4d1b9d 100644
--- 
a/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java
+++ 
b/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java
@@ -76,15 +76,15 @@ public class LocalStorageTest {
   public void canWriteTest() {
     LocalStorage item = createTestStorage(testBaseDir);
 
-    item.getMetaData().updateDiskSize(20);
+    item.updateServiceUsedBytes(20);
     assertTrue(item.canWrite());
-    item.getMetaData().updateDiskSize(65);
+    item.updateServiceUsedBytes(item.getServiceUsedBytes() + 65);
     assertTrue(item.canWrite());
-    item.getMetaData().updateDiskSize(10);
+    item.updateServiceUsedBytes(item.getServiceUsedBytes() + 10);
     assertFalse(item.canWrite());
-    item.getMetaData().updateDiskSize(-10);
+    item.updateServiceUsedBytes(item.getServiceUsedBytes() - 10);
     assertFalse(item.canWrite());
-    item.getMetaData().updateDiskSize(-10);
+    item.updateServiceUsedBytes(item.getServiceUsedBytes() - 10);
     assertTrue(item.canWrite());
   }
 
@@ -178,18 +178,18 @@ public class LocalStorageTest {
             .enableDiskCapacityWatermarkCheck()
             .build();
 
-    localStorage.getMetaData().updateDiskSize(20);
+    localStorage.updateServiceUsedBytes(localStorage.getServiceUsedBytes() + 
20);
     assertTrue(localStorage.canWrite());
-    localStorage.getMetaData().updateDiskSize(65);
+    localStorage.updateServiceUsedBytes(localStorage.getServiceUsedBytes() + 
65);
     assertTrue(localStorage.canWrite());
 
     final long diskCapacity = localStorage.getDiskCapacity();
-    localStorage.updateDiskFree((long) (diskCapacity * (1 - 0.96)));
+    localStorage.updateDiskAvailableBytes((long) (diskCapacity * (1 - 0.96)));
     assertFalse(localStorage.canWrite());
 
-    localStorage.updateDiskFree((long) (diskCapacity * (1 - 0.60)));
+    localStorage.updateDiskAvailableBytes((long) (diskCapacity * (1 - 0.60)));
     assertFalse(localStorage.canWrite());
-    localStorage.getMetaData().updateDiskSize(-10);
+    localStorage.updateServiceUsedBytes(localStorage.getServiceUsedBytes() - 
10);
     assertTrue(localStorage.canWrite());
 
     // capacity = diskCapacity
@@ -203,10 +203,10 @@ public class LocalStorageTest {
             .enableDiskCapacityWatermarkCheck()
             .build();
 
-    localStorage.updateDiskFree((long) (diskCapacity * (1 - 0.96)));
+    localStorage.updateDiskAvailableBytes((long) (diskCapacity * (1 - 0.96)));
     assertFalse(localStorage.canWrite());
 
-    localStorage.updateDiskFree((long) (diskCapacity * (1 - 0.60)));
+    localStorage.updateDiskAvailableBytes((long) (diskCapacity * (1 - 0.60)));
     assertTrue(localStorage.canWrite());
   }
 }

Reply via email to