This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 3febac1d916 [fix](connection) kill connection when meeting Write mysql packet failed error #36559 (#36616)
3febac1d916 is described below
commit 3febac1d916c0219d91c61b8c65feba2b64383fa
Author: Mingyu Chen <[email protected]>
AuthorDate: Thu Jun 20 22:27:01 2024 +0800
[fix](connection) kill connection when meeting Write mysql packet failed error #36559 (#36616)
bp #36559
---
.../apache/doris/common/ConnectionException.java | 35 ++++++++++++++++++++++
.../java/org/apache/doris/mysql/MysqlChannel.java | 7 +++--
.../java/org/apache/doris/qe/ConnectProcessor.java | 17 +++++++----
.../org/apache/doris/qe/MysqlConnectProcessor.java | 5 ++--
.../arrowflight/FlightSqlConnectProcessor.java | 3 +-
5 files changed, 57 insertions(+), 10 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ConnectionException.java b/fe/fe-core/src/main/java/org/apache/doris/common/ConnectionException.java
new file mode 100644
index 00000000000..3f1de2ae2b8
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ConnectionException.java
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common;
+
+import java.io.IOException;
+
+/**
+ * This is a special exception.
+ * If this exception is thrown, it means that the connection to the server is abnormal.
+ * We need to kill the connection actively.
+ */
+public class ConnectionException extends IOException {
+ public ConnectionException(String message) {
+ super(message);
+ }
+
+ public ConnectionException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
index d22ba393699..392b0587585 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
@@ -17,6 +17,7 @@
package org.apache.doris.mysql;
+import org.apache.doris.common.ConnectionException;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ConnectProcessor;
@@ -401,11 +402,13 @@ public class MysqlChannel implements BytesChannel {
protected void realNetSend(ByteBuffer buffer) throws IOException {
buffer = encryptData(buffer);
long bufLen = buffer.remaining();
+ long start = System.currentTimeMillis();
long writeLen = Channels.writeBlocking(conn.getSinkChannel(), buffer,
context.getNetWriteTimeout(),
TimeUnit.SECONDS);
if (bufLen != writeLen) {
- throw new IOException("Write mysql packet failed.[write=" + writeLen
- + ", needToWrite=" + bufLen + "]");
+ long duration = System.currentTimeMillis() - start;
+ throw new ConnectionException("Write mysql packet failed.[write=" + writeLen
+ + ", needToWrite=" + bufLen + "], duration: " + duration + " ms");
}
Channels.flushBlocking(conn.getSinkChannel(),
context.getNetWriteTimeout(), TimeUnit.SECONDS);
isSend = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
index d54b708e818..51911c03333 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
@@ -32,6 +32,7 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
+import org.apache.doris.common.ConnectionException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.NotImplementedException;
@@ -198,9 +199,11 @@ public abstract class ConnectProcessor {
}
// only throw an exception when there is a problem interacting with the requesting client
- protected void handleQuery(MysqlCommand mysqlCommand, String originStmt) {
+ protected void handleQuery(MysqlCommand mysqlCommand, String originStmt) throws ConnectionException {
try {
executeQuery(mysqlCommand, originStmt);
+ } catch (ConnectionException exception) {
+ throw exception;
} catch (Exception ignored) {
// saved use handleQueryException
}
@@ -414,14 +417,18 @@ public abstract class ConnectProcessor {
// Use a handler for exception to avoid big try catch block which is a little hard to understand
protected void handleQueryException(Throwable throwable, String origStmt,
- StatementBase parsedStmt, Data.PQueryStatistics statistics) {
+ StatementBase parsedStmt, Data.PQueryStatistics statistics) throws ConnectionException {
if (ctx.getMinidump() != null) {
MinidumpUtils.saveMinidumpString(ctx.getMinidump(), DebugUtil.printId(ctx.queryId()));
}
- if (throwable instanceof IOException) {
+ if (throwable instanceof ConnectionException) {
+ // Throw this exception to close the connection outside.
+ LOG.warn("Process one query failed because ConnectionException: ", throwable);
+ throw (ConnectionException) throwable;
+ } else if (throwable instanceof IOException) {
// Client failed.
LOG.warn("Process one query failed because IOException: ", throwable);
- ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Doris process failed");
+ ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Doris process failed: " + throwable.getMessage());
} else if (throwable instanceof UserException) {
LOG.warn("Process one query failed because.", throwable);
ctx.getState().setError(((UserException) throwable).getMysqlErrorCode(), throwable.getMessage());
@@ -479,7 +486,7 @@ public abstract class ConnectProcessor {
// Get the column definitions of a table
@SuppressWarnings("rawtypes")
- protected void handleFieldList(String tableName) {
+ protected void handleFieldList(String tableName) throws ConnectionException {
// Already get command code.
if (Strings.isNullOrEmpty(tableName)) {
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_TABLE, "Empty tableName");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java
index fc19330268c..6637f62f1ef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MysqlConnectProcessor.java
@@ -24,6 +24,7 @@ import org.apache.doris.analysis.PrepareStmt;
import org.apache.doris.analysis.QueryStmt;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.catalog.MysqlColType;
+import org.apache.doris.common.ConnectionException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.mysql.MysqlChannel;
@@ -245,7 +246,7 @@ public class MysqlConnectProcessor extends ConnectProcessor {
}
// Process COM_QUERY statement,
- private void handleQuery(MysqlCommand mysqlCommand) {
+ private void handleQuery(MysqlCommand mysqlCommand) throws ConnectionException {
// convert statement to Java string
byte[] bytes = packetBuf.array();
int ending = packetBuf.limit() - 1;
@@ -307,7 +308,7 @@ public class MysqlConnectProcessor extends ConnectProcessor {
}
}
- private void handleFieldList() {
+ private void handleFieldList() throws ConnectionException {
String tableName = new String(MysqlProto.readNulTerminateString(packetBuf), StandardCharsets.UTF_8);
handleFieldList(tableName);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
index 6f51c0391af..a4aa5a88c8f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
@@ -18,6 +18,7 @@
package org.apache.doris.service.arrowflight;
import org.apache.doris.analysis.Expr;
+import org.apache.doris.common.ConnectionException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.Status;
@@ -81,7 +82,7 @@ public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoC
ctx.setStartTime();
}
- public void handleQuery(String query) {
+ public void handleQuery(String query) throws ConnectionException {
MysqlCommand command = MysqlCommand.COM_QUERY;
prepare(command);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]