This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 4ae9451b637 branch-2.1: [opt](connection) add connection num in error
msg (#49471) (#49910)
4ae9451b637 is described below
commit 4ae9451b63744b938ae7d3959efbf310bba85803
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Thu Apr 10 07:00:51 2025 -0700
branch-2.1: [opt](connection) add connection num in error msg (#49471)
(#49910)
bp #49471
---
.../java/org/apache/doris/mysql/AcceptListener.java | 11 +++++++----
.../java/org/apache/doris/qe/ConnectScheduler.java | 14 ++++++++++----
.../sessions/FlightSessionsWithTokenManager.java | 20 ++++++++++++++------
3 files changed, 31 insertions(+), 14 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java
b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java
index 3d783f28cb3..0388a532efd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java
@@ -89,15 +89,18 @@ public class AcceptListener implements
ChannelListener<AcceptingChannel<StreamCo
if (!MysqlProto.negotiate(context)) {
throw new AfterConnectedException("mysql negotiate
failed");
}
- if (connectScheduler.registerConnection(context)) {
+ int res = connectScheduler.registerConnection(context);
+ if (res == -1) {
MysqlProto.sendResponsePacket(context);
connection.setCloseListener(
streamConnection ->
connectScheduler.unregisterConnection(context));
} else {
-
context.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS,
- "Reach limit of connections");
+ long userConnLimit =
context.getEnv().getAuth().getMaxConn(context.getQualifiedUser());
+ String errMsg = String.format("Reach limit of
connections. Total: %d, User: %d, Current: %d",
+ connectScheduler.getMaxConnections(),
userConnLimit, res);
+
context.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS, errMsg);
MysqlProto.sendResponsePacket(context);
- throw new AfterConnectedException("Reach limit of
connections");
+ throw new AfterConnectedException(errMsg);
}
context.setStartTime();
int userQueryTimeout =
context.getEnv().getAuth().getQueryTimeout(context.getQualifiedUser());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
index e4626a0d215..1618ca2dd43 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectScheduler.java
@@ -90,10 +90,12 @@ public class ConnectScheduler {
}
// Register one connection with its connection id.
- public boolean registerConnection(ConnectContext ctx) {
+ // Return -1 means register OK
+ // Return >=0 means register failed, and return value is current
connection num.
+ public int registerConnection(ConnectContext ctx) {
if (numberConnection.incrementAndGet() > maxConnections) {
numberConnection.decrementAndGet();
- return false;
+ return numberConnection.get();
}
// Check user
connByUser.putIfAbsent(ctx.getQualifiedUser(), new AtomicInteger(0));
@@ -101,13 +103,13 @@ public class ConnectScheduler {
if (conns.incrementAndGet() >
ctx.getEnv().getAuth().getMaxConn(ctx.getQualifiedUser())) {
conns.decrementAndGet();
numberConnection.decrementAndGet();
- return false;
+ return numberConnection.get();
}
connectionMap.put(ctx.getConnectionId(), ctx);
if (ctx.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) {
flightToken2ConnectionId.put(ctx.getPeerIdentity(),
ctx.getConnectionId());
}
- return true;
+ return -1;
}
public void unregisterConnection(ConnectContext ctx) {
@@ -202,4 +204,8 @@ public class ConnectScheduler {
public Map<Integer, ConnectContext> getConnectionMap() {
return connectionMap;
}
+
+ public int getMaxConnections() {
+ return maxConnections;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java
index 75d7ff1b334..a495b02c393 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java
@@ -20,6 +20,7 @@ package org.apache.doris.service.arrowflight.sessions;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.util.Util;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.ConnectScheduler;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.arrowflight.tokens.FlightTokenDetails;
import org.apache.doris.service.arrowflight.tokens.FlightTokenManager;
@@ -65,12 +66,19 @@ public class FlightSessionsWithTokenManager implements
FlightSessionsManager {
flightTokenDetails.setCreatedSession(true);
ConnectContext connectContext =
FlightSessionsManager.buildConnectContext(peerIdentity,
flightTokenDetails.getUserIdentity(),
flightTokenDetails.getRemoteIp());
- ExecuteEnv.getInstance().getScheduler().submit(connectContext);
- if
(!ExecuteEnv.getInstance().getScheduler().registerConnection(connectContext)) {
- String err = "Reach limit of connections, increase
`qe_max_connection` in fe.conf, or decrease "
- + "`arrow_flight_token_cache_size` to evict unused bearer
tokens and it connections faster";
-
connectContext.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS,
err);
- throw new IllegalArgumentException(err);
+ ConnectScheduler connectScheduler =
ExecuteEnv.getInstance().getScheduler();
+ connectScheduler.submit(connectContext);
+ int res = connectScheduler.registerConnection(connectContext);
+ if (res >= 0) {
+ long userConnLimit =
connectContext.getEnv().getAuth().getMaxConn(connectContext.getQualifiedUser());
+ String errMsg = String.format(
+ "Reach limit of connections. Total: %d, User: %d, Current:
%d. "
+ + "Increase `qe_max_connection` in fe.conf or
user's `max_user_connections`,"
+ + " or decrease `arrow_flight_token_cache_size` "
+ + "to evict unused bearer tokens and it
connections faster",
+ connectScheduler.getMaxConnections(), userConnLimit, res);
+
connectContext.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS,
errMsg);
+ throw new IllegalArgumentException(errMsg);
}
return connectContext;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]