This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0883d47832c [Enhance](broker) add inputstream expire scheduled checker
to avoid memory leak for broker scan (#28589)
0883d47832c is described below
commit 0883d47832c4bd8a674a612b7a99937466ecdbaa
Author: DuRipeng <[email protected]>
AuthorDate: Tue Dec 19 19:24:29 2023 +0800
[Enhance](broker) add inputstream expire scheduled checker to avoid memory
leak for broker scan (#28589)
This PR introduces two broker configs:
1. enable_input_stream_expire_check: whether to enable the inputStream
expiration check.
2. input_stream_expire_seconds: the timeout, in seconds, after which an
inputStream expires, measured from its last update.
---
.../org/apache/doris/broker/hdfs/BrokerConfig.java | 6 +++
.../doris/broker/hdfs/ClientContextManager.java | 60 ++++++++++++++++++++--
.../doris/broker/hdfs/FileSystemManager.java | 4 +-
3 files changed, 62 insertions(+), 8 deletions(-)
diff --git
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java
index b5d4ca996f7..5fd17706a1c 100644
---
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java
+++
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java
@@ -31,6 +31,12 @@ public class BrokerConfig extends ConfigBase {
@ConfField
public static int client_expire_seconds = 3600;
+ @ConfField
+ public static boolean enable_input_stream_expire_check = false;
+
+ @ConfField
+ public static int input_stream_expire_seconds = 300;
+
@ConfField
public static int broker_ipc_port = 8000;
}
diff --git
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java
index 736f9ae448f..2df4c12a7fd 100644
---
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java
+++
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java
@@ -17,9 +17,13 @@
package org.apache.doris.broker.hdfs;
+import java.util.Iterator;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -32,16 +36,21 @@ public class ClientContextManager {
private static Logger logger = Logger
.getLogger(ClientContextManager.class.getName());
- private ScheduledExecutorService executorService;
+ private ScheduledExecutorService clientCheckExecutorService;
+ private ScheduledExecutorService inputStreamCheckExecuterService;
private ConcurrentHashMap<String, ClientResourceContext> clientContexts;
private ConcurrentHashMap<TBrokerFD, String> fdToClientMap;
private int clientExpirationSeconds = BrokerConfig.client_expire_seconds;
- public ClientContextManager(ScheduledExecutorService executorService) {
+ public ClientContextManager() {
clientContexts = new ConcurrentHashMap<>();
fdToClientMap = new ConcurrentHashMap<>();
- this.executorService = executorService;
- this.executorService.schedule(new CheckClientExpirationTask(), 0,
TimeUnit.SECONDS);
+ this.clientCheckExecutorService = Executors.newScheduledThreadPool(2);
+ this.clientCheckExecutorService.schedule(new
CheckClientExpirationTask(), 0, TimeUnit.SECONDS);
+ if (BrokerConfig.enable_input_stream_expire_check) {
+ this.inputStreamCheckExecuterService =
Executors.newScheduledThreadPool(2);
+ this.inputStreamCheckExecuterService.schedule(new
CheckInputStreamExpirationTask(), 0, TimeUnit.SECONDS);
+ }
}
public void onPing(String clientId) {
@@ -126,6 +135,25 @@ public class ClientContextManager {
}
}
+    /**
+     * Sweeps the input streams of every registered client and evicts those
+     * whose last access is older than
+     * {@code BrokerConfig.input_stream_expire_seconds}.
+     *
+     * An expired stream is handed to {@code removeInputStream(fd)} (defined
+     * elsewhere in this class; presumably it closes the underlying stream --
+     * confirm there) and its entry is dropped from the owning client's map.
+     * Streams that have not expired are left untouched so in-flight reads
+     * keep working.
+     */
+    public synchronized void remoteExpireInputStreams() {
+        int inputStreamExpireSeconds = BrokerConfig.input_stream_expire_seconds;
+        for (ClientResourceContext clientContext : clientContexts.values()) {
+            Iterator<Entry<TBrokerFD, BrokerInputStream>> iter =
+                    clientContext.inputStreams.entrySet().iterator();
+            while (iter.hasNext()) {
+                Entry<TBrokerFD, BrokerInputStream> entry = iter.next();
+                // Only idle streams may be evicted; removing a live one would
+                // unregister it without releasing it and break the reader.
+                if (!entry.getValue().checkExpire(inputStreamExpireSeconds)) {
+                    continue;
+                }
+                TBrokerFD fd = entry.getKey();
+                ClientContextManager.this.removeInputStream(fd);
+                iter.remove();
+                logger.info(fd + " in client [" + clientContext.clientId
+                        + "] is expired, remove it from contexts. last update time is "
+                        + entry.getValue().getLastPingTimestamp());
+            }
+        }
+    }
+
class CheckClientExpirationTask implements Runnable {
@Override
public void run() {
@@ -145,7 +173,18 @@ public class ClientContextManager {
}
}
} finally {
- ClientContextManager.this.executorService.schedule(this, 60,
TimeUnit.SECONDS);
+
ClientContextManager.this.clientCheckExecutorService.schedule(this, 60,
TimeUnit.SECONDS);
+ }
+ }
+ }
+
+ class CheckInputStreamExpirationTask implements Runnable {
+ @Override
+ public void run() {
+ try {
+ ClientContextManager.this.remoteExpireInputStreams();
+ } finally {
+
ClientContextManager.this.inputStreamCheckExecuterService.schedule(this, 60,
TimeUnit.SECONDS);
}
}
}
@@ -175,21 +214,32 @@ public class ClientContextManager {
private final FSDataInputStream inputStream;
private final BrokerFileSystem brokerFileSystem;
+ private AtomicLong lastPingTimestamp;
public BrokerInputStream(FSDataInputStream inputStream,
BrokerFileSystem brokerFileSystem) {
this.inputStream = inputStream;
this.brokerFileSystem = brokerFileSystem;
this.brokerFileSystem.updateLastUpdateAccessTime();
+ this.lastPingTimestamp = new
AtomicLong(System.currentTimeMillis());
}
public FSDataInputStream getInputStream() {
this.brokerFileSystem.updateLastUpdateAccessTime();
+ this.lastPingTimestamp.set(System.currentTimeMillis());
return inputStream;
}
public void updateLastUpdateAccessTime() {
this.brokerFileSystem.updateLastUpdateAccessTime();
}
+
+ public boolean checkExpire(long expireSecond) {
+ return System.currentTimeMillis() - lastPingTimestamp.get() >
expireSecond * 1000;
+ }
+
+ public long getLastPingTimestamp() {
+ return lastPingTimestamp.get();
+ }
}
static class ClientResourceContext {
diff --git
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
index 58a09c7e090..beeff588bcb 100644
---
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
+++
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
@@ -163,8 +163,6 @@ public class FileSystemManager {
private static final String DFS_CLIENT_AUTH_METHOD =
"dfs.client.auth.method";
private static final String DFS_RPC_TIMEOUT = "dfs.rpc.timeout";
- private ScheduledExecutorService handleManagementPool =
Executors.newScheduledThreadPool(2);
-
private int readBufferSize = 128 << 10; // 128k
private int writeBufferSize = 128 << 10; // 128k
@@ -173,7 +171,7 @@ public class FileSystemManager {
public FileSystemManager() {
cachedFileSystem = new ConcurrentHashMap<>();
- clientContextManager = new ClientContextManager(handleManagementPool);
+ clientContextManager = new ClientContextManager();
readBufferSize = BrokerConfig.hdfs_read_buffer_size_kb << 10;
writeBufferSize = BrokerConfig.hdfs_write_buffer_size_kb << 10;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]