This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c6defb2  [improvement](query) Improve fe high concurrent query performance (#7936)
c6defb2 is described below

commit c6defb2fafa42e2bae3b3cca49b2180277570a7d
Author: caiconghui <[email protected]>
AuthorDate: Tue Feb 8 09:54:59 2022 +0800

    [improvement](query) Improve fe high concurrent query performance (#7936)
---
 .../java/org/apache/doris/qe/ConnectScheduler.java | 52 +++++++++++-----------
 .../main/java/org/apache/doris/qe/Coordinator.java |  5 ++-
 .../java/org/apache/doris/qe/QeProcessorImpl.java  |  8 +++-
 .../java/org/apache/doris/qe/StmtExecutor.java     |  4 +-
 4 files changed, 37 insertions(+), 32 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
index 505cf00..ecb6048 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
@@ -46,10 +46,10 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class ConnectScheduler {
     private static final Logger LOG = LogManager.getLogger(ConnectScheduler.class);
     private int maxConnections;
-    private int numberConnection;
+    private AtomicInteger numberConnection;
     private AtomicInteger nextConnectionId;
-    private Map<Long, ConnectContext> connectionMap = Maps.newConcurrentMap();
-    private Map<String, AtomicInteger> connByUser = Maps.newHashMap();
+    private Map<Integer, ConnectContext> connectionMap = Maps.newConcurrentMap();
+    private Map<String, AtomicInteger> connByUser = Maps.newConcurrentMap();
     private ExecutorService executor = ThreadPoolManager.newDaemonCacheThreadPool(Config.max_connection_scheduler_threads_num, "connect-scheduler-pool", true);
 
     // Use a thread to check whether connection is timeout. Because
@@ -61,7 +61,7 @@ public class ConnectScheduler {
 
     public ConnectScheduler(int maxConnections) {
         this.maxConnections = maxConnections;
-        numberConnection = 0;
+        numberConnection = new AtomicInteger(0);
         nextConnectionId = new AtomicInteger(0);
         checkTimer.scheduleAtFixedRate(new TimeoutChecker(), 0, 1000L, TimeUnit.MILLISECONDS);
     }
@@ -70,10 +70,8 @@ public class ConnectScheduler {
         @Override
         public void run() {
             long now = System.currentTimeMillis();
-            synchronized (ConnectScheduler.this) {
-                for (ConnectContext connectContext : connectionMap.values()) {
-                    connectContext.checkTimeout(now);
-                }
+            for (ConnectContext connectContext : connectionMap.values()) {
+                connectContext.checkTimeout(now);
             }
         }
     }
@@ -96,50 +94,50 @@ public class ConnectScheduler {
     }
 
     // Register one connection with its connection id.
-    public synchronized boolean registerConnection(ConnectContext ctx) {
-        if (numberConnection >= maxConnections) {
+    public boolean registerConnection(ConnectContext ctx) {
+        if (numberConnection.incrementAndGet() > maxConnections) {
+            numberConnection.decrementAndGet();
             return false;
         }
         // Check user
-        if (connByUser.get(ctx.getQualifiedUser()) == null) {
-            connByUser.put(ctx.getQualifiedUser(), new AtomicInteger(0));
-        }
-        int conns = connByUser.get(ctx.getQualifiedUser()).get();
+        connByUser.putIfAbsent(ctx.getQualifiedUser(), new AtomicInteger(0));
+        AtomicInteger conns = connByUser.get(ctx.getQualifiedUser());
         if (ctx.getIsTempUser()) {
-            if (conns >= LdapAuthenticate.getMaxConn()) {
+            if (conns.incrementAndGet() > LdapAuthenticate.getMaxConn()) {
+                conns.decrementAndGet();
+                numberConnection.decrementAndGet();
                 return false;
             }
-        } else if (conns >= ctx.getCatalog().getAuth().getMaxConn(ctx.getQualifiedUser())) {
+        } else if (conns.incrementAndGet() > ctx.getCatalog().getAuth().getMaxConn(ctx.getQualifiedUser())) {
+            conns.decrementAndGet();
+            numberConnection.decrementAndGet();
             return false;
         }
-        numberConnection++;
-        connByUser.get(ctx.getQualifiedUser()).incrementAndGet();
-        connectionMap.put((long) ctx.getConnectionId(), ctx);
+        connectionMap.put(ctx.getConnectionId(), ctx);
         return true;
     }
 
-    public synchronized void unregisterConnection(ConnectContext ctx) {
+    public void unregisterConnection(ConnectContext ctx) {
         ctx.closeTxn();
-        if (connectionMap.remove((long) ctx.getConnectionId()) != null) {
-            numberConnection--;
+        if (connectionMap.remove(ctx.getConnectionId()) != null) {
             AtomicInteger conns = connByUser.get(ctx.getQualifiedUser());
             if (conns != null) {
                 conns.decrementAndGet();
             }
+            numberConnection.decrementAndGet();
         }
     }
 
-    public synchronized ConnectContext getContext(long connectionId) {
+    public ConnectContext getContext(long connectionId) {
         return connectionMap.get(connectionId);
     }
 
-    public synchronized int getConnectionNum() {
-        return numberConnection;
+    public int getConnectionNum() {
+        return numberConnection.get();
     }
 
-    public synchronized List<ConnectContext.ThreadInfo> listConnection(String user) {
+    public List<ConnectContext.ThreadInfo> listConnection(String user) {
         List<ConnectContext.ThreadInfo> infos = Lists.newArrayList();
-
         for (ConnectContext ctx : connectionMap.values()) {
             // Check auth
             if (!ctx.getQualifiedUser().equals(user) &&
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 27b072d..dd3fcdc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -471,8 +471,9 @@ public class Coordinator {
                     addressToBackendID.get(execBeAddr),
                     toBrpcHost(execBeAddr),
                     queryOptions.query_timeout * 1000);
-
-            LOG.info("dispatch query job: {} to {}", DebugUtil.printId(queryId), topParams.instanceExecParams.get(0).host);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("dispatch query job: {} to {}", DebugUtil.printId(queryId), topParams.instanceExecParams.get(0).host);
+            }
 
             if (topDataSink instanceof ResultFileSink
                     && ((ResultFileSink) topDataSink).getStorageType() == StorageBackend.StorageType.BROKER) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index c05bf00..449aa7f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -81,7 +81,9 @@ public final class QeProcessorImpl implements QeProcessor {
 
     @Override
     public void registerQuery(TUniqueId queryId, QueryInfo info) throws UserException {
-        LOG.info("register query id = " + DebugUtil.printId(queryId) + ", job: " + info.getCoord().getJobId());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("register query id = " + DebugUtil.printId(queryId) + ", job: " + info.getCoord().getJobId());
+        }
         final QueryInfo result = coordinatorMap.putIfAbsent(queryId, info);
         if (result != null) {
             throw new UserException("queryId " + queryId + " already exists");
@@ -121,7 +123,9 @@ public final class QeProcessorImpl implements QeProcessor {
     public void unregisterQuery(TUniqueId queryId) {
         QueryInfo queryInfo = coordinatorMap.remove(queryId);
         if (queryInfo != null) {
-            LOG.info("deregister query id {}", DebugUtil.printId(queryId));
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("deregister query id {}", DebugUtil.printId(queryId));
+            }
             if (queryInfo.getConnectContext() != null &&
                     !Strings.isNullOrEmpty(queryInfo.getConnectContext().getQualifiedUser())
             ) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index a0d11fd..0bad23f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -526,7 +526,9 @@ public class StmtExecutor implements ProfileWriter {
 
     // Analyze one statement to structure in memory.
     public void analyze(TQueryOptions tQueryOptions) throws UserException {
-        LOG.info("begin to analyze stmt: {}, forwarded stmt id: {}", context.getStmtId(), context.getForwardedStmtId());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("begin to analyze stmt: {}, forwarded stmt id: {}", context.getStmtId(), context.getForwardedStmtId());
+        }
 
         parse();
 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to