This is an automated email from the ASF dual-hosted git repository. roryqi pushed a commit to branch branch-0.8 in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
commit 096c4b98cee7988a7692f29d364bd817d9dee6b8 Author: Fantasy-Jay <[email protected]> AuthorDate: Wed Sep 27 17:12:35 2023 +0800 [#1211] fix(server): unexpectedly removing resources when app has re-registered shuffle later (#1212) ### What changes were proposed in this pull request? 1. check again if the App has expired when removing resources. 2. add the lock of app scope to keep the sequence of remove and register operations ### Why are the changes needed? Fix: https://github.com/apache/incubator-uniffle/issues/1211 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Add more tests. --- .../apache/uniffle/server/ShuffleTaskManager.java | 145 ++++++++++++++------- .../server/KerberizedShuffleTaskManagerTest.java | 2 +- .../apache/uniffle/server/ShuffleServerTest.java | 2 +- .../uniffle/server/ShuffleTaskManagerTest.java | 65 ++++++++- 4 files changed, 160 insertions(+), 54 deletions(-) diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java index 3579d4a51..9e15e3892 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java @@ -26,12 +26,17 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Queues; @@ -101,6 +106,7 @@ public class ShuffleTaskManager { private Map<Long, PreAllocatedBufferInfo> requireBufferIds = JavaUtils.newConcurrentMap(); private Runnable clearResourceThread; private BlockingQueue<PurgeEvent> expiredAppIdQueue = Queues.newLinkedBlockingQueue(); + private final Cache<String, Lock> appLocks; public ShuffleTaskManager( ShuffleServerConf conf, @@ -153,6 +159,12 @@ public class ShuffleTaskManager { shuffleBufferManager.setShuffleTaskManager(this); } + appLocks = + CacheBuilder.newBuilder() + .expireAfterAccess(3600, TimeUnit.SECONDS) + .maximumSize(Integer.MAX_VALUE) + .build(); + // the thread for clear expired resources clearResourceThread = () -> { @@ -160,7 +172,7 @@ public class ShuffleTaskManager { try { PurgeEvent event = expiredAppIdQueue.take(); if (event instanceof AppPurgeEvent) { - removeResources(event.getAppId()); + removeResources(event.getAppId(), true); } if (event instanceof ShufflePurgeEvent) { removeResourcesByShuffleIds(event.getAppId(), event.getShuffleIds()); @@ -176,6 +188,15 @@ public class ShuffleTaskManager { thread.start(); } + private Lock getAppLock(String appId) { + try { + return appLocks.get(appId, ReentrantLock::new); + } catch (ExecutionException e) { + LOG.error("Failed to get App lock.", e); + throw new RssException(e); + } + } + /** Only for test */ @VisibleForTesting public StatusCode registerShuffle( @@ -202,26 +223,32 @@ public class ShuffleTaskManager { String user, ShuffleDataDistributionType dataDistType, int maxConcurrencyPerPartitionToWrite) { - refreshAppId(appId); - - ShuffleTaskInfo taskInfo = shuffleTaskInfos.get(appId); - taskInfo.setUser(user); - taskInfo.setSpecification( - ShuffleSpecification.builder() - .maxConcurrencyPerPartitionToWrite( - getMaxConcurrencyWriting(maxConcurrencyPerPartitionToWrite, conf)) - .dataDistributionType(dataDistType) - .build()); - - partitionsToBlockIds.computeIfAbsent(appId, key -> JavaUtils.newConcurrentMap()); - for (PartitionRange partitionRange : partitionRanges) { - shuffleBufferManager.registerBuffer( - appId, shuffleId, partitionRange.getStart(), partitionRange.getEnd()); - } - if (!remoteStorageInfo.isEmpty()) { - storageManager.registerRemoteStorage(appId, remoteStorageInfo); + Lock lock = getAppLock(appId); + try { + lock.lock(); + refreshAppId(appId); + + ShuffleTaskInfo taskInfo = shuffleTaskInfos.get(appId); + taskInfo.setUser(user); + taskInfo.setSpecification( + ShuffleSpecification.builder() + .maxConcurrencyPerPartitionToWrite( + getMaxConcurrencyWriting(maxConcurrencyPerPartitionToWrite, conf)) + .dataDistributionType(dataDistType) + .build()); + + partitionsToBlockIds.computeIfAbsent(appId, key -> JavaUtils.newConcurrentMap()); + for (PartitionRange partitionRange : partitionRanges) { + shuffleBufferManager.registerBuffer( + appId, shuffleId, partitionRange.getStart(), partitionRange.getEnd()); + } + if (!remoteStorageInfo.isEmpty()) { + storageManager.registerRemoteStorage(appId, remoteStorageInfo); + } + return StatusCode.SUCCESS; + } finally { + lock.unlock(); } - return StatusCode.SUCCESS; } @VisibleForTesting @@ -582,8 +609,7 @@ public class ShuffleTaskManager { Set<String> appNames = Sets.newHashSet(shuffleTaskInfos.keySet()); // remove applications which is timeout according to rss.server.app.expired.withoutHeartbeat for (String appId : appNames) { - if (System.currentTimeMillis() - shuffleTaskInfos.get(appId).getCurrentTimes() - > appExpiredWithoutHB) { + if (isAppExpired(appId)) { LOG.info( "Detect expired appId[" + appId @@ -598,6 +624,14 @@ public class ShuffleTaskManager { } } + private boolean isAppExpired(String appId) { + if (shuffleTaskInfos.get(appId) == null) { + return true; + } + return System.currentTimeMillis() - shuffleTaskInfos.get(appId).getCurrentTimes() + > appExpiredWithoutHB; + } + /** * Clear up the partial resources of shuffleIds of App. * @@ -648,34 +682,47 @@ public class ShuffleTaskManager { } @VisibleForTesting - public void removeResources(String appId) { - LOG.info("Start remove resource for appId[" + appId + "]"); - final long start = System.currentTimeMillis(); - String user = getUserByAppId(appId); - ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.remove(appId); - if (shuffleTaskInfo == null) { - LOG.info("Resource for appId[" + appId + "] had been removed before."); - return; - } - final Map<Integer, Roaring64NavigableMap> shuffleToCachedBlockIds = - shuffleTaskInfo.getCachedBlockIds(); - partitionsToBlockIds.remove(appId); - shuffleBufferManager.removeBuffer(appId); - shuffleFlushManager.removeResources(appId); - if (!shuffleToCachedBlockIds.isEmpty()) { - storageManager.removeResources( - new AppPurgeEvent(appId, user, new ArrayList<>(shuffleToCachedBlockIds.keySet()))); - } - if (shuffleTaskInfo.hasHugePartition()) { - ShuffleServerMetrics.gaugeAppWithHugePartitionNum.dec(); - ShuffleServerMetrics.gaugeHugePartitionNum.dec(shuffleTaskInfo.getHugePartitionSize()); + public void removeResources(String appId, boolean checkAppExpired) { + Lock lock = getAppLock(appId); + try { + lock.lock(); + LOG.info("Start remove resource for appId[" + appId + "]"); + if (checkAppExpired && !isAppExpired(appId)) { + LOG.info( + "It seems that this appId[{}] has registered a new shuffle, just ignore this AppPurgeEvent event.", + appId); + return; + } + final long start = System.currentTimeMillis(); + String user = getUserByAppId(appId); + ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.remove(appId); + if (shuffleTaskInfo == null) { + LOG.info("Resource for appId[" + appId + "] had been removed before."); + return; + } + + final Map<Integer, Roaring64NavigableMap> shuffleToCachedBlockIds = + shuffleTaskInfo.getCachedBlockIds(); + partitionsToBlockIds.remove(appId); + shuffleBufferManager.removeBuffer(appId); + shuffleFlushManager.removeResources(appId); + if (!shuffleToCachedBlockIds.isEmpty()) { + storageManager.removeResources( + new AppPurgeEvent(appId, user, new ArrayList<>(shuffleToCachedBlockIds.keySet()))); + } + if (shuffleTaskInfo.hasHugePartition()) { + ShuffleServerMetrics.gaugeAppWithHugePartitionNum.dec(); + ShuffleServerMetrics.gaugeHugePartitionNum.dec(shuffleTaskInfo.getHugePartitionSize()); + } + LOG.info( + "Finish remove resource for appId[" + + appId + + "] cost " + + (System.currentTimeMillis() - start) + + " ms"); + } finally { + lock.unlock(); } - LOG.info( - "Finish remove resource for appId[" - + appId - + "] cost " - + (System.currentTimeMillis() - start) - + " ms"); } public void refreshAppId(String appId) { diff --git a/server/src/test/java/org/apache/uniffle/server/KerberizedShuffleTaskManagerTest.java b/server/src/test/java/org/apache/uniffle/server/KerberizedShuffleTaskManagerTest.java index 4c4fe38f3..8fe82711a 100644 --- a/server/src/test/java/org/apache/uniffle/server/KerberizedShuffleTaskManagerTest.java +++ b/server/src/test/java/org/apache/uniffle/server/KerberizedShuffleTaskManagerTest.java @@ -158,7 +158,7 @@ public class KerberizedShuffleTaskManagerTest extends KerberizedHadoopBase { assertTrue(fs.exists(new Path(appBasePath))); assertNull(shuffleBufferManager.getBufferPool().get(appId).get(0)); assertNotNull(shuffleBufferManager.getBufferPool().get(appId).get(1)); - shuffleTaskManager.removeResources(appId); + shuffleTaskManager.removeResources(appId, false); assertFalse(fs.exists(new Path(appBasePath))); assertNull(shuffleBufferManager.getBufferPool().get(appId)); } diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java index 767860c5c..db76dd2c6 100644 --- a/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java +++ b/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java @@ -103,7 +103,7 @@ public class ShuffleServerTest { // Shuffle server is decommissioning, but we can also decommission it again. shuffleServer.decommission(); shuffleServer.cancelDecommission(); - shuffleTaskManager.removeResources(appId); + shuffleTaskManager.removeResources(appId, false); // Wait for 2 seconds, make sure cancel command is work. Thread.sleep(2000); assertEquals(ServerStatus.ACTIVE, shuffleServer.getServerStatus()); diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java index ea32911e3..36a089b0e 100644 --- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java +++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java @@ -48,6 +48,7 @@ import org.apache.uniffle.common.RemoteStorageInfo; import org.apache.uniffle.common.ShuffleDataResult; import org.apache.uniffle.common.ShufflePartitionedBlock; import org.apache.uniffle.common.ShufflePartitionedData; +import org.apache.uniffle.common.exception.RssException; import org.apache.uniffle.common.rpc.StatusCode; import org.apache.uniffle.common.util.ChecksumUtils; import org.apache.uniffle.common.util.Constants; @@ -150,7 +151,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase { assertEquals(1, ShuffleServerMetrics.gaugeAppWithHugePartitionNum.get()); // case5 - shuffleTaskManager.removeResources(appId); + shuffleTaskManager.removeResources(appId, false); assertEquals(0, ShuffleServerMetrics.gaugeHugePartitionNum.get()); assertEquals(0, ShuffleServerMetrics.gaugeAppWithHugePartitionNum.get()); } @@ -447,7 +448,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase { assertTrue(fs.exists(new Path(appBasePath))); assertNull(shuffleBufferManager.getBufferPool().get(appId).get(0)); assertNotNull(shuffleBufferManager.getBufferPool().get(appId).get(1)); - shuffleTaskManager.removeResources(appId); + shuffleTaskManager.removeResources(appId, false); } @Test @@ -618,7 +619,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase { new Thread( () -> { try { - shuffleTaskManager.removeResources(appId); + shuffleTaskManager.removeResources(appId, false); } finally { countDownLatch.countDown(); } @@ -1063,4 +1064,62 @@ public class ShuffleTaskManagerTest extends HadoopTestBase { // case3: client max concurrency exceed 30 assertEquals(30, ShuffleTaskManager.getMaxConcurrencyWriting(40, conf)); } + + @Test + public void testRegisterShuffleAfterAppIsExpired() throws Exception { + String confFile = ClassLoader.getSystemResource("server.conf").getFile(); + ShuffleServerConf conf = new ShuffleServerConf(confFile); + final String storageBasePath = HDFS_URI + "rss/testRegisterShuffleAfterAppIsExpired"; + conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true); + conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234); + conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527"); + + shuffleServer = new ShuffleServer(conf); + ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager(); + + String appId = "appId1"; + shuffleTaskManager.registerShuffle( + appId, + 1, + Lists.newArrayList(new PartitionRange(0, 1)), + new RemoteStorageInfo(storageBasePath, Maps.newHashMap()), + StringUtils.EMPTY); + shuffleTaskManager.refreshAppId(appId); + assertEquals(1, shuffleTaskManager.getAppIds().size()); + + ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35); + shuffleTaskManager.requireBuffer(35); + shuffleTaskManager.cacheShuffleData(appId, 0, false, partitionedData0); + shuffleTaskManager.updateCachedBlockIds(appId, 0, partitionedData0.getBlockList()); + shuffleTaskManager.refreshAppId(appId); + shuffleTaskManager.checkResourceStatus(); + assertEquals(1, shuffleTaskManager.getAppIds().size()); + + // App is expired due to no heartbeat, so it was added to expired queue and will be removed + // resource soon. + Thread thread = + new Thread( + () -> { + try { + Thread.sleep(1000); + shuffleTaskManager.removeResources(appId, true); + } catch (InterruptedException e) { + throw new RssException(e); + } + }); + thread.start(); + + // At this moment, this app re-registers shuffle. + shuffleTaskManager.registerShuffle( + appId, + 2, + Lists.newArrayList(new PartitionRange(0, 1)), + new RemoteStorageInfo(storageBasePath, Maps.newHashMap()), + StringUtils.EMPTY); + Thread.sleep(2000); + + // The NO_REGISTER status code should not appear. + assertTrue(shuffleTaskManager.requireBuffer(appId, 2, Arrays.asList(1), 35) != -4); + shuffleTaskManager.removeResources(appId, false); + } }
