This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4cbbd25d8c9 [fix](ctx) manager the lifecycle of connection context 
(#29346)
4cbbd25d8c9 is described below

commit 4cbbd25d8c9f0a4ef8178fb38f49738c6ec45963
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Mon Jan 1 23:32:28 2024 +0800

    [fix](ctx) manager the lifecycle of connection context (#29346)
    
    In FrontendService, we may create some connection context and set it as a 
thread local varaible.
    These context should be removed from thread local after call.
    Otherwise, it may be reused by other thread incorrectly.
---
 be/src/runtime/group_commit_mgr.cpp                |  2 ++
 .../planner/external/FederationBackendPolicy.java  | 11 ++++--
 .../apache/doris/service/FrontendServiceImpl.java  | 41 ++++++++++++----------
 3 files changed, 32 insertions(+), 22 deletions(-)

diff --git a/be/src/runtime/group_commit_mgr.cpp 
b/be/src/runtime/group_commit_mgr.cpp
index acb40ae6c78..00b3a559fbf 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -235,6 +235,8 @@ Status GroupCommitTable::_create_group_commit_load(
     request.__set_token("group_commit"); // this is a fake, fe not check it now
     request.__set_max_filter_ratio(1.0);
     request.__set_strictMode(false);
+    // this is an internal interface, use admin to pass the auth check
+    request.__set_user("admin");
     if (_exec_env->master_info()->__isset.backend_id) {
         request.__set_backend_id(_exec_env->master_info()->backend_id);
     } else {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
index d1d7e90d35a..ade03291c30 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
@@ -30,6 +30,7 @@ import org.apache.doris.thrift.TFileRangeDesc;
 import org.apache.doris.thrift.TScanRangeLocations;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
@@ -122,9 +123,13 @@ public class FederationBackendPolicy {
         Set<Tag> tags = Sets.newHashSet();
         if (ConnectContext.get() != null && 
ConnectContext.get().getCurrentUserIdentity() != null) {
             String qualifiedUser = 
ConnectContext.get().getCurrentUserIdentity().getQualifiedUser();
-            tags = 
Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser);
-            if (tags == UserProperty.INVALID_RESOURCE_TAGS) {
-                throw new UserException("No valid resource tag for user: " + 
qualifiedUser);
+            // Some request from stream load(eg, mysql load) may not set user 
info in ConnectContext
+            // just ignore it.
+            if (!Strings.isNullOrEmpty(qualifiedUser)) {
+                tags = 
Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser);
+                if (tags == UserProperty.INVALID_RESOURCE_TAGS) {
+                    throw new UserException("No valid resource tag for user: " 
+ qualifiedUser);
+                }
             }
         } else {
             LOG.debug("user info in ExternalFileScanNode should not be null, 
add log to observer");
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index e5b0f651016..136084f5f1c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1786,9 +1786,19 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         TStreamLoadPutResult result = new TStreamLoadPutResult();
         TStatus status = new TStatus(TStatusCode.OK);
         result.setStatus(status);
+
+        // create connect context
+        ConnectContext ctx = new ConnectContext();
+        ctx.setEnv(Env.getCurrentEnv());
+        ctx.setQueryId(request.getLoadId());
+        
ctx.setCurrentUserIdentity(UserIdentity.createAnalyzedUserIdentWithIp(request.getUser(),
 "%"));
+        ctx.setQualifiedUser(request.getUser());
+        ctx.setBackendId(request.getBackendId());
+        ctx.setThreadLocalInfo();
+
         try {
             if (!Strings.isNullOrEmpty(request.getLoadSql())) {
-                httpStreamPutImpl(request, result);
+                httpStreamPutImpl(request, result, ctx);
                 return result;
             } else {
                 if (Config.enable_pipeline_load) {
@@ -1806,6 +1816,8 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             status.setStatusCode(TStatusCode.INTERNAL_ERROR);
             status.addToErrorMsgs(e.getClass().getSimpleName() + ": " + 
Strings.nullToEmpty(e.getMessage()));
             return result;
+        } finally {
+            ConnectContext.remove();
         }
         return result;
     }
@@ -1917,12 +1929,12 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         return result;
     }
 
-
-    private void httpStreamPutImpl(TStreamLoadPutRequest request, 
TStreamLoadPutResult result)
+    private void httpStreamPutImpl(TStreamLoadPutRequest request, 
TStreamLoadPutResult result, ConnectContext ctx)
             throws UserException {
-        LOG.info("receive http stream put request");
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("receive http stream put request: {}", request);
+        }
         String originStmt = request.getLoadSql();
-        ConnectContext ctx = new ConnectContext();
         if (request.isSetAuthCode()) {
             // TODO(cmy): find a way to check
         } else if (Strings.isNullOrEmpty(request.getToken())) {
@@ -1930,12 +1942,6 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
                     request.getTbl(),
                     request.getUserIp(), PrivPredicate.LOAD);
         }
-        ctx.setEnv(Env.getCurrentEnv());
-        ctx.setQueryId(request.getLoadId());
-        ctx.setCurrentUserIdentity(UserIdentity.ROOT);
-        ctx.setQualifiedUser(UserIdentity.ROOT.getQualifiedUser());
-        ctx.setBackendId(request.getBackendId());
-        ctx.setThreadLocalInfo();
         SqlScanner input = new SqlScanner(new StringReader(originStmt), 
ctx.getSessionVariable().getSqlMode());
         SqlParser parser = new SqlParser(input);
         try {
@@ -2859,16 +2865,11 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         restoreStmt.setIsBeingSynced();
         LOG.trace("restore snapshot info, restoreStmt: {}", restoreStmt);
         try {
-            ConnectContext ctx = ConnectContext.get();
-            if (ctx == null) {
-                ctx = new ConnectContext();
-                ctx.setThreadLocalInfo();
-            }
+            ConnectContext ctx = new ConnectContext();
             ctx.setQualifiedUser(request.getUser());
             String fullUserName = 
ClusterNamespace.getNameFromFullName(request.getUser());
-            UserIdentity currentUserIdentity = new UserIdentity(fullUserName, 
"%");
-            ctx.setCurrentUserIdentity(currentUserIdentity);
-
+            
ctx.setCurrentUserIdentity(UserIdentity.createAnalyzedUserIdentWithIp(fullUserName,
 "%"));
+            ctx.setThreadLocalInfo();
             Analyzer analyzer = new Analyzer(ctx.getEnv(), ctx);
             restoreStmt.analyze(analyzer);
             DdlExecutor.execute(Env.getCurrentEnv(), restoreStmt);
@@ -2880,6 +2881,8 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             LOG.warn("catch unknown result.", e);
             status.setStatusCode(TStatusCode.INTERNAL_ERROR);
             status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
+        } finally {
+            ConnectContext.remove();
         }
 
         return result;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to