This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0883d47832c [Enhance](broker) add inputstream expire scheduled checker 
to avoid memory leak for broker scan (#28589)
0883d47832c is described below

commit 0883d47832c4bd8a674a612b7a99937466ecdbaa
Author: DuRipeng <[email protected]>
AuthorDate: Tue Dec 19 19:24:29 2023 +0800

    [Enhance](broker) add inputstream expire scheduled checker to avoid memory 
leak for broker scan (#28589)
    
    This pr introduces 2 broker conf:
    
    1. enable_input_stream_expire_check: which indicates whether enable 
inputStream expire check.
    2. input_stream_expire_seconds: which indicates the timeout seconds for 
inputStream since last update.
---
 .../org/apache/doris/broker/hdfs/BrokerConfig.java |  6 +++
 .../doris/broker/hdfs/ClientContextManager.java    | 60 ++++++++++++++++++++--
 .../doris/broker/hdfs/FileSystemManager.java       |  4 +-
 3 files changed, 62 insertions(+), 8 deletions(-)

diff --git 
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java
 
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java
index b5d4ca996f7..5fd17706a1c 100644
--- 
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java
+++ 
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/BrokerConfig.java
@@ -31,6 +31,12 @@ public class BrokerConfig extends ConfigBase {
     @ConfField
     public static int client_expire_seconds = 3600;
 
+    @ConfField
+    public static boolean enable_input_stream_expire_check = false;
+
+    @ConfField
+    public static int input_stream_expire_seconds = 300;
+
     @ConfField
     public static int broker_ipc_port = 8000;
 }
diff --git 
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java
 
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java
index 736f9ae448f..2df4c12a7fd 100644
--- 
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java
+++ 
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java
@@ -17,9 +17,13 @@
 
 package org.apache.doris.broker.hdfs;
 
+import java.util.Iterator;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -32,16 +36,21 @@ public class ClientContextManager {
 
     private static Logger logger = Logger
         .getLogger(ClientContextManager.class.getName());
-    private ScheduledExecutorService executorService;
+    private ScheduledExecutorService clientCheckExecutorService;
+    private ScheduledExecutorService inputStreamCheckExecuterService;
     private ConcurrentHashMap<String, ClientResourceContext> clientContexts;
     private ConcurrentHashMap<TBrokerFD, String> fdToClientMap;
     private int clientExpirationSeconds = BrokerConfig.client_expire_seconds;
 
-    public ClientContextManager(ScheduledExecutorService executorService) {
+    public ClientContextManager() {
         clientContexts = new ConcurrentHashMap<>();
         fdToClientMap = new ConcurrentHashMap<>();
-        this.executorService = executorService;
-        this.executorService.schedule(new CheckClientExpirationTask(), 0, 
TimeUnit.SECONDS);
+        this.clientCheckExecutorService = Executors.newScheduledThreadPool(2);
+        this.clientCheckExecutorService.schedule(new 
CheckClientExpirationTask(), 0, TimeUnit.SECONDS);
+        if (BrokerConfig.enable_input_stream_expire_check) {
+            this.inputStreamCheckExecuterService = 
Executors.newScheduledThreadPool(2);
+            this.inputStreamCheckExecuterService.schedule(new 
CheckInputStreamExpirationTask(), 0, TimeUnit.SECONDS);
+        }
     }
 
     public void onPing(String clientId) {
@@ -126,6 +135,25 @@ public class ClientContextManager {
         }
     }
 
+    public synchronized void remoteExpireInputStreams() {
+        int inputStreamExpireSeconds = 
BrokerConfig.input_stream_expire_seconds;
+        TBrokerFD fd;
+        for (ClientResourceContext clientContext : clientContexts.values()) {
+            Iterator<Entry<TBrokerFD, BrokerInputStream>> iter = 
clientContext.inputStreams.entrySet().iterator();
+            while (iter.hasNext()) {
+                Entry<TBrokerFD, BrokerInputStream> entry = iter.next();
+                fd = entry.getKey();
+                if (entry.getValue().checkExpire(inputStreamExpireSeconds)) {
+                    ClientContextManager.this.removeInputStream(fd);
+                }
+                iter.remove();
+                logger.info(fd + " in client [" + clientContext.clientId
+                        + "] is expired, remove it from contexts. last update 
time is "
+                        + entry.getValue().getLastPingTimestamp());
+            }
+        }
+    }
+
     class CheckClientExpirationTask implements Runnable {
         @Override
         public void run() {
@@ -145,7 +173,18 @@ public class ClientContextManager {
                     }
                 }
             } finally {
-                ClientContextManager.this.executorService.schedule(this, 60, 
TimeUnit.SECONDS);
+                
ClientContextManager.this.clientCheckExecutorService.schedule(this, 60, 
TimeUnit.SECONDS);
+            }
+        }
+    }
+
+    class CheckInputStreamExpirationTask implements Runnable {
+        @Override
+        public void run() {
+            try {
+                ClientContextManager.this.remoteExpireInputStreams();
+            } finally {
+                
ClientContextManager.this.inputStreamCheckExecuterService.schedule(this, 60, 
TimeUnit.SECONDS);
             }
         }
     }
@@ -175,21 +214,32 @@ public class ClientContextManager {
 
         private final FSDataInputStream inputStream;
         private final BrokerFileSystem brokerFileSystem;
+        private AtomicLong lastPingTimestamp;
 
         public BrokerInputStream(FSDataInputStream inputStream, 
BrokerFileSystem brokerFileSystem) {
             this.inputStream = inputStream;
             this.brokerFileSystem = brokerFileSystem;
             this.brokerFileSystem.updateLastUpdateAccessTime();
+            this.lastPingTimestamp = new 
AtomicLong(System.currentTimeMillis());
         }
 
         public FSDataInputStream getInputStream() {
             this.brokerFileSystem.updateLastUpdateAccessTime();
+            this.lastPingTimestamp.set(System.currentTimeMillis());
             return inputStream;
         }
 
         public void updateLastUpdateAccessTime() {
             this.brokerFileSystem.updateLastUpdateAccessTime();
         }
+
+        public boolean checkExpire(long expireSecond) {
+            return System.currentTimeMillis() - lastPingTimestamp.get() > 
expireSecond * 1000;
+        }
+
+        public long getLastPingTimestamp() {
+            return lastPingTimestamp.get();
+        }
     }
 
     static class ClientResourceContext {
diff --git 
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
 
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
index 58a09c7e090..beeff588bcb 100644
--- 
a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
+++ 
b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
@@ -163,8 +163,6 @@ public class FileSystemManager {
     private static final String DFS_CLIENT_AUTH_METHOD = 
"dfs.client.auth.method";
     private static final String DFS_RPC_TIMEOUT = "dfs.rpc.timeout";
 
-    private ScheduledExecutorService handleManagementPool = 
Executors.newScheduledThreadPool(2);
-
     private int readBufferSize = 128 << 10; // 128k
     private int writeBufferSize = 128 << 10; // 128k
 
@@ -173,7 +171,7 @@ public class FileSystemManager {
 
     public FileSystemManager() {
         cachedFileSystem = new ConcurrentHashMap<>();
-        clientContextManager = new ClientContextManager(handleManagementPool);
+        clientContextManager = new ClientContextManager();
         readBufferSize = BrokerConfig.hdfs_read_buffer_size_kb << 10;
         writeBufferSize = BrokerConfig.hdfs_write_buffer_size_kb << 10;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to