This is an automated email from the ASF dual-hosted git repository. ckj pushed a commit to branch branch-0.6 in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
commit f40d76c0bbc221782eda3d0ce4cd5fa98c22a682 Author: xianjingfeng <[email protected]> AuthorDate: Fri Nov 4 15:00:37 2022 +0800 Fix AbstractStorage#containsWriteHandler (#281) ### What changes were proposed in this pull request? Fix AbstractStorage#containsWriteHandler ### Why are the changes needed? It is a bug, and it is obvious. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Feel unnecessary --- .../uniffle/server/storage/LocalStorageManager.java | 2 +- .../apache/uniffle/storage/common/AbstractStorage.java | 6 +++++- .../apache/uniffle/storage/common/LocalStorageTest.java | 15 +++++++++++++++ 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java index 1bbc6298..f3b48032 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java @@ -137,7 +137,7 @@ public class LocalStorageManager extends SingleStorageManager { event.getStartPartition())); if (storage.containsWriteHandler(event.getAppId(), event.getShuffleId(), event.getStartPartition()) && storage.isCorrupted()) { - throw new RuntimeException("storage " + storage.getBasePath() + " is corrupted"); + LOG.error("storage " + storage.getBasePath() + " is corrupted"); } if (storage.isCorrupted()) { storage = getRepairedStorage(event.getAppId(), event.getShuffleId(), event.getStartPartition()); diff --git a/storage/src/main/java/org/apache/uniffle/storage/common/AbstractStorage.java b/storage/src/main/java/org/apache/uniffle/storage/common/AbstractStorage.java index ddddae9c..80125a63 100644 --- a/storage/src/main/java/org/apache/uniffle/storage/common/AbstractStorage.java +++ b/storage/src/main/java/org/apache/uniffle/storage/common/AbstractStorage.java @@ -73,8 +73,12 @@ public abstract class AbstractStorage implements Storage { protected abstract ServerReadHandler newReadHandler(CreateShuffleReadHandlerRequest request); public boolean containsWriteHandler(String appId, int shuffleId, int partition) { + Map<String, ShuffleWriteHandler> map = writerHandlers.get(appId); + if (map == null || map.isEmpty()) { + return false; + } String partitionKey = RssUtils.generatePartitionKey(appId, shuffleId, partition); - return writerHandlers.containsKey(partitionKey); + return map.containsKey(partitionKey); } @Override diff --git a/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java b/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java index 5240e59a..835d5e9d 100644 --- a/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java +++ b/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java @@ -29,6 +29,8 @@ import org.junit.jupiter.api.io.TempDir; import org.roaringbitmap.RoaringBitmap; import org.apache.uniffle.common.util.RssUtils; +import org.apache.uniffle.storage.request.CreateShuffleWriteHandlerRequest; +import org.apache.uniffle.storage.util.StorageType; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -161,4 +163,17 @@ public class LocalStorageTest { assertEquals(2, item.getSortedShuffleKeys(false, 2).size()); assertEquals(2, item.getSortedShuffleKeys(false, 3).size()); } + + @Test + public void writeHandlerTest() { + LocalStorage item = LocalStorage.newBuilder().basePath(testBaseDir.getAbsolutePath()).build(); + String appId = "writeHandlerTest"; + assertFalse(item.containsWriteHandler(appId, 0, 1)); + String[] storageBasePaths = {testBaseDir.getAbsolutePath()}; + CreateShuffleWriteHandlerRequest request = new CreateShuffleWriteHandlerRequest( + StorageType.LOCALFILE.name(), appId, 0, 1, 1, storageBasePaths, + "ss1", null, 1, null); + item.getOrCreateWriteHandler(request); + assertTrue(item.containsWriteHandler(appId, 0, 1)); + } }
