This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 14113765 [#1070] fix(tez): shuffle server may leak if not register
remote storage. (#1076)
14113765 is described below
commit 14113765ec243ae66eee370c62fc8799bc9ce240
Author: zhengchenyu <[email protected]>
AuthorDate: Fri Aug 4 14:00:25 2023 +0800
[#1070] fix(tez): shuffle server may leak if not register remote storage.
(#1076)
### What changes were proposed in this pull request?
* Merge dynamicClientConf for TezRemoteShuffleManager. This solves the
problem that remote storage can't be registered when tez.rss.storage.type is
not set on the client side.
* doCleanup when the selected storage is null. This solves the problem that
usedMemory and inFlushSize are leaked.
### Why are the changes needed?
When this bug is triggered, usedMemory and inFlushSize remain high even
though no applications have been running for a long time.
Fix: #1070
### How was this patch tested?
unit test and test on cluster.
---
.../org/apache/tez/dag/app/RssDAGAppMaster.java | 23 +++++----
.../tez/dag/app/TezRemoteShuffleManager.java | 3 +-
.../uniffle/server/DefaultFlushEventHandler.java | 14 +++++-
.../apache/uniffle/server/ShuffleFlushManager.java | 6 +++
.../server/storage/HadoopStorageManager.java | 1 +
.../server/buffer/ShuffleBufferManagerTest.java | 56 ++++++++++++++++++++++
6 files changed, 92 insertions(+), 11 deletions(-)
diff --git
a/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
b/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
index 608f26d0..dc498b5f 100644
--- a/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
+++ b/client-tez/src/main/java/org/apache/tez/dag/app/RssDAGAppMaster.java
@@ -200,14 +200,6 @@ public class RssDAGAppMaster extends DAGAppMaster {
heartbeatInterval,
TimeUnit.MILLISECONDS);
- Token<JobTokenIdentifier> sessionToken =
- TokenCache.getSessionToken(appMaster.getContext().getAppCredentials());
- appMaster.setTezRemoteShuffleManager(
- new TezRemoteShuffleManager(
- appMaster.getAppID().toString(), sessionToken, conf,
strAppAttemptId, client));
- appMaster.getTezRemoteShuffleManager().initialize();
- appMaster.getTezRemoteShuffleManager().start();
-
// apply dynamic configuration
boolean dynamicConfEnabled =
conf.getBoolean(
@@ -221,6 +213,21 @@ public class RssDAGAppMaster extends DAGAppMaster {
RssTezConfig.RSS_ACCESS_TIMEOUT_MS_DEFAULT_VALUE));
}
+ Configuration shuffleManagerConf = new Configuration(conf);
+ RssTezUtils.applyDynamicClientConf(shuffleManagerConf,
appMaster.getClusterClientConf());
+
+ Token<JobTokenIdentifier> sessionToken =
+ TokenCache.getSessionToken(appMaster.getContext().getAppCredentials());
+ appMaster.setTezRemoteShuffleManager(
+ new TezRemoteShuffleManager(
+ appMaster.getAppID().toString(),
+ sessionToken,
+ shuffleManagerConf,
+ strAppAttemptId,
+ client));
+ appMaster.getTezRemoteShuffleManager().initialize();
+ appMaster.getTezRemoteShuffleManager().start();
+
mayCloseTezSlowStart(conf);
}
diff --git
a/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
b/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
index b6fb9e56..4060b552 100644
---
a/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
+++
b/client-tez/src/main/java/org/apache/tez/dag/app/TezRemoteShuffleManager.java
@@ -199,7 +199,8 @@ public class TezRemoteShuffleManager implements
ServicePluginLifecycle {
RssTezConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED_DEFAULT_VALUE);
RemoteStorageInfo defaultRemoteStorage =
new RemoteStorageInfo(conf.get(RssTezConfig.RSS_REMOTE_STORAGE_PATH,
""));
- String storageType = conf.get(RssTezConfig.RSS_STORAGE_TYPE);
+ String storageType =
+ conf.get(RssTezConfig.RSS_STORAGE_TYPE,
RssTezConfig.RSS_STORAGE_TYPE_DEFAULT_VALUE);
boolean testMode = conf.getBoolean(RssTezConfig.RSS_TEST_MODE_ENABLE,
false);
ClientUtils.validateTestModeConf(testMode, storageType);
RemoteStorageInfo remoteStorage =
diff --git
a/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
index 4a90ba28..c3d7aebb 100644
---
a/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
+++
b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
@@ -23,12 +23,12 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Queues;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.config.RssBaseConf;
-import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.server.storage.StorageManager;
import org.apache.uniffle.storage.common.HadoopStorage;
@@ -43,6 +43,7 @@ public class DefaultFlushEventHandler implements
FlushEventHandler {
private final StorageManager storageManager;
private Executor localFileThreadPoolExecutor;
private Executor hadoopThreadPoolExecutor;
+ private Executor fallbackThreadPoolExecutor;
private final StorageType storageType;
protected final BlockingQueue<ShuffleDataFlushEvent> flushQueue =
Queues.newLinkedBlockingQueue();
private Consumer<ShuffleDataFlushEvent> eventConsumer;
@@ -93,6 +94,7 @@ public class DefaultFlushEventHandler implements
FlushEventHandler {
shuffleServerConf.getInteger(ShuffleServerConf.SERVER_FLUSH_HADOOP_THREAD_POOL_SIZE);
hadoopThreadPoolExecutor = createFlushEventExecutor(poolSize,
"HadoopFlushEventThreadPool");
}
+ fallbackThreadPoolExecutor = createFlushEventExecutor(5,
"FallBackFlushEventThreadPool");
startEventProcessor();
}
@@ -119,7 +121,10 @@ public class DefaultFlushEventHandler implements
FlushEventHandler {
} else if (storage instanceof LocalStorage) {
localFileThreadPoolExecutor.execute(() ->
handleEventAndUpdateMetrics(event, true));
} else {
- throw new RssException("Unexpected storage type!");
+      // When we did not select a storage for this event, we will ignore this
event.
+      // Then we must doCleanup, or it will result in a resource leak.
+ fallbackThreadPoolExecutor.execute(() -> event.doCleanup());
+ LOG.error("Found unexpected storage type, will not flush for event
{}.", event);
}
} catch (Exception e) {
LOG.error("Exception happened when process event.", e);
@@ -149,4 +154,9 @@ public class DefaultFlushEventHandler implements
FlushEventHandler {
public void stop() {
stopped = true;
}
+
+ @VisibleForTesting
+ public Executor getFallbackThreadPoolExecutor() {
+ return fallbackThreadPoolExecutor;
+ }
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index 460d3dea..3335deef 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
@@ -296,4 +297,9 @@ public class ShuffleFlushManager {
public ShuffleDataDistributionType getDataDistributionType(String appId) {
return
shuffleServer.getShuffleTaskManager().getDataDistributionType(appId);
}
+
+ @VisibleForTesting
+ public FlushEventHandler getEventHandler() {
+ return eventHandler;
+ }
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
index f40d3ffc..09039dfb 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
@@ -138,6 +138,7 @@ public class HadoopStorageManager extends
SingleStorageManager {
return hadoopStorage;
});
appIdToStorages.computeIfAbsent(appId, key ->
pathToStorages.get(remoteStorage));
+ LOG.info("register remote storage {} successfully for appId {}",
remoteStorageInfo, appId);
}
@Override
diff --git
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
index 6a466764..2614624c 100644
---
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -39,6 +40,7 @@ import org.apache.uniffle.common.ShufflePartitionedData;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.ByteBufUtils;
import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.server.DefaultFlushEventHandler;
import org.apache.uniffle.server.ShuffleFlushManager;
import org.apache.uniffle.server.ShuffleServer;
import org.apache.uniffle.server.ShuffleServerConf;
@@ -638,4 +640,58 @@ public class ShuffleBufferManagerTest extends
BufferTestBase {
(long) (Runtime.getRuntime().maxMemory() * readRatio),
shuffleBufferManager.getReadCapacity());
}
+
+ @Test
+ public void flushBufferTestWhenNotSelectedStorage(@TempDir File tmpDir)
throws Exception {
+ // In this test, rss.server.single.buffer.flush.threshold and
+ // rss.server.flush.cold.storage.threshold.size are 16.
+ // When cacheShuffleData with 64 bytes, will flush to HDFS storage, but we
do not register
+ // remote storage.
+ // Then storageManager.selectStorage will return null, we should make sure
that when we can not
+ // select a storage,
+ // the resources will not leak.
+ ShuffleServerConf shuffleConf = new ShuffleServerConf();
+ File dataDir = new File(tmpDir, "data");
+ shuffleConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE,
StorageType.LOCALFILE.name());
+ shuffleConf.set(
+ ShuffleServerConf.RSS_STORAGE_BASE_PATH,
Arrays.asList(dataDir.getAbsolutePath()));
+ shuffleConf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 200L);
+
shuffleConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE,
20.0);
+
shuffleConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE,
80.0);
+ shuffleConf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L *
1024L);
+ shuffleConf.setBoolean(ShuffleServerConf.SINGLE_BUFFER_FLUSH_ENABLED,
true);
+
shuffleConf.setSizeAsBytes(ShuffleServerConf.SINGLE_BUFFER_FLUSH_THRESHOLD,
16L);
+
shuffleConf.setSizeAsBytes(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE,
16L);
+ shuffleConf.set(ShuffleServerConf.RSS_STORAGE_TYPE,
StorageType.LOCALFILE_HDFS.name());
+
+ ShuffleServer mockShuffleServer = mock(ShuffleServer.class);
+ StorageManager storageManager =
+ StorageManagerFactory.getInstance().createStorageManager(shuffleConf);
+ ShuffleFlushManager shuffleFlushManager =
+ new ShuffleFlushManager(shuffleConf, mockShuffleServer,
storageManager);
+ shuffleBufferManager = new ShuffleBufferManager(shuffleConf,
shuffleFlushManager);
+
+
when(mockShuffleServer.getShuffleFlushManager()).thenReturn(shuffleFlushManager);
+
when(mockShuffleServer.getShuffleBufferManager()).thenReturn(shuffleBufferManager);
+
when(mockShuffleServer.getShuffleTaskManager()).thenReturn(mock(ShuffleTaskManager.class));
+
+ String appId = "bufferSizeTest";
+ int shuffleId = 1;
+ shuffleBufferManager.registerBuffer(appId, shuffleId, 0, 1);
+ shuffleBufferManager.registerBuffer(appId, shuffleId, 2, 3);
+ shuffleBufferManager.cacheShuffleData(appId, shuffleId, false,
createData(0, 64));
+ shuffleBufferManager.cacheShuffleData(appId, shuffleId, false,
createData(2, 64));
+ // wait flush event drained
+ Awaitility.await()
+ .atMost(Duration.ofSeconds(5))
+ .until(() -> shuffleFlushManager.getEventNumInFlush() == 0);
+ // make sure all cleanup tasks are done.
+ DefaultFlushEventHandler flushEventHandler =
+ (DefaultFlushEventHandler) shuffleFlushManager.getEventHandler();
+ ThreadPoolExecutor executor =
+ ((ThreadPoolExecutor)
flushEventHandler.getFallbackThreadPoolExecutor());
+ executor.shutdown();
+ assertEquals(0, shuffleBufferManager.getUsedMemory());
+ assertEquals(0, shuffleBufferManager.getInFlushSize());
+ }
}