This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 6f2bac012ad [pick](branch-2.1) pick #39398 #41754 #41770 (#42231)
6f2bac012ad is described below

commit 6f2bac012ad1e4c1419dd57b511e2d1688de7d6a
Author: Xinyi Zou <[email protected]>
AuthorDate: Tue Oct 22 18:05:40 2024 +0800

    [pick](branch-2.1) pick #39398 #41754 #41770 (#42231)
    
    pick #39398 #41754 #41770
---
 be/src/util/cgroup_util.cpp                        |   4 +
 be/src/util/cgroup_util.h                          |   9 +-
 .../java/org/apache/doris/qe/ConnectContext.java   |  21 ++--
 .../arrowflight/DorisFlightSqlProducer.java        | 110 ++++++++++-----------
 .../arrowflight/FlightSqlConnectProcessor.java     |   3 +
 .../sessions/FlightSqlConnectContext.java          |   3 +-
 .../arrowflight/tokens/FlightTokenManagerImpl.java |  22 +++--
 7 files changed, 99 insertions(+), 73 deletions(-)

diff --git a/be/src/util/cgroup_util.cpp b/be/src/util/cgroup_util.cpp
index 8109e38559f..8f64fe699c6 100644
--- a/be/src/util/cgroup_util.cpp
+++ b/be/src/util/cgroup_util.cpp
@@ -193,6 +193,7 @@ std::string CGroupUtil::cgroupv2_of_process() {
 }
 
 std::optional<std::string> CGroupUtil::get_cgroupsv2_path(const std::string& subsystem) {
+#if defined(OS_LINUX)
     if (!CGroupUtil::cgroupsv2_enable()) {
         return {};
     }
@@ -209,6 +210,9 @@ std::optional<std::string> CGroupUtil::get_cgroupsv2_path(const std::string& sub
         current_cgroup = current_cgroup.parent_path();
     }
     return {};
+#else
+    return {};
+#endif
 }
 
 Status CGroupUtil::read_int_line_from_cgroup_file(const std::filesystem::path& file_path,
diff --git a/be/src/util/cgroup_util.h b/be/src/util/cgroup_util.h
index cf922ba5063..bc1417453f4 100644
--- a/be/src/util/cgroup_util.h
+++ b/be/src/util/cgroup_util.h
@@ -26,11 +26,9 @@
 #include "common/status.h"
 namespace doris {
 
-#if defined(OS_LINUX)
 // I think it is possible to mount the cgroups hierarchy somewhere else (e.g. when in containers).
 // /sys/fs/cgroup was still symlinked to the actual mount in the cases that I have seen.
 static inline const std::filesystem::path default_cgroups_mount = "/sys/fs/cgroup";
-#endif
 
 /* Cgroup debugging steps
  * CgroupV1:
@@ -52,7 +50,12 @@ class CGroupUtil {
 public:
     enum class CgroupsVersion : uint8_t { V1, V2 };
 
-    // detect if cgroup is enabled
+    // Detect if cgroup is enabled.
+    // If true, it only means that the OS allows the use of Cgroup v1 or v2,
+    // not that the current BE process is using Cgroup.
+    // To confirm whether the process is using Cgroup need to use `find_global_cgroupv1` or `cgroupv2_of_process`.
+    // To confirm whether the process is using a subsystem of Cgroup,
+    // need to use `find_abs_cgroupv1_path` or `get_cgroupsv2_path`.
     static bool cgroupsv1_enable();
     static bool cgroupsv2_enable();
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index c5622d54a14..d5bf9b9ef2d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -839,6 +839,11 @@ public class ConnectContext {
         return executor;
     }
 
+    public void clear() {
+        executor = null;
+        statementContext = null;
+    }
+
     public PlSqlOperation getPlSqlOperation() {
         if (plSqlOperation == null) {
             plSqlOperation = new PlSqlOperation();
@@ -927,7 +932,8 @@ public class ConnectContext {
 
     // kill operation with no protect.
     public void kill(boolean killConnection) {
-        LOG.warn("kill query from {}, kill mysql connection: {}", getRemoteHostPortString(), killConnection);
+        LOG.warn("kill query from {}, kill {} connection: {}", getRemoteHostPortString(), getConnectType(),
+                killConnection);
 
         if (killConnection) {
             isKilled = true;
@@ -940,10 +946,10 @@ public class ConnectContext {
 
     // kill operation with no protect by timeout.
     private void killByTimeout(boolean killConnection) {
-        LOG.warn("kill query from {}, kill mysql connection: {} reason time out", getRemoteHostPortString(),
-                killConnection);
-
         if (killConnection) {
+            LOG.warn("kill wait timeout connection, connection type: {}, connectionId: {}, remote: {}, "
+                            + "wait timeout: {}",
+                    getConnectType(), connectionId, getRemoteHostPortString(), sessionVariable.getWaitTimeoutS());
             isKilled = true;
             // Close channel to break connection with client
             closeChannel();
@@ -952,6 +958,10 @@ public class ConnectContext {
         // cancelQuery by time out
         StmtExecutor executorRef = executor;
         if (executorRef != null) {
+            LOG.warn("kill time out query, remote: {}, at the same time kill connection is {},"
+                            + " connection type: {}, connectionId: {}",
+                    getRemoteHostPortString(), killConnection,
+                    getConnectType(), connectionId);
             executorRef.cancel(Types.PPlanFragmentCancelReason.TIMEOUT);
         }
     }
@@ -974,9 +984,6 @@ public class ConnectContext {
         if (command == MysqlCommand.COM_SLEEP) {
             if (delta > sessionVariable.getWaitTimeoutS() * 1000L) {
                 // Need kill this connection.
-                LOG.warn("kill wait timeout connection, remote: {}, wait timeout: {}",
-                        getRemoteHostPortString(), sessionVariable.getWaitTimeoutS());
-
                 killFlag = true;
                 killConnection = true;
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
index 2c7aaae4f2a..b0367e8c578 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
@@ -90,7 +90,7 @@ import java.util.concurrent.Executors;
 
 /**
  * Implementation of Arrow Flight SQL service
- *
+ * <p>
  * All methods must catch all possible Exceptions, print and throw CallStatus,
  * otherwise error message will be discarded.
  */
@@ -186,59 +186,61 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable
             Preconditions.checkState(!query.isEmpty());
             // After the previous query was executed, there was no getStreamStatement to take away the result.
             connectContext.getFlightSqlChannel().reset();
-            final FlightSqlConnectProcessor flightSQLConnectProcessor = new FlightSqlConnectProcessor(connectContext);
-
-            flightSQLConnectProcessor.handleQuery(query);
-            if (connectContext.getState().getStateType() == MysqlStateType.ERR) {
-                throw new RuntimeException("after executeQueryStatement handleQuery");
-            }
-
-            if (connectContext.isReturnResultFromLocal()) {
-                // set/use etc. stmt returns an OK result by default.
-                if (connectContext.getFlightSqlChannel().resultNum() == 0) {
-                    // a random query id and add empty results
-                    String queryId = UUID.randomUUID().toString();
-                    connectContext.getFlightSqlChannel().addOKResult(queryId, query);
+            try (FlightSqlConnectProcessor flightSQLConnectProcessor = new FlightSqlConnectProcessor(connectContext)) {
+                flightSQLConnectProcessor.handleQuery(query);
+                if (connectContext.getState().getStateType() == MysqlStateType.ERR) {
+                    throw new RuntimeException("after executeQueryStatement handleQuery");
+                }
 
-                    final ByteString handle = ByteString.copyFromUtf8(peerIdentity + ":" + queryId);
-                    TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder().setStatementHandle(handle)
-                            .build();
-                    return getFlightInfoForSchema(ticketStatement, descriptor,
-                            connectContext.getFlightSqlChannel().getResult(queryId).getVectorSchemaRoot().getSchema());
+                if (connectContext.isReturnResultFromLocal()) {
+                    // set/use etc. stmt returns an OK result by default.
+                    if (connectContext.getFlightSqlChannel().resultNum() == 0) {
+                        // a random query id and add empty results
+                        String queryId = UUID.randomUUID().toString();
+                        connectContext.getFlightSqlChannel().addOKResult(queryId, query);
+
+                        final ByteString handle = ByteString.copyFromUtf8(peerIdentity + ":" + queryId);
+                        TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder()
+                                .setStatementHandle(handle).build();
+                        return getFlightInfoForSchema(ticketStatement, descriptor,
+                                connectContext.getFlightSqlChannel().getResult(queryId).getVectorSchemaRoot()
+                                        .getSchema());
+                    } else {
+                        // A Flight Sql request can only contain one statement that returns result,
+                        // otherwise expected thrown exception during execution.
+                        Preconditions.checkState(connectContext.getFlightSqlChannel().resultNum() == 1);
+
+                        // The tokens used for authentication between getStreamStatement and getFlightInfoStatement
+                        // are different. So put the peerIdentity into the ticket and then getStreamStatement is used to
+                        // find the correct ConnectContext.
+                        // queryId is used to find query results.
+                        final ByteString handle = ByteString.copyFromUtf8(
+                                peerIdentity + ":" + DebugUtil.printId(connectContext.queryId()));
+                        TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder()
+                                .setStatementHandle(handle).build();
+                        return getFlightInfoForSchema(ticketStatement, descriptor, connectContext.getFlightSqlChannel()
+                                .getResult(DebugUtil.printId(connectContext.queryId())).getVectorSchemaRoot()
+                                .getSchema());
+                    }
                 } else {
-                    // A Flight Sql request can only contain one statement that returns result,
-                    // otherwise expected thrown exception during execution.
-                    Preconditions.checkState(connectContext.getFlightSqlChannel().resultNum() == 1);
-
-                    // The tokens used for authentication between getStreamStatement and getFlightInfoStatement
-                    // are different. So put the peerIdentity into the ticket and then getStreamStatement is used to
-                    // find the correct ConnectContext.
-                    // queryId is used to find query results.
+                    // Now only query stmt will pull results from BE.
                     final ByteString handle = ByteString.copyFromUtf8(
-                            peerIdentity + ":" + DebugUtil.printId(connectContext.queryId()));
+                            DebugUtil.printId(connectContext.getFinstId()) + ":" + query);
+                    Schema schema = flightSQLConnectProcessor.fetchArrowFlightSchema(5000);
+                    if (schema == null) {
+                        throw CallStatus.INTERNAL.withDescription("fetch arrow flight schema is null")
+                                .toRuntimeException();
+                    }
                     TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder().setStatementHandle(handle)
                             .build();
-                    return getFlightInfoForSchema(ticketStatement, descriptor,
-                            connectContext.getFlightSqlChannel().getResult(DebugUtil.printId(connectContext.queryId()))
-                                    .getVectorSchemaRoot().getSchema());
-                }
-            } else {
-                // Now only query stmt will pull results from BE.
-                final ByteString handle = ByteString.copyFromUtf8(
-                        DebugUtil.printId(connectContext.getFinstId()) + ":" + query);
-                Schema schema = flightSQLConnectProcessor.fetchArrowFlightSchema(5000);
-                if (schema == null) {
-                    throw CallStatus.INTERNAL.withDescription("fetch arrow flight schema is null").toRuntimeException();
+                    Ticket ticket = new Ticket(Any.pack(ticketStatement).toByteArray());
+                    // TODO Support multiple endpoints.
+                    Location location = Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname,
+                            connectContext.getResultFlightServerAddr().port);
+                    List<FlightEndpoint> endpoints = Collections.singletonList(new FlightEndpoint(ticket, location));
+                    // TODO Set in BE callback after query end, Client will not callback.
+                    return new FlightInfo(schema, descriptor, endpoints, -1, -1);
                 }
-                TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder().setStatementHandle(handle)
-                        .build();
-                Ticket ticket = new Ticket(Any.pack(ticketStatement).toByteArray());
-                // TODO Support multiple endpoints.
-                Location location = Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname,
-                        connectContext.getResultFlightServerAddr().port);
-                List<FlightEndpoint> endpoints = Collections.singletonList(new FlightEndpoint(ticket, location));
-                // TODO Set in BE callback after query end, Client will not callback.
-                return new FlightInfo(schema, descriptor, endpoints, -1, -1);
             }
         } catch (Exception e) {
             String errMsg = "get flight info statement failed, " + e.getMessage() + ", " + Util.getRootCauseMessage(e)
@@ -291,8 +293,7 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable
         final ByteString bytes = Objects.isNull(parameterSchema) ? ByteString.EMPTY
                 : ByteString.copyFrom(serializeMetadata(parameterSchema));
         return ActionCreatePreparedStatementResult.newBuilder()
-                .setDatasetSchema(ByteString.copyFrom(serializeMetadata(metaData)))
-                .setParameterSchema(bytes)
+                .setDatasetSchema(ByteString.copyFrom(serializeMetadata(metaData))).setParameterSchema(bytes)
                 .setPreparedStatementHandle(handle).build();
     }
 
