This is an automated email from the ASF dual-hosted git repository.
yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new c6defb2 [improvement](query) Improve fe high concurrent query
performance (#7936)
c6defb2 is described below
commit c6defb2fafa42e2bae3b3cca49b2180277570a7d
Author: caiconghui <[email protected]>
AuthorDate: Tue Feb 8 09:54:59 2022 +0800
[improvement](query) Improve fe high concurrent query performance (#7936)
---
.../java/org/apache/doris/qe/ConnectScheduler.java | 52 +++++++++++-----------
.../main/java/org/apache/doris/qe/Coordinator.java | 5 ++-
.../java/org/apache/doris/qe/QeProcessorImpl.java | 8 +++-
.../java/org/apache/doris/qe/StmtExecutor.java | 4 +-
4 files changed, 37 insertions(+), 32 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
index 505cf00..ecb6048 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
@@ -46,10 +46,10 @@ import java.util.concurrent.atomic.AtomicInteger;
public class ConnectScheduler {
private static final Logger LOG =
LogManager.getLogger(ConnectScheduler.class);
private int maxConnections;
- private int numberConnection;
+ private AtomicInteger numberConnection;
private AtomicInteger nextConnectionId;
- private Map<Long, ConnectContext> connectionMap = Maps.newConcurrentMap();
- private Map<String, AtomicInteger> connByUser = Maps.newHashMap();
+ private Map<Integer, ConnectContext> connectionMap =
Maps.newConcurrentMap();
+ private Map<String, AtomicInteger> connByUser = Maps.newConcurrentMap();
private ExecutorService executor =
ThreadPoolManager.newDaemonCacheThreadPool(Config.max_connection_scheduler_threads_num,
"connect-scheduler-pool", true);
// Use a thread to check whether connection is timeout. Because
@@ -61,7 +61,7 @@ public class ConnectScheduler {
public ConnectScheduler(int maxConnections) {
this.maxConnections = maxConnections;
- numberConnection = 0;
+ numberConnection = new AtomicInteger(0);
nextConnectionId = new AtomicInteger(0);
checkTimer.scheduleAtFixedRate(new TimeoutChecker(), 0, 1000L,
TimeUnit.MILLISECONDS);
}
@@ -70,10 +70,8 @@ public class ConnectScheduler {
@Override
public void run() {
long now = System.currentTimeMillis();
- synchronized (ConnectScheduler.this) {
- for (ConnectContext connectContext : connectionMap.values()) {
- connectContext.checkTimeout(now);
- }
+ for (ConnectContext connectContext : connectionMap.values()) {
+ connectContext.checkTimeout(now);
}
}
}
@@ -96,50 +94,50 @@ public class ConnectScheduler {
}
// Register one connection with its connection id.
- public synchronized boolean registerConnection(ConnectContext ctx) {
- if (numberConnection >= maxConnections) {
+ public boolean registerConnection(ConnectContext ctx) {
+ if (numberConnection.incrementAndGet() > maxConnections) {
+ numberConnection.decrementAndGet();
return false;
}
// Check user
- if (connByUser.get(ctx.getQualifiedUser()) == null) {
- connByUser.put(ctx.getQualifiedUser(), new AtomicInteger(0));
- }
- int conns = connByUser.get(ctx.getQualifiedUser()).get();
+ connByUser.putIfAbsent(ctx.getQualifiedUser(), new AtomicInteger(0));
+ AtomicInteger conns = connByUser.get(ctx.getQualifiedUser());
if (ctx.getIsTempUser()) {
- if (conns >= LdapAuthenticate.getMaxConn()) {
+ if (conns.incrementAndGet() > LdapAuthenticate.getMaxConn()) {
+ conns.decrementAndGet();
+ numberConnection.decrementAndGet();
return false;
}
- } else if (conns >=
ctx.getCatalog().getAuth().getMaxConn(ctx.getQualifiedUser())) {
+ } else if (conns.incrementAndGet() >
ctx.getCatalog().getAuth().getMaxConn(ctx.getQualifiedUser())) {
+ conns.decrementAndGet();
+ numberConnection.decrementAndGet();
return false;
}
- numberConnection++;
- connByUser.get(ctx.getQualifiedUser()).incrementAndGet();
- connectionMap.put((long) ctx.getConnectionId(), ctx);
+ connectionMap.put(ctx.getConnectionId(), ctx);
return true;
}
- public synchronized void unregisterConnection(ConnectContext ctx) {
+ public void unregisterConnection(ConnectContext ctx) {
ctx.closeTxn();
- if (connectionMap.remove((long) ctx.getConnectionId()) != null) {
- numberConnection--;
+ if (connectionMap.remove(ctx.getConnectionId()) != null) {
AtomicInteger conns = connByUser.get(ctx.getQualifiedUser());
if (conns != null) {
conns.decrementAndGet();
}
+ numberConnection.decrementAndGet();
}
}
- public synchronized ConnectContext getContext(long connectionId) {
+ public ConnectContext getContext(long connectionId) {
return connectionMap.get(connectionId);
}
- public synchronized int getConnectionNum() {
- return numberConnection;
+ public int getConnectionNum() {
+ return numberConnection.get();
}
- public synchronized List<ConnectContext.ThreadInfo> listConnection(String
user) {
+ public List<ConnectContext.ThreadInfo> listConnection(String user) {
List<ConnectContext.ThreadInfo> infos = Lists.newArrayList();
-
for (ConnectContext ctx : connectionMap.values()) {
// Check auth
if (!ctx.getQualifiedUser().equals(user) &&
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 27b072d..dd3fcdc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -471,8 +471,9 @@ public class Coordinator {
addressToBackendID.get(execBeAddr),
toBrpcHost(execBeAddr),
queryOptions.query_timeout * 1000);
-
- LOG.info("dispatch query job: {} to {}",
DebugUtil.printId(queryId), topParams.instanceExecParams.get(0).host);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("dispatch query job: {} to {}",
DebugUtil.printId(queryId), topParams.instanceExecParams.get(0).host);
+ }
if (topDataSink instanceof ResultFileSink
&& ((ResultFileSink) topDataSink).getStorageType() ==
StorageBackend.StorageType.BROKER) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
index c05bf00..449aa7f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QeProcessorImpl.java
@@ -81,7 +81,9 @@ public final class QeProcessorImpl implements QeProcessor {
@Override
public void registerQuery(TUniqueId queryId, QueryInfo info) throws
UserException {
- LOG.info("register query id = " + DebugUtil.printId(queryId) + ", job:
" + info.getCoord().getJobId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("register query id = " + DebugUtil.printId(queryId) + ",
job: " + info.getCoord().getJobId());
+ }
final QueryInfo result = coordinatorMap.putIfAbsent(queryId, info);
if (result != null) {
throw new UserException("queryId " + queryId + " already exists");
@@ -121,7 +123,9 @@ public final class QeProcessorImpl implements QeProcessor {
public void unregisterQuery(TUniqueId queryId) {
QueryInfo queryInfo = coordinatorMap.remove(queryId);
if (queryInfo != null) {
- LOG.info("deregister query id {}", DebugUtil.printId(queryId));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("deregister query id {}",
DebugUtil.printId(queryId));
+ }
if (queryInfo.getConnectContext() != null &&
!Strings.isNullOrEmpty(queryInfo.getConnectContext().getQualifiedUser())
) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index a0d11fd..0bad23f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -526,7 +526,9 @@ public class StmtExecutor implements ProfileWriter {
// Analyze one statement to structure in memory.
public void analyze(TQueryOptions tQueryOptions) throws UserException {
- LOG.info("begin to analyze stmt: {}, forwarded stmt id: {}",
context.getStmtId(), context.getForwardedStmtId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("begin to analyze stmt: {}, forwarded stmt id: {}",
context.getStmtId(), context.getForwardedStmtId());
+ }
parse();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]