This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4cbbd25d8c9 [fix](ctx) manager the lifecycle of connection context
(#29346)
4cbbd25d8c9 is described below
commit 4cbbd25d8c9f0a4ef8178fb38f49738c6ec45963
Author: Mingyu Chen <[email protected]>
AuthorDate: Mon Jan 1 23:32:28 2024 +0800
[fix](ctx) manager the lifecycle of connection context (#29346)
In FrontendService, we may create some connection context and set it as a
thread local varaible.
These context should be removed from thread local after call.
Otherwise, it may be reused by other thread incorrectly.
---
be/src/runtime/group_commit_mgr.cpp | 2 ++
.../planner/external/FederationBackendPolicy.java | 11 ++++--
.../apache/doris/service/FrontendServiceImpl.java | 41 ++++++++++++----------
3 files changed, 32 insertions(+), 22 deletions(-)
diff --git a/be/src/runtime/group_commit_mgr.cpp
b/be/src/runtime/group_commit_mgr.cpp
index acb40ae6c78..00b3a559fbf 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -235,6 +235,8 @@ Status GroupCommitTable::_create_group_commit_load(
request.__set_token("group_commit"); // this is a fake, fe not check it now
request.__set_max_filter_ratio(1.0);
request.__set_strictMode(false);
+ // this is an internal interface, use admin to pass the auth check
+ request.__set_user("admin");
if (_exec_env->master_info()->__isset.backend_id) {
request.__set_backend_id(_exec_env->master_info()->backend_id);
} else {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
index d1d7e90d35a..ade03291c30 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FederationBackendPolicy.java
@@ -30,6 +30,7 @@ import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TScanRangeLocations;
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
@@ -122,9 +123,13 @@ public class FederationBackendPolicy {
Set<Tag> tags = Sets.newHashSet();
if (ConnectContext.get() != null &&
ConnectContext.get().getCurrentUserIdentity() != null) {
String qualifiedUser =
ConnectContext.get().getCurrentUserIdentity().getQualifiedUser();
- tags =
Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser);
- if (tags == UserProperty.INVALID_RESOURCE_TAGS) {
- throw new UserException("No valid resource tag for user: " +
qualifiedUser);
+ // Some request from stream load(eg, mysql load) may not set user
info in ConnectContext
+ // just ignore it.
+ if (!Strings.isNullOrEmpty(qualifiedUser)) {
+ tags =
Env.getCurrentEnv().getAuth().getResourceTags(qualifiedUser);
+ if (tags == UserProperty.INVALID_RESOURCE_TAGS) {
+ throw new UserException("No valid resource tag for user: "
+ qualifiedUser);
+ }
}
} else {
LOG.debug("user info in ExternalFileScanNode should not be null,
add log to observer");
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index e5b0f651016..136084f5f1c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1786,9 +1786,19 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
TStreamLoadPutResult result = new TStreamLoadPutResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
+
+ // create connect context
+ ConnectContext ctx = new ConnectContext();
+ ctx.setEnv(Env.getCurrentEnv());
+ ctx.setQueryId(request.getLoadId());
+
ctx.setCurrentUserIdentity(UserIdentity.createAnalyzedUserIdentWithIp(request.getUser(),
"%"));
+ ctx.setQualifiedUser(request.getUser());
+ ctx.setBackendId(request.getBackendId());
+ ctx.setThreadLocalInfo();
+
try {
if (!Strings.isNullOrEmpty(request.getLoadSql())) {
- httpStreamPutImpl(request, result);
+ httpStreamPutImpl(request, result, ctx);
return result;
} else {
if (Config.enable_pipeline_load) {
@@ -1806,6 +1816,8 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(e.getClass().getSimpleName() + ": " +
Strings.nullToEmpty(e.getMessage()));
return result;
+ } finally {
+ ConnectContext.remove();
}
return result;
}
@@ -1917,12 +1929,12 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
return result;
}
-
- private void httpStreamPutImpl(TStreamLoadPutRequest request,
TStreamLoadPutResult result)
+ private void httpStreamPutImpl(TStreamLoadPutRequest request,
TStreamLoadPutResult result, ConnectContext ctx)
throws UserException {
- LOG.info("receive http stream put request");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("receive http stream put request: {}", request);
+ }
String originStmt = request.getLoadSql();
- ConnectContext ctx = new ConnectContext();
if (request.isSetAuthCode()) {
// TODO(cmy): find a way to check
} else if (Strings.isNullOrEmpty(request.getToken())) {
@@ -1930,12 +1942,6 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
request.getTbl(),
request.getUserIp(), PrivPredicate.LOAD);
}
- ctx.setEnv(Env.getCurrentEnv());
- ctx.setQueryId(request.getLoadId());
- ctx.setCurrentUserIdentity(UserIdentity.ROOT);
- ctx.setQualifiedUser(UserIdentity.ROOT.getQualifiedUser());
- ctx.setBackendId(request.getBackendId());
- ctx.setThreadLocalInfo();
SqlScanner input = new SqlScanner(new StringReader(originStmt),
ctx.getSessionVariable().getSqlMode());
SqlParser parser = new SqlParser(input);
try {
@@ -2859,16 +2865,11 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
restoreStmt.setIsBeingSynced();
LOG.trace("restore snapshot info, restoreStmt: {}", restoreStmt);
try {
- ConnectContext ctx = ConnectContext.get();
- if (ctx == null) {
- ctx = new ConnectContext();
- ctx.setThreadLocalInfo();
- }
+ ConnectContext ctx = new ConnectContext();
ctx.setQualifiedUser(request.getUser());
String fullUserName =
ClusterNamespace.getNameFromFullName(request.getUser());
- UserIdentity currentUserIdentity = new UserIdentity(fullUserName,
"%");
- ctx.setCurrentUserIdentity(currentUserIdentity);
-
+
ctx.setCurrentUserIdentity(UserIdentity.createAnalyzedUserIdentWithIp(fullUserName,
"%"));
+ ctx.setThreadLocalInfo();
Analyzer analyzer = new Analyzer(ctx.getEnv(), ctx);
restoreStmt.analyze(analyzer);
DdlExecutor.execute(Env.getCurrentEnv(), restoreStmt);
@@ -2880,6 +2881,8 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
LOG.warn("catch unknown result.", e);
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
+ } finally {
+ ConnectContext.remove();
}
return result;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]