@@ -321,12 +322,11 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable
                 Schema metaData = connectContext.getFlightSqlChannel()
                         .createOneOneSchemaRoot("ResultMeta", "UNIMPLEMENTED").getSchema();
                 listener.onNext(new Result(
-                        Any.pack(buildCreatePreparedStatementResult(handle, parameterSchema, metaData))
-                                .toByteArray()));
+                        Any.pack(buildCreatePreparedStatementResult(handle, parameterSchema, metaData)).toByteArray()));
             } catch (Exception e) {
-                String errMsg = "create prepared statement failed, " + e.getMessage() + ", "
-                        + Util.getRootCauseMessage(e) + ", error code: " + connectContext.getState().getErrorCode()
-                        + ", error msg: " + connectContext.getState().getErrorMessage();
+                String errMsg = "create prepared statement failed, " + e.getMessage() + ", " + Util.getRootCauseMessage(
+                        e) + ", error code: " + connectContext.getState().getErrorCode() + ", error msg: "
+                        + connectContext.getState().getErrorMessage();
                 LOG.warn(errMsg, e);
                 listener.onError(CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException());
                 return;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
index a4aa5a88c8f..a816cf184ca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
@@ -53,6 +53,8 @@ import java.util.concurrent.TimeoutException;
 
 /**
  * Process one flgiht sql connection.
+ *
+ * Must use try-with-resources.
  */
 public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoCloseable {
     private static final Logger LOG = LogManager.getLogger(FlightSqlConnectProcessor.class);
@@ -171,6 +173,7 @@ public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoC
     @Override
     public void close() throws Exception {
         ctx.setCommand(MysqlCommand.COM_SLEEP);
+        ctx.clear();
         // TODO support query profile
         for (StmtExecutor asynExecutor : returnResultFromRemoteExecutor) {
             asynExecutor.finalizeQuery();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java
index 406497c77db..50b34845336 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSqlConnectContext.java
@@ -61,6 +61,7 @@ public class FlightSqlConnectContext extends ConnectContext {
         if (flightSqlChannel != null) {
             flightSqlChannel.close();
         }
+        connectScheduler.unregisterConnection(this);
     }
 
     // kill operation with no protect.
@@ -70,8 +71,8 @@ public class FlightSqlConnectContext extends ConnectContext {
 
         if (killConnection) {
             isKilled = true;
+            // Close channel and break connection with client.
             closeChannel();
-            connectScheduler.unregisterConnection(this);
         }
         // Now, cancel running query.
         cancelQuery("arrow flight query killed by user");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java
index 57101d995e0..bd3e85cd0c2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java
@@ -20,6 +20,7 @@
 package org.apache.doris.service.arrowflight.tokens;
 
 import org.apache.doris.catalog.Env;
+import org.apache.doris.common.CustomThreadFactory;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.service.ExecuteEnv;
 import org.apache.doris.service.arrowflight.auth2.FlightAuthResult;
@@ -38,6 +39,8 @@ import java.math.BigInteger;
 import java.security.SecureRandom;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -53,6 +56,7 @@ public class FlightTokenManagerImpl implements FlightTokenManager {
     private final LoadingCache<String, FlightTokenDetails> tokenCache;
     // <username, <token, 1>>
     private final ConcurrentHashMap<String, LoadingCache<String, Integer>> usersTokenLRU = new ConcurrentHashMap<>();
+    private ScheduledExecutorService cleanupExecutor;
 
     public FlightTokenManagerImpl(final int cacheSize, final int cacheExpiration) {
         this.cacheSize = cacheSize;
@@ -66,13 +70,15 @@ public class FlightTokenManagerImpl implements FlightTokenManager {
                         // TODO: broadcast this message to other FE
                         String token = notification.getKey();
                         FlightTokenDetails tokenDetails = notification.getValue();
-                        LOG.info("evict bearer token: " + token + ", reason: token number exceeded, "
-                                + notification.getCause());
-                        ConnectContext context = ExecuteEnv.getInstance().getScheduler()
-                                .getContext(token);
+                        ConnectContext context = ExecuteEnv.getInstance().getScheduler().getContext(token);
                         if (context != null) {
                             ExecuteEnv.getInstance().getScheduler().unregisterConnection(context);
-                            LOG.info("unregister flight connect context after evict bearer token: " + token);
+                            LOG.info("evict bearer token: " + token + " from tokenCache, reason: "
+                                    + notification.getCause()
+                                    + ", and unregister flight connection context after evict bearer token");
+                        } else {
+                            LOG.info("evict bearer token: " + token + " from tokenCache, reason: "
+                                    + notification.getCause() + ", and flight connection context not exist");
                         }
                         usersTokenLRU.get(tokenDetails.getUsername()).invalidate(token);
                     }
@@ -82,6 +88,8 @@ public class FlightTokenManagerImpl implements FlightTokenManager {
                         return new FlightTokenDetails();
                     }
                 });
+        this.cleanupExecutor = Executors.newScheduledThreadPool(1, new CustomThreadFactory("flight-token-cleanup"));
+        this.cleanupExecutor.scheduleAtFixedRate(() -> this.tokenCache.cleanUp(), 1, 1, TimeUnit.MINUTES);
     }
 
     // From https://stackoverflow.com/questions/41107/how-to-generate-a-random-alpha-numeric-string
@@ -114,9 +122,9 @@ public class FlightTokenManagerImpl implements FlightTokenManager {
                                 public void onRemoval(@NotNull RemovalNotification<String, Integer> notification) {
                                     // TODO: broadcast this message to other FE
                                     assert notification.getKey() != null;
-                                    tokenCache.invalidate(notification.getKey());
                                     LOG.info("evict bearer token: " + notification.getKey()
-                                            + ", reason: user connection exceeded, " + notification.getCause());
+                                            + " from usersTokenLRU, reason: " + notification.getCause());
+                                    tokenCache.invalidate(notification.getKey());
                                 }
                             }).build(new CacheLoader<String, Integer>() {
                                 @NotNull


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to