This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new e32932006ac [fix](connections) fix connection hang after too many
connections #31761
e32932006ac is described below
commit e32932006ace0f5a3f8a7deb9cc3bb0d833b0e7f
Author: camby <[email protected]>
AuthorDate: Wed Mar 6 20:38:14 2024 +0800
[fix](connections) fix connection hang after too many connections #31761
---
.../org/apache/doris/mysql/AcceptListener.java | 97 ++++++++++++----------
.../java/org/apache/doris/mysql/MysqlServer.java | 2 +-
2 files changed, 54 insertions(+), 45 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java
b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java
index 1bde95c1650..30d0693a6b5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java
@@ -31,6 +31,7 @@ import org.xnio.StreamConnection;
import org.xnio.channels.AcceptingChannel;
import java.io.IOException;
+import java.util.concurrent.RejectedExecutionException;
/**
* listener for accept mysql connections.
@@ -58,51 +59,59 @@ public class AcceptListener implements
ChannelListener<AcceptingChannel<StreamCo
context.setEnv(Env.getCurrentEnv());
connectScheduler.submit(context);
- channel.getWorker().execute(() -> {
- try {
- // Set thread local info
- context.setThreadLocalInfo();
- context.setConnectScheduler(connectScheduler);
- // authenticate check failed.
- if (!MysqlProto.negotiate(context)) {
- throw new AfterConnectedException("mysql negotiate
failed");
+ try {
+ channel.getWorker().execute(() -> {
+ try {
+ // Set thread local info
+ context.setThreadLocalInfo();
+ context.setConnectScheduler(connectScheduler);
+ // authenticate check failed.
+ if (!MysqlProto.negotiate(context)) {
+ throw new AfterConnectedException("mysql negotiate
failed");
+ }
+ if (connectScheduler.registerConnection(context)) {
+ MysqlProto.sendResponsePacket(context);
+ connection.setCloseListener(
+ streamConnection ->
connectScheduler.unregisterConnection(context));
+ } else {
+
context.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS,
+ "Reach limit of connections");
+ MysqlProto.sendResponsePacket(context);
+ throw new AfterConnectedException("Reach limit of
connections");
+ }
+ context.setStartTime();
+ context.setUserQueryTimeout(
+
context.getEnv().getAuth().getQueryTimeout(context.getQualifiedUser()));
+ context.setUserInsertTimeout(
+
context.getEnv().getAuth().getInsertTimeout(context.getQualifiedUser()));
+ ConnectProcessor processor = new
ConnectProcessor(context);
+ context.startAcceptQuery(processor);
+ } catch (AfterConnectedException e) {
+ // do not need to print log for this kind of exception.
+ // just clean up the context;
+ context.cleanup();
+ } catch (Throwable e) {
+ // should be unexpected exception, so print warn log
+ if (context.getCurrentUserIdentity() != null) {
+ LOG.warn("connect processor exception because ",
e);
+ } else if (e instanceof Error) {
+ LOG.error("connect processor exception because ",
e);
+ } else {
+ // for unauthrorized access such lvs probe request,
+ // may cause exception, just log it in debug level
+ LOG.debug("connect processor exception because ",
e);
+ }
+ context.cleanup();
+ } finally {
+ ConnectContext.remove();
}
- if (connectScheduler.registerConnection(context)) {
- MysqlProto.sendResponsePacket(context);
- connection.setCloseListener(streamConnection ->
connectScheduler.unregisterConnection(context));
- } else {
-
context.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS,
- "Reach limit of connections");
- MysqlProto.sendResponsePacket(context);
- throw new AfterConnectedException("Reach limit of
connections");
- }
- context.setStartTime();
- context.setUserQueryTimeout(
-
context.getEnv().getAuth().getQueryTimeout(context.getQualifiedUser()));
- context.setUserInsertTimeout(
-
context.getEnv().getAuth().getInsertTimeout(context.getQualifiedUser()));
- ConnectProcessor processor = new ConnectProcessor(context);
- context.startAcceptQuery(processor);
- } catch (AfterConnectedException e) {
- // do not need to print log for this kind of exception.
- // just clean up the context;
- context.cleanup();
- } catch (Throwable e) {
- // should be unexpected exception, so print warn log
- if (context.getCurrentUserIdentity() != null) {
- LOG.warn("connect processor exception because ", e);
- } else if (e instanceof Error) {
- LOG.error("connect processor exception because ", e);
- } else {
- // for unauthrorized access such lvs probe request,
- // may cause exception, just log it in debug level
- LOG.debug("connect processor exception because ", e);
- }
- context.cleanup();
- } finally {
- ConnectContext.remove();
- }
- });
+ });
+ } catch (RejectedExecutionException e) {
+
context.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS,
+ "Too many connections");
+ MysqlProto.sendResponsePacket(context);
+ context.cleanup();
+ }
} catch (IOException e) {
LOG.warn("Connection accept failed.", e);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlServer.java
b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlServer.java
index 4fb11136132..5f70e3000b9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlServer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlServer.java
@@ -51,7 +51,7 @@ public class MysqlServer {
private AcceptingChannel<StreamConnection> server;
// default task service.
- private ExecutorService taskService =
ThreadPoolManager.newDaemonCacheThreadPool(
+ private ExecutorService taskService =
ThreadPoolManager.newDaemonCacheThreadPoolThrowException(
Config.max_mysql_service_task_threads_num, "mysql-nio-pool", true);
public MysqlServer(int port, ConnectScheduler connectScheduler) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]