This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch branch-0.7
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/branch-0.7 by this push:
new 13391213 [#881][0.7] fix(followup): Ensure LocalStorageMeta disk size
is correctly updated when events are processed for 0.7.0 (#914)
13391213 is described below
commit 133912133687a7f9be10d195fa0c08c4a41607fb
Author: awdavidson <[email protected]>
AuthorDate: Tue Jun 6 16:39:35 2023 +0100
[#881][0.7] fix(followup): Ensure LocalStorageMeta disk size is correctly
updated when events are processed for 0.7.0 (#914)
What changes were proposed in this pull request?
Ensure every event is marked with its underStorage; this results in the
LocalStorageMeta being updated when events are processed.
Why are the changes needed?
Currently, LocalStorageMeta is only updated with metrics from the first event
for a given shuffleId and partitionId. The first event updates the metrics because
there is no entry in partitionsOfStorage yet, so the event gets marked with its
underStorage. However, for subsequent events with the same shuffleId and partitionId,
selectStorage returns the cached storage without marking the event's underStorage,
so when updateWriteMetrics is called, event.getUnderStorage() returns null and
storage.updateWriteMetrics(met [...] (the rest of this sentence was truncated in the original message)
Because the metrics are not updated correctly, LocalStorage.canWrite does not
return the correct result.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Added a unit test that covers multiple events for the same shuffleId and
partitionId.
---
.../server/storage/LocalStorageManager.java | 5 ++-
.../uniffle/server/ShuffleFlushManagerTest.java | 36 ++++++++++++++++++++++
2 files changed, 40 insertions(+), 1 deletion(-)
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 151038fd..be68b510 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -179,6 +179,9 @@ public class LocalStorageManager extends
SingleStorageManager {
storage.getBasePath(), event);
}
} else {
+ if (event.getUnderStorage() == null) {
+ event.setUnderStorage(storage);
+ }
return storage;
}
}
@@ -204,7 +207,7 @@ public class LocalStorageManager extends
SingleStorageManager {
(key, localStorage) -> {
// If this is the first time to select storage or existing storage
is corrupted,
// we should refresh the cache.
- if (localStorage == null || localStorage.isCorrupted()) {
+ if (localStorage == null || localStorage.isCorrupted() ||
event.getUnderStorage() == null) {
event.setUnderStorage(selectedStorage);
return selectedStorage;
}
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index ea0e236e..a316a0df 100644
---
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -55,6 +55,7 @@ import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;
import org.apache.uniffle.server.event.AppPurgeEvent;
import org.apache.uniffle.server.storage.HdfsStorageManager;
+import org.apache.uniffle.server.storage.LocalStorageManager;
import org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy;
import org.apache.uniffle.server.storage.MultiStorageManager;
import org.apache.uniffle.server.storage.StorageManager;
@@ -68,6 +69,7 @@ import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -193,6 +195,34 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
waitForQueueClear(manager);
waitForMetrics(ShuffleServerMetrics.gaugeWriteHandler, 0, 0.5);
}
+
+ @Test
+ public void localMetricsTest(@TempDir File tempDir) throws Exception {
+ shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH,
Arrays.asList(tempDir.getAbsolutePath()));
+ shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_TYPE,
StorageType.MEMORY_LOCALFILE.name());
+
+ String appId = "localMetricsTest_appId";
+ StorageManager storageManager =
+
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
+ ShuffleFlushManager manager =
+ new ShuffleFlushManager(shuffleServerConf, "shuffleServerId",
mockShuffleServer, storageManager);
+ ShuffleDataFlushEvent event1 =
+ createShuffleDataFlushEvent(appId, 1, 1, 1, null);
+ manager.addToFlushQueue(event1);
+ // wait for write data
+ waitForFlush(manager, appId, 1, 5);
+
+ validateLocalMetadata(storageManager, 160L);
+
+ ShuffleDataFlushEvent event12 =
+ createShuffleDataFlushEvent(appId, 1, 1, 1, null);
+ manager.addToFlushQueue(event12);
+
+ // wait for write data
+ waitForFlush(manager, appId, 1, 10);
+
+ validateLocalMetadata(storageManager, 320L);
+ }
@Test
public void complexWriteTest() throws Exception {
@@ -554,4 +584,10 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
fail();
}
}
+
+ private void validateLocalMetadata(StorageManager storageManager, Long size)
{
+ assertInstanceOf(LocalStorageManager.class, storageManager);
+ LocalStorage localStorage = ((LocalStorageManager)
storageManager).getStorages().get(0);
+ assertEquals(size, localStorage.getMetaData().getDiskSize().longValue());
+ }
}