This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 14113765 [#1070] fix(tez): shuffle server may leak if not register 
remote storage. (#1076)
14113765 is described below

commit 14113765ec243ae66eee370c62fc8799bc9ce240
Author: zhengchenyu <[email protected]>
AuthorDate: Fri Aug 4 14:00:25 2023 +0800

    [#1070] fix(tez): shuffle server may leak if not register remote storage. 
(#1076)
    
    ### What changes were proposed in this pull request?
    
    * Merge dynamicClientConf into the configuration passed to TezRemoteShuffleManager.
      This fixes the problem that remote storage cannot be registered when
      tez.rss.storage.type is not set on the client side.
    * Call doCleanup when the selected storage is null. This fixes the problem
      that usedMemory and inFlushSize are leaked.
    
    ### Why are the changes needed?
    
    When this bug is triggered, usedMemory and inFlushSize stay high even after
    no applications have been running for a long time.
    
    Fix: #1070
    
    ### How was this patch tested?
    
    Unit test, plus manual testing on a cluster.
---
 .../org/apache/tez/dag/app/RssDAGAppMaster.java    | 23 +++++----
 .../tez/dag/app/TezRemoteShuffleManager.java       |  3 +-
 .../uniffle/server/DefaultFlushEventHandler.java   | 14 +++++-
 .../apache/uniffle/server/ShuffleFlushManager.java |  6 +++
 .../server/storage/HadoopStorageManager.java       |  1 +
 .../server/buffer/ShuffleBufferManagerTest.java    | 56 ++++++++++++++++++++++
 6 files changed, 92 insertions(+), 11 deletions(-)

diff --git 
a/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java 
b/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
index 608f26d0..dc498b5f 100644
--- a/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
+++ b/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
@@ -200,14 +200,6 @@ public class RssDAGAppMaster extends DAGAppMaster {
         heartbeatInterval,
         TimeUnit.MILLISECONDS);
 
-    Token<JobTokenIdentifier> sessionToken =
-        TokenCache.getSessionToken(appMaster.getContext().getAppCredentials());
-    appMaster.setTezRemoteShuffleManager(
-        new TezRemoteShuffleManager(
-            appMaster.getAppID().toString(), sessionToken, conf, 
strAppAttemptId, client));
-    appMaster.getTezRemoteShuffleManager().initialize();
-    appMaster.getTezRemoteShuffleManager().start();
-
     // apply dynamic configuration
     boolean dynamicConfEnabled =
         conf.getBoolean(
@@ -221,6 +213,21 @@ public class RssDAGAppMaster extends DAGAppMaster {
                   RssTezConfig.RSS_ACCESS_TIMEOUT_MS_DEFAULT_VALUE));
     }
 
+    Configuration shuffleManagerConf = new Configuration(conf);
+    RssTezUtils.applyDynamicClientConf(shuffleManagerConf, 
appMaster.getClusterClientConf());
+
+    Token<JobTokenIdentifier> sessionToken =
+        TokenCache.getSessionToken(appMaster.getContext().getAppCredentials());
+    appMaster.setTezRemoteShuffleManager(
+        new TezRemoteShuffleManager(
+            appMaster.getAppID().toString(),
+            sessionToken,
+            shuffleManagerConf,
+            strAppAttemptId,
+            client));
+    appMaster.getTezRemoteShuffleManager().initialize();
+    appMaster.getTezRemoteShuffleManager().start();
+
     mayCloseTezSlowStart(conf);
   }
 
diff --git 
a/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java 
b/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
index b6fb9e56..4060b552 100644
--- 
a/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
+++ 
b/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
@@ -199,7 +199,8 @@ public class TezRemoteShuffleManager implements 
ServicePluginLifecycle {
             RssTezConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED_DEFAULT_VALUE);
     RemoteStorageInfo defaultRemoteStorage =
         new RemoteStorageInfo(conf.get(RssTezConfig.RSS_REMOTE_STORAGE_PATH, 
""));
-    String storageType = conf.get(RssTezConfig.RSS_STORAGE_TYPE);
+    String storageType =
+        conf.get(RssTezConfig.RSS_STORAGE_TYPE, 
RssTezConfig.RSS_STORAGE_TYPE_DEFAULT_VALUE);
     boolean testMode = conf.getBoolean(RssTezConfig.RSS_TEST_MODE_ENABLE, 
false);
     ClientUtils.validateTestModeConf(testMode, storageType);
     RemoteStorageInfo remoteStorage =
diff --git 
a/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java 
b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
index 4a90ba28..c3d7aebb 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
@@ -23,12 +23,12 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.config.RssBaseConf;
-import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.ThreadUtils;
 import org.apache.uniffle.server.storage.StorageManager;
 import org.apache.uniffle.storage.common.HadoopStorage;
@@ -43,6 +43,7 @@ public class DefaultFlushEventHandler implements 
FlushEventHandler {
   private final StorageManager storageManager;
   private Executor localFileThreadPoolExecutor;
   private Executor hadoopThreadPoolExecutor;
+  private Executor fallbackThreadPoolExecutor;
   private final StorageType storageType;
   protected final BlockingQueue<ShuffleDataFlushEvent> flushQueue = 
Queues.newLinkedBlockingQueue();
   private Consumer<ShuffleDataFlushEvent> eventConsumer;
@@ -93,6 +94,7 @@ public class DefaultFlushEventHandler implements 
FlushEventHandler {
           
shuffleServerConf.getInteger(ShuffleServerConf.SERVER_FLUSH_HADOOP_THREAD_POOL_SIZE);
       hadoopThreadPoolExecutor = createFlushEventExecutor(poolSize, 
"HadoopFlushEventThreadPool");
     }
+    fallbackThreadPoolExecutor = createFlushEventExecutor(5, 
"FallBackFlushEventThreadPool");
     startEventProcessor();
   }
 
@@ -119,7 +121,10 @@ public class DefaultFlushEventHandler implements 
FlushEventHandler {
       } else if (storage instanceof LocalStorage) {
         localFileThreadPoolExecutor.execute(() -> 
handleEventAndUpdateMetrics(event, true));
       } else {
-        throw new RssException("Unexpected storage type!");
+        // When we did not select storage for this event, we will ignore this 
event.
+        // Then we must doCleanup, or will result to resource leak.
+        fallbackThreadPoolExecutor.execute(() -> event.doCleanup());
+        LOG.error("Found unexpected storage type, will not flush for event 
{}.", event);
       }
     } catch (Exception e) {
       LOG.error("Exception happened when process event.", e);
@@ -149,4 +154,9 @@ public class DefaultFlushEventHandler implements 
FlushEventHandler {
   public void stop() {
     stopped = true;
   }
+
+  @VisibleForTesting
+  public Executor getFallbackThreadPoolExecutor() {
+    return fallbackThreadPoolExecutor;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index 460d3dea..3335deef 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
@@ -296,4 +297,9 @@ public class ShuffleFlushManager {
   public ShuffleDataDistributionType getDataDistributionType(String appId) {
     return 
shuffleServer.getShuffleTaskManager().getDataDistributionType(appId);
   }
+
+  @VisibleForTesting
+  public FlushEventHandler getEventHandler() {
+    return eventHandler;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
index f40d3ffc..09039dfb 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
@@ -138,6 +138,7 @@ public class HadoopStorageManager extends 
SingleStorageManager {
           return hadoopStorage;
         });
     appIdToStorages.computeIfAbsent(appId, key -> 
pathToStorages.get(remoteStorage));
+    LOG.info("register remote storage {} successfully for appId {}", 
remoteStorageInfo, appId);
   }
 
   @Override
diff --git 
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
 
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
index 6a466764..2614624c 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Map;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -39,6 +40,7 @@ import org.apache.uniffle.common.ShufflePartitionedData;
 import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.common.util.ByteBufUtils;
 import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.server.DefaultFlushEventHandler;
 import org.apache.uniffle.server.ShuffleFlushManager;
 import org.apache.uniffle.server.ShuffleServer;
 import org.apache.uniffle.server.ShuffleServerConf;
@@ -638,4 +640,58 @@ public class ShuffleBufferManagerTest extends 
BufferTestBase {
         (long) (Runtime.getRuntime().maxMemory() * readRatio),
         shuffleBufferManager.getReadCapacity());
   }
+
+  @Test
+  public void flushBufferTestWhenNotSelectedStorage(@TempDir File tmpDir) 
throws Exception {
+    // In this test, rss.server.single.buffer.flush.threshold and
+    // rss.server.flush.cold.storage.threshold.size are 16.
+    // When cacheShuffleData with 64 bytes, will flush to HDFS storage, but we 
do not register
+    // remote storage.
+    // Then storageManager.selectStorage will return null, we should make sure 
that when we can not
+    // select a storage,
+    // the resources will not leak.
+    ShuffleServerConf shuffleConf = new ShuffleServerConf();
+    File dataDir = new File(tmpDir, "data");
+    shuffleConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, 
StorageType.LOCALFILE.name());
+    shuffleConf.set(
+        ShuffleServerConf.RSS_STORAGE_BASE_PATH, 
Arrays.asList(dataDir.getAbsolutePath()));
+    shuffleConf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 200L);
+    
shuffleConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE,
 20.0);
+    
shuffleConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE,
 80.0);
+    shuffleConf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 
1024L);
+    shuffleConf.setBoolean(ShuffleServerConf.SINGLE_BUFFER_FLUSH_ENABLED, 
true);
+    
shuffleConf.setSizeAsBytes(ShuffleServerConf.SINGLE_BUFFER_FLUSH_THRESHOLD, 
16L);
+    
shuffleConf.setSizeAsBytes(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 
16L);
+    shuffleConf.set(ShuffleServerConf.RSS_STORAGE_TYPE, 
StorageType.LOCALFILE_HDFS.name());
+
+    ShuffleServer mockShuffleServer = mock(ShuffleServer.class);
+    StorageManager storageManager =
+        StorageManagerFactory.getInstance().createStorageManager(shuffleConf);
+    ShuffleFlushManager shuffleFlushManager =
+        new ShuffleFlushManager(shuffleConf, mockShuffleServer, 
storageManager);
+    shuffleBufferManager = new ShuffleBufferManager(shuffleConf, 
shuffleFlushManager);
+
+    
when(mockShuffleServer.getShuffleFlushManager()).thenReturn(shuffleFlushManager);
+    
when(mockShuffleServer.getShuffleBufferManager()).thenReturn(shuffleBufferManager);
+    
when(mockShuffleServer.getShuffleTaskManager()).thenReturn(mock(ShuffleTaskManager.class));
+
+    String appId = "bufferSizeTest";
+    int shuffleId = 1;
+    shuffleBufferManager.registerBuffer(appId, shuffleId, 0, 1);
+    shuffleBufferManager.registerBuffer(appId, shuffleId, 2, 3);
+    shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, 
createData(0, 64));
+    shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, 
createData(2, 64));
+    // wait flush event drained
+    Awaitility.await()
+        .atMost(Duration.ofSeconds(5))
+        .until(() -> shuffleFlushManager.getEventNumInFlush() == 0);
+    // make sure all cleanup tasks are done.
+    DefaultFlushEventHandler flushEventHandler =
+        (DefaultFlushEventHandler) shuffleFlushManager.getEventHandler();
+    ThreadPoolExecutor executor =
+        ((ThreadPoolExecutor) 
flushEventHandler.getFallbackThreadPoolExecutor());
+    executor.shutdown();
+    assertEquals(0, shuffleBufferManager.getUsedMemory());
+    assertEquals(0, shuffleBufferManager.getInFlushSize());
+  }
 }

Reply via email to