This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 3ee1688c8 [#1428] fix(server): fallback invalid when local storage
can't write (#1429)
3ee1688c8 is described below
commit 3ee1688c828c26d60f171ddf54b701beaaf0dfaf
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed Jan 17 13:45:47 2024 +0800
[#1428] fix(server): fallback invalid when local storage can't write (#1429)
### What changes were proposed in this pull request?
Allow specifying a negative fallback threshold to avoid events being discarded when the local storage can't be written.
### Why are the changes needed?
Fix: #1428
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests
---
.../apache/uniffle/server/ShuffleServerConf.java | 2 -
.../HadoopStorageManagerFallbackStrategy.java | 2 +-
.../LocalStorageManagerFallbackStrategy.java | 2 +-
.../RotateStorageManagerFallbackStrategy.java | 3 +-
.../uniffle/server/ShuffleFlushManagerTest.java | 13 +++---
.../server/storage/HybridStorageManagerTest.java | 48 ++++++++++++++++++++++
6 files changed, 58 insertions(+), 12 deletions(-)
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index 77287b2a7..f968a1722 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -352,8 +352,6 @@ public class ShuffleServerConf extends RssBaseConf {
public static final ConfigOption<Long> FALLBACK_MAX_FAIL_TIMES =
ConfigOptions.key("rss.server.hybrid.storage.fallback.max.fail.times")
.longType()
- .checkValue(
- ConfigUtils.NON_NEGATIVE_LONG_VALIDATOR, " fallback times must
be non-negative")
.defaultValue(0L)
.withDescription("For hybrid storage, fail times exceed the number,
will switch storage")
.withDeprecatedKeys("rss.server.multistorage.fallback.max.fail.times");
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManagerFallbackStrategy.java
b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManagerFallbackStrategy.java
index 48a0858ee..e569578db 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManagerFallbackStrategy.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManagerFallbackStrategy.java
@@ -37,7 +37,7 @@ public class HadoopStorageManagerFallbackStrategy extends
AbstractStorageManager
@Override
public StorageManager tryFallback(
StorageManager current, ShuffleDataFlushEvent event, StorageManager...
candidates) {
- if (event.getRetryTimes() > fallBackTimes) {
+ if (event.getRetryTimes() > fallBackTimes || !current.canWrite(event)) {
return findNextStorageManager(current, excludeTypes, event, candidates);
}
return current;
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManagerFallbackStrategy.java
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManagerFallbackStrategy.java
index d506fe97d..565fa2b3c 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManagerFallbackStrategy.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManagerFallbackStrategy.java
@@ -37,7 +37,7 @@ public class LocalStorageManagerFallbackStrategy extends
AbstractStorageManagerF
@Override
public StorageManager tryFallback(
StorageManager current, ShuffleDataFlushEvent event, StorageManager...
candidates) {
- if (event.getRetryTimes() > fallBackTimes) {
+ if (event.getRetryTimes() > fallBackTimes || !current.canWrite(event)) {
return findNextStorageManager(current, excludeTypes, event, candidates);
}
return current;
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/RotateStorageManagerFallbackStrategy.java
b/server/src/main/java/org/apache/uniffle/server/storage/RotateStorageManagerFallbackStrategy.java
index 0f61643d9..623438f20 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/RotateStorageManagerFallbackStrategy.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/RotateStorageManagerFallbackStrategy.java
@@ -32,7 +32,8 @@ public class RotateStorageManagerFallbackStrategy extends
AbstractStorageManager
public StorageManager tryFallback(
StorageManager current, ShuffleDataFlushEvent event, StorageManager...
candidates) {
if (fallBackTimes > 0
- && (event.getRetryTimes() < fallBackTimes || event.getRetryTimes() %
fallBackTimes > 0)) {
+ && (event.getRetryTimes() < fallBackTimes || event.getRetryTimes() %
fallBackTimes > 0)
+ && current.canWrite(event)) {
return current;
}
return findNextStorageManager(current, null, event, candidates);
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index ec767bc63..f2b0b3048 100644
---
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -697,16 +697,15 @@ public class ShuffleFlushManagerTest extends
HadoopTestBase {
List<ShufflePartitionedBlock> blocks =
Lists.newArrayList(new ShufflePartitionedBlock(100000, 1000, 1, 1, 1L,
(byte[]) null));
ShuffleDataFlushEvent bigEvent =
- new ShuffleDataFlushEvent(1, "1", 1, 1, 1, 100, blocks, null, null);
+ new ShuffleDataFlushEvent(1, "1", 2, 1, 1, 100, blocks, null, null);
bigEvent.setUnderStorage(
((HybridStorageManager)
storageManager).getWarmStorageManager().selectStorage(event));
((HybridStorageManager)
storageManager).getWarmStorageManager().updateWriteMetrics(bigEvent, 0);
- event = createShuffleDataFlushEvent(appId, 1, 1, 1, null, 100);
+ event = createShuffleDataFlushEvent(appId, 3, 1, 1, null, 100);
flushManager.addToFlushQueue(event);
Thread.sleep(1000);
assertTrue(event.getUnderStorage() instanceof HadoopStorage);
- assertEquals(1, event.getRetryTimes());
}
@Test
@@ -756,11 +755,11 @@ public class ShuffleFlushManagerTest extends
HadoopTestBase {
((HybridStorageManager)
storageManager).getWarmStorageManager().selectStorage(event));
((HybridStorageManager)
storageManager).getWarmStorageManager().updateWriteMetrics(bigEvent, 0);
- event = createShuffleDataFlushEvent(appId, 1, 1, 1, null, 100);
+ event = createShuffleDataFlushEvent(appId, 2, 1, 1, null, 100);
flushManager.addToFlushQueue(event);
- waitForFlush(flushManager, appId, 1, 15);
- assertEquals(1, event.getRetryTimes());
- assertEquals(2, ShuffleServerMetrics.counterLocalFileEventFlush.get());
+ waitForFlush(flushManager, appId, 2, 5);
+ assertEquals(0, event.getRetryTimes());
+ assertEquals(1, ShuffleServerMetrics.counterLocalFileEventFlush.get());
assertEquals(2, ShuffleServerMetrics.counterHadoopEventFlush.get());
}
diff --git
a/server/src/test/java/org/apache/uniffle/server/storage/HybridStorageManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/storage/HybridStorageManagerTest.java
index d3e12a284..7fe6f3bc0 100644
---
a/server/src/test/java/org/apache/uniffle/server/storage/HybridStorageManagerTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/storage/HybridStorageManagerTest.java
@@ -36,6 +36,54 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class HybridStorageManagerTest {
+ /**
+ * This tests the fallback strategy when the local storage is
invalid. 1. When
+ * specifying the fallback max fail times = 0, the event will be discarded. 2.
When specifying the
+ * fallback max fail times < 0, the event will be taken by Hadoop Storage.
+ */
+ @Test
+ public void fallbackTestWhenLocalStorageCorrupted() {
+ ShuffleServerConf conf = new ShuffleServerConf();
+ conf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 2000L);
+ conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("test"));
+ conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 1024L);
+ conf.setString(
+ ShuffleServerConf.RSS_STORAGE_TYPE.key(),
StorageType.MEMORY_LOCALFILE_HDFS.name());
+ conf.setString(
+ ShuffleServerConf.HYBRID_STORAGE_MANAGER_SELECTOR_CLASS,
+
"org.apache.uniffle.server.storage.hybrid.HugePartitionSensitiveStorageManagerSelector");
+ conf.setString(
+ ShuffleServerConf.HYBRID_STORAGE_FALLBACK_STRATEGY_CLASS,
+
"org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy");
+
+ // case1: fallback to hadoop storage when fallback_max_fail_time = -1
+ conf.setLong(ShuffleServerConf.FALLBACK_MAX_FAIL_TIMES, -1);
+ HybridStorageManager manager = new HybridStorageManager(conf);
+
+ LocalStorageManager localStorageManager = (LocalStorageManager)
manager.getWarmStorageManager();
+ localStorageManager.getStorages().get(0).markCorrupted();
+
+ String remoteStorage = "test";
+ String appId = "selectStorageManagerWithSelectorAndFallbackStrategy_appId";
+ manager.registerRemoteStorage(appId, new RemoteStorageInfo(remoteStorage));
+ List<ShufflePartitionedBlock> blocks =
+ Lists.newArrayList(new ShufflePartitionedBlock(100, 1000, 1, 1, 1L,
(byte[]) null));
+ ShuffleDataFlushEvent event =
+ new ShuffleDataFlushEvent(1, appId, 1, 1, 1, 1000, blocks, null, null);
+ assertTrue((manager.selectStorage(event) instanceof HadoopStorage));
+
+ // case2: fallback is still valid when fallback_max_fail_time = 0
+ conf.setLong(ShuffleServerConf.FALLBACK_MAX_FAIL_TIMES, 0);
+ manager = new HybridStorageManager(conf);
+
+ localStorageManager = (LocalStorageManager)
manager.getWarmStorageManager();
+ localStorageManager.getStorages().get(0).markCorrupted();
+
+ event = new ShuffleDataFlushEvent(1, appId, 1, 1, 1, 1000, blocks, null,
null);
+ manager.registerRemoteStorage(appId, new RemoteStorageInfo(remoteStorage));
+ assertTrue((manager.selectStorage(event) instanceof HadoopStorage));
+ }
+
@Test
public void selectStorageManagerTest() {
ShuffleServerConf conf = new ShuffleServerConf();