This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 7d25218d [#881] fix: Ensure LocalStorageMeta disk size is correctly
updated when events are processed (#902)
7d25218d is described below
commit 7d25218dcde9c05a2f3043f15ca2614f74428b5b
Author: awdavidson <[email protected]>
AuthorDate: Mon May 29 15:59:07 2023 +0100
[#881] fix: Ensure LocalStorageMeta disk size is correctly updated when
events are processed (#902)
### What changes were proposed in this pull request?
Ensure all events are marked as `underStorage`; this results in the
LocalStorageMeta being updated when events are processed.
### Why are the changes needed?
Currently LocalStorageMeta is only updated with metrics from the first event
for a given shuffleId and partitionId. The first event updates metrics because
there is no entry in `partitionsOfStorage`, so the event gets marked as
`underStorage`. However, for subsequent events with the same shuffleId and
partitionId, `selectStorage` returns the storage and does not mark the event as
`underStorage`, so when `updateWriteMetrics` is called,
`event.getUnderStorage()` returns null and `storage.updateWri [...]
As metrics are not updated correctly, `LocalStorage.canWrite` will not
return the correct result.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added a unit test that covers multiple events for the same shuffleId and
partitionId.
---
.../server/storage/LocalStorageManager.java | 5 ++-
.../uniffle/server/ShuffleFlushManagerTest.java | 36 ++++++++++++++++++++++
2 files changed, 40 insertions(+), 1 deletion(-)
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 4080eba0..c1d423cd 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -181,6 +181,9 @@ public class LocalStorageManager extends
SingleStorageManager {
storage.getBasePath(), event);
}
} else {
+ if (event.getUnderStorage() == null) {
+ event.setUnderStorage(storage);
+ }
return storage;
}
}
@@ -206,7 +209,7 @@ public class LocalStorageManager extends
SingleStorageManager {
(key, localStorage) -> {
// If this is the first time to select storage or existing storage
is corrupted,
// we should refresh the cache.
- if (localStorage == null || localStorage.isCorrupted()) {
+ if (localStorage == null || localStorage.isCorrupted() ||
event.getUnderStorage() == null) {
event.setUnderStorage(selectedStorage);
return selectedStorage;
}
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index 4817b812..7de4c653 100644
---
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -56,6 +56,7 @@ import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;
import org.apache.uniffle.server.event.AppPurgeEvent;
import org.apache.uniffle.server.storage.HadoopStorageManager;
+import org.apache.uniffle.server.storage.LocalStorageManager;
import org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy;
import org.apache.uniffle.server.storage.MultiStorageManager;
import org.apache.uniffle.server.storage.StorageManager;
@@ -69,6 +70,7 @@ import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -233,6 +235,34 @@ public class ShuffleFlushManagerTest extends
HadoopTestBase {
waitForMetrics(ShuffleServerMetrics.gaugeWriteHandler, 0, 0.5);
}
+ @Test
+ public void localMetricsTest(@TempDir File tempDir) throws Exception {
+ shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH,
Arrays.asList(tempDir.getAbsolutePath()));
+ shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_TYPE,
StorageType.MEMORY_LOCALFILE.name());
+
+ String appId = "localMetricsTest_appId";
+ StorageManager storageManager =
+
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
+ ShuffleFlushManager manager =
+ new ShuffleFlushManager(shuffleServerConf, mockShuffleServer,
storageManager);
+ ShuffleDataFlushEvent event1 =
+ createShuffleDataFlushEvent(appId, 1, 1, 1, null);
+ manager.addToFlushQueue(event1);
+ // wait for write data
+ waitForFlush(manager, appId, 1, 5);
+
+ validateLocalMetadata(storageManager, 160L);
+
+ ShuffleDataFlushEvent event12 =
+ createShuffleDataFlushEvent(appId, 1, 1, 1, null);
+ manager.addToFlushQueue(event12);
+
+ // wait for write data
+ waitForFlush(manager, appId, 1, 10);
+
+ validateLocalMetadata(storageManager, 320L);
+ }
+
@Test
public void complexWriteTest() throws Exception {
shuffleServerConf.setString("rss.server.flush.handler.expired", "3");
@@ -587,4 +617,10 @@ public class ShuffleFlushManagerTest extends
HadoopTestBase {
assertEquals(eventNum + 3, (int)
ShuffleServerMetrics.counterTotalDroppedEventNum.get());
assertEquals(0, manager.getPendingEventsSize());
}
+
+ private void validateLocalMetadata(StorageManager storageManager, Long size)
{
+ assertInstanceOf(LocalStorageManager.class, storageManager);
+ LocalStorage localStorage = ((LocalStorageManager)
storageManager).getStorages().get(0);
+ assertEquals(size, localStorage.getMetaData().getDiskSize().longValue());
+ }
}