This is an automated email from the ASF dual-hosted git repository.
zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2973d0f9d6c [fix](arrow-flight-sql) Fix kill timeout
FlightSqlConnection and FlightSqlConnectProcessor close (#41770)
2973d0f9d6c is described below
commit 2973d0f9d6c93dfe1636688c2e272c5dda1fd1af
Author: Xinyi Zou <[email protected]>
AuthorDate: Wed Oct 16 17:07:34 2024 +0800
[fix](arrow-flight-sql) Fix kill timeout FlightSqlConnection and
FlightSqlConnectProcessor close (#41770)
1. Doris cancels a connection that has not responded for a long time.
A MySQL connection exits directly, but an Arrow Flight connection
does not exit immediately, so it may be cancelled repeatedly, printing a
log each time. The timeout is the `wait_timeout` session variable.
2. Use try-with-resources for FlightSqlConnectProcessor so that it is closed correctly.
---
.../java/org/apache/doris/qe/ConnectContext.java | 16 +--
.../arrowflight/DorisFlightSqlProducer.java | 120 ++++++++++-----------
.../arrowflight/FlightSqlConnectProcessor.java | 3 +
.../sessions/FlightSqlConnectContext.java | 3 +-
4 files changed, 74 insertions(+), 68 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 783844f12f5..2493b8e6203 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -951,7 +951,8 @@ public class ConnectContext {
// kill operation with no protect.
public void kill(boolean killConnection) {
- LOG.warn("kill query from {}, kill mysql connection: {}",
getRemoteHostPortString(), killConnection);
+ LOG.warn("kill query from {}, kill {} connection: {}",
getRemoteHostPortString(), getConnectType(),
+ killConnection);
if (killConnection) {
isKilled = true;
@@ -964,10 +965,10 @@ public class ConnectContext {
// kill operation with no protect by timeout.
private void killByTimeout(boolean killConnection) {
- LOG.warn("kill query from {}, kill mysql connection: {} reason time
out", getRemoteHostPortString(),
- killConnection);
-
if (killConnection) {
+ LOG.warn("kill wait timeout connection, connection type: {},
connectionId: {}, remote: {}, "
+ + "wait timeout: {}",
+ getConnectType(), connectionId, getRemoteHostPortString(),
sessionVariable.getWaitTimeoutS());
isKilled = true;
// Close channel to break connection with client
closeChannel();
@@ -976,6 +977,10 @@ public class ConnectContext {
// cancelQuery by time out
StmtExecutor executorRef = executor;
if (executorRef != null) {
+ LOG.warn("kill time out query, remote: {}, at the same time kill
connection is {},"
+ + " connection type: {}, connectionId: {}",
+ getRemoteHostPortString(), killConnection,
+ getConnectType(), connectionId);
executorRef.cancel(new Status(TStatusCode.TIMEOUT,
"query is timeout, killed by timeout checker"));
}
@@ -999,9 +1004,6 @@ public class ConnectContext {
if (command == MysqlCommand.COM_SLEEP) {
if (delta > sessionVariable.getWaitTimeoutS() * 1000L) {
// Need kill this connection.
- LOG.warn("kill wait timeout connection, remote: {}, wait
timeout: {}",
- getRemoteHostPortString(),
sessionVariable.getWaitTimeoutS());
-
killFlag = true;
killConnection = true;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
index 16195469af9..5d431b386b7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
@@ -186,64 +186,66 @@ public class DorisFlightSqlProducer implements
FlightSqlProducer, AutoCloseable
Preconditions.checkState(!query.isEmpty());
// After the previous query was executed, there was no
getStreamStatement to take away the result.
connectContext.getFlightSqlChannel().reset();
- final FlightSqlConnectProcessor flightSQLConnectProcessor = new
FlightSqlConnectProcessor(connectContext);
-
- flightSQLConnectProcessor.handleQuery(query);
- if (connectContext.getState().getStateType() ==
MysqlStateType.ERR) {
- throw new RuntimeException("after executeQueryStatement
handleQuery");
- }
-
- if (connectContext.isReturnResultFromLocal()) {
- // set/use etc. stmt returns an OK result by default.
- if (connectContext.getFlightSqlChannel().resultNum() == 0) {
- // a random query id and add empty results
- String queryId = UUID.randomUUID().toString();
- connectContext.getFlightSqlChannel().addOKResult(queryId,
query);
+ try (FlightSqlConnectProcessor flightSQLConnectProcessor = new
FlightSqlConnectProcessor(connectContext)) {
+ flightSQLConnectProcessor.handleQuery(query);
+ if (connectContext.getState().getStateType() ==
MysqlStateType.ERR) {
+ throw new RuntimeException("after executeQueryStatement
handleQuery");
+ }
- final ByteString handle =
ByteString.copyFromUtf8(peerIdentity + ":" + queryId);
- TicketStatementQuery ticketStatement =
TicketStatementQuery.newBuilder().setStatementHandle(handle)
- .build();
- return getFlightInfoForSchema(ticketStatement, descriptor,
-
connectContext.getFlightSqlChannel().getResult(queryId).getVectorSchemaRoot().getSchema());
+ if (connectContext.isReturnResultFromLocal()) {
+ // set/use etc. stmt returns an OK result by default.
+ if (connectContext.getFlightSqlChannel().resultNum() == 0)
{
+ // a random query id and add empty results
+ String queryId = UUID.randomUUID().toString();
+
connectContext.getFlightSqlChannel().addOKResult(queryId, query);
+
+ final ByteString handle =
ByteString.copyFromUtf8(peerIdentity + ":" + queryId);
+ TicketStatementQuery ticketStatement =
TicketStatementQuery.newBuilder()
+ .setStatementHandle(handle).build();
+ return getFlightInfoForSchema(ticketStatement,
descriptor,
+
connectContext.getFlightSqlChannel().getResult(queryId).getVectorSchemaRoot()
+ .getSchema());
+ } else {
+ // A Flight Sql request can only contain one statement
that returns result,
+ // otherwise expected thrown exception during
execution.
+
Preconditions.checkState(connectContext.getFlightSqlChannel().resultNum() == 1);
+
+ // The tokens used for authentication between
getStreamStatement and getFlightInfoStatement
+ // are different. So put the peerIdentity into the
ticket and then getStreamStatement is used to
+ // find the correct ConnectContext.
+ // queryId is used to find query results.
+ final ByteString handle = ByteString.copyFromUtf8(
+ peerIdentity + ":" +
DebugUtil.printId(connectContext.queryId()));
+ TicketStatementQuery ticketStatement =
TicketStatementQuery.newBuilder()
+ .setStatementHandle(handle).build();
+ return getFlightInfoForSchema(ticketStatement,
descriptor, connectContext.getFlightSqlChannel()
+
.getResult(DebugUtil.printId(connectContext.queryId())).getVectorSchemaRoot()
+ .getSchema());
+ }
} else {
- // A Flight Sql request can only contain one statement
that returns result,
- // otherwise expected thrown exception during execution.
-
Preconditions.checkState(connectContext.getFlightSqlChannel().resultNum() == 1);
-
- // The tokens used for authentication between
getStreamStatement and getFlightInfoStatement
- // are different. So put the peerIdentity into the ticket
and then getStreamStatement is used to
- // find the correct ConnectContext.
- // queryId is used to find query results.
- final ByteString handle = ByteString.copyFromUtf8(
- peerIdentity + ":" +
DebugUtil.printId(connectContext.queryId()));
+ // Now only query stmt will pull results from BE.
+ final ByteString handle;
+ if
(connectContext.getSessionVariable().enableParallelResultSink()) {
+ handle =
ByteString.copyFromUtf8(DebugUtil.printId(connectContext.queryId()) + ":" +
query);
+ } else {
+ // only one instance
+ handle =
ByteString.copyFromUtf8(DebugUtil.printId(connectContext.getFinstId()) + ":" +
query);
+ }
+ Schema schema =
flightSQLConnectProcessor.fetchArrowFlightSchema(5000);
+ if (schema == null) {
+ throw CallStatus.INTERNAL.withDescription("fetch arrow
flight schema is null")
+ .toRuntimeException();
+ }
TicketStatementQuery ticketStatement =
TicketStatementQuery.newBuilder().setStatementHandle(handle)
.build();
- return getFlightInfoForSchema(ticketStatement, descriptor,
-
connectContext.getFlightSqlChannel().getResult(DebugUtil.printId(connectContext.queryId()))
- .getVectorSchemaRoot().getSchema());
- }
- } else {
- // Now only query stmt will pull results from BE.
- final ByteString handle;
- if
(connectContext.getSessionVariable().enableParallelResultSink()) {
- handle =
ByteString.copyFromUtf8(DebugUtil.printId(connectContext.queryId()) + ":" +
query);
- } else {
- // only one instance
- handle =
ByteString.copyFromUtf8(DebugUtil.printId(connectContext.getFinstId()) + ":" +
query);
- }
- Schema schema =
flightSQLConnectProcessor.fetchArrowFlightSchema(5000);
- if (schema == null) {
- throw CallStatus.INTERNAL.withDescription("fetch arrow
flight schema is null").toRuntimeException();
+ Ticket ticket = new
Ticket(Any.pack(ticketStatement).toByteArray());
+ // TODO Support multiple endpoints.
+ Location location =
Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname,
+ connectContext.getResultFlightServerAddr().port);
+ List<FlightEndpoint> endpoints =
Collections.singletonList(new FlightEndpoint(ticket, location));
+ // TODO Set in BE callback after query end, Client will
not callback.
+ return new FlightInfo(schema, descriptor, endpoints, -1,
-1);
}
- TicketStatementQuery ticketStatement =
TicketStatementQuery.newBuilder().setStatementHandle(handle)
- .build();
- Ticket ticket = new
Ticket(Any.pack(ticketStatement).toByteArray());
- // TODO Support multiple endpoints.
- Location location =
Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname,
- connectContext.getResultFlightServerAddr().port);
- List<FlightEndpoint> endpoints = Collections.singletonList(new
FlightEndpoint(ticket, location));
- // TODO Set in BE callback after query end, Client will not
callback.
- return new FlightInfo(schema, descriptor, endpoints, -1, -1);
}
} catch (Exception e) {
String errMsg = "get flight info statement failed, " +
e.getMessage() + ", " + Util.getRootCauseMessage(e)
@@ -296,8 +298,7 @@ public class DorisFlightSqlProducer implements
FlightSqlProducer, AutoCloseable
final ByteString bytes = Objects.isNull(parameterSchema) ?
ByteString.EMPTY
: ByteString.copyFrom(serializeMetadata(parameterSchema));
return ActionCreatePreparedStatementResult.newBuilder()
-
.setDatasetSchema(ByteString.copyFrom(serializeMetadata(metaData)))
- .setParameterSchema(bytes)
+
.setDatasetSchema(ByteString.copyFrom(serializeMetadata(metaData))).setParameterSchema(bytes)
.setPreparedStatementHandle(handle).build();
}
@@ -326,12 +327,11 @@ public class DorisFlightSqlProducer implements
FlightSqlProducer, AutoCloseable
Schema metaData = connectContext.getFlightSqlChannel()
.createOneOneSchemaRoot("ResultMeta",
"UNIMPLEMENTED").getSchema();
listener.onNext(new Result(
- Any.pack(buildCreatePreparedStatementResult(handle,
parameterSchema, metaData))
- .toByteArray()));
+ Any.pack(buildCreatePreparedStatementResult(handle,
parameterSchema, metaData)).toByteArray()));
} catch (Exception e) {
- String errMsg = "create prepared statement failed, " +
e.getMessage() + ", "
- + Util.getRootCauseMessage(e) + ", error code: " +
connectContext.getState().getErrorCode()
- + ", error msg: " +
connectContext.getState().getErrorMessage();
+ String errMsg = "create prepared statement failed, " +
e.getMessage() + ", " + Util.getRootCauseMessage(
+ e) + ", error code: " +
connectContext.getState().getErrorCode() + ", error msg: "
+ + connectContext.getState().getErrorMessage();
LOG.warn(errMsg, e);
listener.onError(CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException());
return;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
index fe0648a0680..b812bf81d8a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
@@ -53,6 +53,8 @@ import java.util.concurrent.TimeoutException;
/**
* Process one flgiht sql connection.
+ *
+ * Must use try-with-resources.
*/
public class FlightSqlConnectProcessor extends ConnectProcessor implements
AutoCloseable {
private static final Logger LOG =
LogManager.getLogger(FlightSqlConnectProcessor.class);
@@ -177,6 +179,7 @@ public class FlightSqlConnectProcessor extends
ConnectProcessor implements AutoC
@Override
public void close() throws Exception {
ctx.setCommand(MysqlCommand.COM_SLEEP);
+ ctx.clear();
// TODO support query profile
for (StmtExecutor asynExecutor : returnResultFromRemoteExecutor) {
asynExecutor.finalizeQuery();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java
index 4badae03b31..b90d7505923 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java
@@ -63,6 +63,7 @@ public class FlightSqlConnectContext extends ConnectContext {
if (flightSqlChannel != null) {
flightSqlChannel.close();
}
+ connectScheduler.unregisterConnection(this);
}
// kill operation with no protect.
@@ -72,8 +73,8 @@ public class FlightSqlConnectContext extends ConnectContext {
if (killConnection) {
isKilled = true;
+ // Close channel and break connection with client.
closeChannel();
- connectScheduler.unregisterConnection(this);
}
// Now, cancel running query.
cancelQuery(new Status(TStatusCode.CANCELLED, "arrow flight query
killed by user"));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]