This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 6f2bac012ad [pick](branch-2.1) pick #39398 #41754 #41770 (#42231)
6f2bac012ad is described below
commit 6f2bac012ad1e4c1419dd57b511e2d1688de7d6a
Author: Xinyi Zou <[email protected]>
AuthorDate: Tue Oct 22 18:05:40 2024 +0800
[pick](branch-2.1) pick #39398 #41754 #41770 (#42231)
pick #39398 #41754 #41770
---
be/src/util/cgroup_util.cpp | 4 +
be/src/util/cgroup_util.h | 9 +-
.../java/org/apache/doris/qe/ConnectContext.java | 21 ++--
.../arrowflight/DorisFlightSqlProducer.java | 110 ++++++++++-----------
.../arrowflight/FlightSqlConnectProcessor.java | 3 +
.../sessions/FlightSqlConnectContext.java | 3 +-
.../arrowflight/tokens/FlightTokenManagerImpl.java | 22 +++--
7 files changed, 99 insertions(+), 73 deletions(-)
diff --git a/be/src/util/cgroup_util.cpp b/be/src/util/cgroup_util.cpp
index 8109e38559f..8f64fe699c6 100644
--- a/be/src/util/cgroup_util.cpp
+++ b/be/src/util/cgroup_util.cpp
@@ -193,6 +193,7 @@ std::string CGroupUtil::cgroupv2_of_process() {
}
std::optional<std::string> CGroupUtil::get_cgroupsv2_path(const std::string&
subsystem) {
+#if defined(OS_LINUX)
if (!CGroupUtil::cgroupsv2_enable()) {
return {};
}
@@ -209,6 +210,9 @@ std::optional<std::string>
CGroupUtil::get_cgroupsv2_path(const std::string& sub
current_cgroup = current_cgroup.parent_path();
}
return {};
+#else
+ return {};
+#endif
}
Status CGroupUtil::read_int_line_from_cgroup_file(const std::filesystem::path&
file_path,
diff --git a/be/src/util/cgroup_util.h b/be/src/util/cgroup_util.h
index cf922ba5063..bc1417453f4 100644
--- a/be/src/util/cgroup_util.h
+++ b/be/src/util/cgroup_util.h
@@ -26,11 +26,9 @@
#include "common/status.h"
namespace doris {
-#if defined(OS_LINUX)
// I think it is possible to mount the cgroups hierarchy somewhere else (e.g. when in containers).
// /sys/fs/cgroup was still symlinked to the actual mount in the cases that I have seen.
static inline const std::filesystem::path default_cgroups_mount =
"/sys/fs/cgroup";
-#endif
/* Cgroup debugging steps
* CgroupV1:
@@ -52,7 +50,12 @@ class CGroupUtil {
public:
enum class CgroupsVersion : uint8_t { V1, V2 };
- // detect if cgroup is enabled
+ // Detect if cgroup is enabled.
+ // If true, it only means that the OS allows the use of Cgroup v1 or v2,
+ // not that the current BE process is using Cgroup.
+ // To confirm whether the process is using Cgroup need to use `find_global_cgroupv1` or `cgroupv2_of_process`.
+ // To confirm whether the process is using a subsystem of Cgroup,
+ // need to use `find_abs_cgroupv1_path` or `get_cgroupsv2_path`.
static bool cgroupsv1_enable();
static bool cgroupsv2_enable();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index c5622d54a14..d5bf9b9ef2d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -839,6 +839,11 @@ public class ConnectContext {
return executor;
}
+ public void clear() {
+ executor = null;
+ statementContext = null;
+ }
+
public PlSqlOperation getPlSqlOperation() {
if (plSqlOperation == null) {
plSqlOperation = new PlSqlOperation();
@@ -927,7 +932,8 @@ public class ConnectContext {
// kill operation with no protect.
public void kill(boolean killConnection) {
- LOG.warn("kill query from {}, kill mysql connection: {}",
getRemoteHostPortString(), killConnection);
+ LOG.warn("kill query from {}, kill {} connection: {}",
getRemoteHostPortString(), getConnectType(),
+ killConnection);
if (killConnection) {
isKilled = true;
@@ -940,10 +946,10 @@ public class ConnectContext {
// kill operation with no protect by timeout.
private void killByTimeout(boolean killConnection) {
- LOG.warn("kill query from {}, kill mysql connection: {} reason time
out", getRemoteHostPortString(),
- killConnection);
-
if (killConnection) {
+ LOG.warn("kill wait timeout connection, connection type: {},
connectionId: {}, remote: {}, "
+ + "wait timeout: {}",
+ getConnectType(), connectionId, getRemoteHostPortString(),
sessionVariable.getWaitTimeoutS());
isKilled = true;
// Close channel to break connection with client
closeChannel();
@@ -952,6 +958,10 @@ public class ConnectContext {
// cancelQuery by time out
StmtExecutor executorRef = executor;
if (executorRef != null) {
+ LOG.warn("kill time out query, remote: {}, at the same time kill
connection is {},"
+ + " connection type: {}, connectionId: {}",
+ getRemoteHostPortString(), killConnection,
+ getConnectType(), connectionId);
executorRef.cancel(Types.PPlanFragmentCancelReason.TIMEOUT);
}
}
@@ -974,9 +984,6 @@ public class ConnectContext {
if (command == MysqlCommand.COM_SLEEP) {
if (delta > sessionVariable.getWaitTimeoutS() * 1000L) {
// Need kill this connection.
- LOG.warn("kill wait timeout connection, remote: {}, wait
timeout: {}",
- getRemoteHostPortString(),
sessionVariable.getWaitTimeoutS());
-
killFlag = true;
killConnection = true;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
index 2c7aaae4f2a..b0367e8c578 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
@@ -90,7 +90,7 @@ import java.util.concurrent.Executors;
/**
* Implementation of Arrow Flight SQL service
- *
+ * <p>
* All methods must catch all possible Exceptions, print and throw CallStatus,
* otherwise error message will be discarded.
*/
@@ -186,59 +186,61 @@ public class DorisFlightSqlProducer implements
FlightSqlProducer, AutoCloseable
Preconditions.checkState(!query.isEmpty());
// After the previous query was executed, there was no getStreamStatement to take away the result.
connectContext.getFlightSqlChannel().reset();
- final FlightSqlConnectProcessor flightSQLConnectProcessor = new
FlightSqlConnectProcessor(connectContext);
-
- flightSQLConnectProcessor.handleQuery(query);
- if (connectContext.getState().getStateType() ==
MysqlStateType.ERR) {
- throw new RuntimeException("after executeQueryStatement
handleQuery");
- }
-
- if (connectContext.isReturnResultFromLocal()) {
- // set/use etc. stmt returns an OK result by default.
- if (connectContext.getFlightSqlChannel().resultNum() == 0) {
- // a random query id and add empty results
- String queryId = UUID.randomUUID().toString();
- connectContext.getFlightSqlChannel().addOKResult(queryId,
query);
+ try (FlightSqlConnectProcessor flightSQLConnectProcessor = new
FlightSqlConnectProcessor(connectContext)) {
+ flightSQLConnectProcessor.handleQuery(query);
+ if (connectContext.getState().getStateType() ==
MysqlStateType.ERR) {
+ throw new RuntimeException("after executeQueryStatement
handleQuery");
+ }
- final ByteString handle =
ByteString.copyFromUtf8(peerIdentity + ":" + queryId);
- TicketStatementQuery ticketStatement =
TicketStatementQuery.newBuilder().setStatementHandle(handle)
- .build();
- return getFlightInfoForSchema(ticketStatement, descriptor,
-
connectContext.getFlightSqlChannel().getResult(queryId).getVectorSchemaRoot().getSchema());
+ if (connectContext.isReturnResultFromLocal()) {
+ // set/use etc. stmt returns an OK result by default.
+ if (connectContext.getFlightSqlChannel().resultNum() == 0)
{
+ // a random query id and add empty results
+ String queryId = UUID.randomUUID().toString();
+
connectContext.getFlightSqlChannel().addOKResult(queryId, query);
+
+ final ByteString handle =
ByteString.copyFromUtf8(peerIdentity + ":" + queryId);
+ TicketStatementQuery ticketStatement =
TicketStatementQuery.newBuilder()
+ .setStatementHandle(handle).build();
+ return getFlightInfoForSchema(ticketStatement,
descriptor,
+
connectContext.getFlightSqlChannel().getResult(queryId).getVectorSchemaRoot()
+ .getSchema());
+ } else {
+ // A Flight Sql request can only contain one statement that returns result,
+ // otherwise expected thrown exception during execution.
+
Preconditions.checkState(connectContext.getFlightSqlChannel().resultNum() == 1);
+
+ // The tokens used for authentication between getStreamStatement and getFlightInfoStatement
+ // are different. So put the peerIdentity into the ticket and then getStreamStatement is used to
+ // find the correct ConnectContext.
+ // queryId is used to find query results.
+ final ByteString handle = ByteString.copyFromUtf8(
+ peerIdentity + ":" +
DebugUtil.printId(connectContext.queryId()));
+ TicketStatementQuery ticketStatement =
TicketStatementQuery.newBuilder()
+ .setStatementHandle(handle).build();
+ return getFlightInfoForSchema(ticketStatement,
descriptor, connectContext.getFlightSqlChannel()
+
.getResult(DebugUtil.printId(connectContext.queryId())).getVectorSchemaRoot()
+ .getSchema());
+ }
} else {
- // A Flight Sql request can only contain one statement
that returns result,
- // otherwise expected thrown exception during execution.
-
Preconditions.checkState(connectContext.getFlightSqlChannel().resultNum() == 1);
-
- // The tokens used for authentication between
getStreamStatement and getFlightInfoStatement
- // are different. So put the peerIdentity into the ticket
and then getStreamStatement is used to
- // find the correct ConnectContext.
- // queryId is used to find query results.
+ // Now only query stmt will pull results from BE.
final ByteString handle = ByteString.copyFromUtf8(
- peerIdentity + ":" +
DebugUtil.printId(connectContext.queryId()));
+ DebugUtil.printId(connectContext.getFinstId()) +
":" + query);
+ Schema schema =
flightSQLConnectProcessor.fetchArrowFlightSchema(5000);
+ if (schema == null) {
+ throw CallStatus.INTERNAL.withDescription("fetch arrow
flight schema is null")
+ .toRuntimeException();
+ }
TicketStatementQuery ticketStatement =
TicketStatementQuery.newBuilder().setStatementHandle(handle)
.build();
- return getFlightInfoForSchema(ticketStatement, descriptor,
-
connectContext.getFlightSqlChannel().getResult(DebugUtil.printId(connectContext.queryId()))
- .getVectorSchemaRoot().getSchema());
- }
- } else {
- // Now only query stmt will pull results from BE.
- final ByteString handle = ByteString.copyFromUtf8(
- DebugUtil.printId(connectContext.getFinstId()) + ":" +
query);
- Schema schema =
flightSQLConnectProcessor.fetchArrowFlightSchema(5000);
- if (schema == null) {
- throw CallStatus.INTERNAL.withDescription("fetch arrow
flight schema is null").toRuntimeException();
+ Ticket ticket = new
Ticket(Any.pack(ticketStatement).toByteArray());
+ // TODO Support multiple endpoints.
+ Location location =
Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname,
+ connectContext.getResultFlightServerAddr().port);
+ List<FlightEndpoint> endpoints =
Collections.singletonList(new FlightEndpoint(ticket, location));
+ // TODO Set in BE callback after query end, Client will
not callback.
+ return new FlightInfo(schema, descriptor, endpoints, -1,
-1);
}
- TicketStatementQuery ticketStatement =
TicketStatementQuery.newBuilder().setStatementHandle(handle)
- .build();
- Ticket ticket = new
Ticket(Any.pack(ticketStatement).toByteArray());
- // TODO Support multiple endpoints.
- Location location =
Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname,
- connectContext.getResultFlightServerAddr().port);
- List<FlightEndpoint> endpoints = Collections.singletonList(new
FlightEndpoint(ticket, location));
- // TODO Set in BE callback after query end, Client will not
callback.
- return new FlightInfo(schema, descriptor, endpoints, -1, -1);
}
} catch (Exception e) {
String errMsg = "get flight info statement failed, " +
e.getMessage() + ", " + Util.getRootCauseMessage(e)
@@ -291,8 +293,7 @@ public class DorisFlightSqlProducer implements
FlightSqlProducer, AutoCloseable
final ByteString bytes = Objects.isNull(parameterSchema) ?
ByteString.EMPTY
: ByteString.copyFrom(serializeMetadata(parameterSchema));
return ActionCreatePreparedStatementResult.newBuilder()
-
.setDatasetSchema(ByteString.copyFrom(serializeMetadata(metaData)))
- .setParameterSchema(bytes)
+
.setDatasetSchema(ByteString.copyFrom(serializeMetadata(metaData))).setParameterSchema(bytes)
.setPreparedStatementHandle(handle).build();
}
@@ -321,12 +322,11 @@ public class DorisFlightSqlProducer implements
FlightSqlProducer, AutoCloseable
Schema metaData = connectContext.getFlightSqlChannel()
.createOneOneSchemaRoot("ResultMeta",
"UNIMPLEMENTED").getSchema();
listener.onNext(new Result(
- Any.pack(buildCreatePreparedStatementResult(handle,
parameterSchema, metaData))
- .toByteArray()));
+ Any.pack(buildCreatePreparedStatementResult(handle,
parameterSchema, metaData)).toByteArray()));
} catch (Exception e) {
- String errMsg = "create prepared statement failed, " +
e.getMessage() + ", "
- + Util.getRootCauseMessage(e) + ", error code: " +
connectContext.getState().getErrorCode()
- + ", error msg: " +
connectContext.getState().getErrorMessage();
+ String errMsg = "create prepared statement failed, " +
e.getMessage() + ", " + Util.getRootCauseMessage(
+ e) + ", error code: " +
connectContext.getState().getErrorCode() + ", error msg: "
+ + connectContext.getState().getErrorMessage();
LOG.warn(errMsg, e);
listener.onError(CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException());
return;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
index a4aa5a88c8f..a816cf184ca 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
@@ -53,6 +53,8 @@ import java.util.concurrent.TimeoutException;
/**
* Process one flgiht sql connection.
+ *
+ * Must use try-with-resources.
*/
public class FlightSqlConnectProcessor extends ConnectProcessor implements
AutoCloseable {
private static final Logger LOG =
LogManager.getLogger(FlightSqlConnectProcessor.class);
@@ -171,6 +173,7 @@ public class FlightSqlConnectProcessor extends
ConnectProcessor implements AutoC
@Override
public void close() throws Exception {
ctx.setCommand(MysqlCommand.COM_SLEEP);
+ ctx.clear();
// TODO support query profile
for (StmtExecutor asynExecutor : returnResultFromRemoteExecutor) {
asynExecutor.finalizeQuery();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java
index 406497c77db..50b34845336 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java
@@ -61,6 +61,7 @@ public class FlightSqlConnectContext extends ConnectContext {
if (flightSqlChannel != null) {
flightSqlChannel.close();
}
+ connectScheduler.unregisterConnection(this);
}
// kill operation with no protect.
@@ -70,8 +71,8 @@ public class FlightSqlConnectContext extends ConnectContext {
if (killConnection) {
isKilled = true;
+ // Close channel and break connection with client.
closeChannel();
- connectScheduler.unregisterConnection(this);
}
// Now, cancel running query.
cancelQuery("arrow flight query killed by user");
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java
index 57101d995e0..bd3e85cd0c2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java
@@ -20,6 +20,7 @@
package org.apache.doris.service.arrowflight.tokens;
import org.apache.doris.catalog.Env;
+import org.apache.doris.common.CustomThreadFactory;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.arrowflight.auth2.FlightAuthResult;
@@ -38,6 +39,8 @@ import java.math.BigInteger;
import java.security.SecureRandom;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
@@ -53,6 +56,7 @@ public class FlightTokenManagerImpl implements
FlightTokenManager {
private final LoadingCache<String, FlightTokenDetails> tokenCache;
// <username, <token, 1>>
private final ConcurrentHashMap<String, LoadingCache<String, Integer>>
usersTokenLRU = new ConcurrentHashMap<>();
+ private ScheduledExecutorService cleanupExecutor;
public FlightTokenManagerImpl(final int cacheSize, final int
cacheExpiration) {
this.cacheSize = cacheSize;
@@ -66,13 +70,15 @@ public class FlightTokenManagerImpl implements
FlightTokenManager {
// TODO: broadcast this message to other FE
String token = notification.getKey();
FlightTokenDetails tokenDetails =
notification.getValue();
- LOG.info("evict bearer token: " + token + ", reason:
token number exceeded, "
- + notification.getCause());
- ConnectContext context =
ExecuteEnv.getInstance().getScheduler()
- .getContext(token);
+ ConnectContext context =
ExecuteEnv.getInstance().getScheduler().getContext(token);
if (context != null) {
ExecuteEnv.getInstance().getScheduler().unregisterConnection(context);
- LOG.info("unregister flight connect context after
evict bearer token: " + token);
+ LOG.info("evict bearer token: " + token + " from
tokenCache, reason: "
+ + notification.getCause()
+ + ", and unregister flight connection
context after evict bearer token");
+ } else {
+ LOG.info("evict bearer token: " + token + " from
tokenCache, reason: "
+ + notification.getCause() + ", and flight
connection context not exist");
}
usersTokenLRU.get(tokenDetails.getUsername()).invalidate(token);
}
@@ -82,6 +88,8 @@ public class FlightTokenManagerImpl implements
FlightTokenManager {
return new FlightTokenDetails();
}
});
+ this.cleanupExecutor = Executors.newScheduledThreadPool(1, new
CustomThreadFactory("flight-token-cleanup"));
+ this.cleanupExecutor.scheduleAtFixedRate(() ->
this.tokenCache.cleanUp(), 1, 1, TimeUnit.MINUTES);
}
// From
https://stackoverflow.com/questions/41107/how-to-generate-a-random-alpha-numeric-string
@@ -114,9 +122,9 @@ public class FlightTokenManagerImpl implements
FlightTokenManager {
public void onRemoval(@NotNull
RemovalNotification<String, Integer> notification) {
// TODO: broadcast this message to other FE
assert notification.getKey() != null;
-
tokenCache.invalidate(notification.getKey());
LOG.info("evict bearer token: " +
notification.getKey()
- + ", reason: user connection
exceeded, " + notification.getCause());
+ + " from usersTokenLRU, reason: "
+ notification.getCause());
+
tokenCache.invalidate(notification.getKey());
}
}).build(new CacheLoader<String, Integer>() {
@NotNull
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]