This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new a9855ca1738 [Improvement](set) enable admin_set_frontend_config can
apply to all fe (#37022)
a9855ca1738 is described below
commit a9855ca17387369849cb489e376b11b16c0757cc
Author: Yulei-Yang <[email protected]>
AuthorDate: Sat Jun 29 09:17:38 2024 +0800
[Improvement](set) enable admin_set_frontend_config can apply to all fe
(#37022)
bp #34685
---
fe/fe-core/src/main/cup/sql_parser.cup | 10 +-
.../apache/doris/analysis/AdminSetConfigStmt.java | 19 +-
.../main/java/org/apache/doris/catalog/Env.java | 19 +-
.../java/org/apache/doris/qe/FEOpExecutor.java | 216 +++++++++++++++++++++
4 files changed, 261 insertions(+), 3 deletions(-)
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup
b/fe/fe-core/src/main/cup/sql_parser.cup
index a62f3dcf0f1..871c0261e0c 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -7517,7 +7517,15 @@ admin_stmt ::=
:}
| KW_ADMIN KW_SET KW_FRONTEND KW_CONFIG opt_key_value_map:configs
{:
- RESULT = new
AdminSetConfigStmt(AdminSetConfigStmt.ConfigType.FRONTEND, configs);
+ RESULT = new
AdminSetConfigStmt(AdminSetConfigStmt.ConfigType.FRONTEND, configs, false);
+ :}
+ | KW_ADMIN KW_SET KW_ALL KW_FRONTENDS KW_CONFIG opt_key_value_map:configs
+ {:
+ RESULT = new
AdminSetConfigStmt(AdminSetConfigStmt.ConfigType.FRONTEND, configs, true);
+ :}
+ | KW_ADMIN KW_SET KW_FRONTEND KW_CONFIG opt_key_value_map:configs KW_ALL
+ {:
+ RESULT = new
AdminSetConfigStmt(AdminSetConfigStmt.ConfigType.FRONTEND, configs, true);
:}
// deprecated
| KW_ADMIN KW_SHOW KW_FRONTEND KW_CONFIG opt_wild_where
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminSetConfigStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminSetConfigStmt.java
index 166f9a70096..1d2e22ee878 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminSetConfigStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AdminSetConfigStmt.java
@@ -25,6 +25,7 @@ import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.OriginStatement;
import com.google.common.collect.Maps;
@@ -38,22 +39,25 @@ public class AdminSetConfigStmt extends DdlStmt {
BACKEND
}
+ private boolean applyToAll;
private ConfigType type;
private Map<String, String> configs;
private RedirectStatus redirectStatus = RedirectStatus.NO_FORWARD;
- public AdminSetConfigStmt(ConfigType type, Map<String, String> configs) {
+ public AdminSetConfigStmt(ConfigType type, Map<String, String> configs,
boolean applyToAll) {
this.type = type;
this.configs = configs;
if (this.configs == null) {
this.configs = Maps.newHashMap();
}
+ this.applyToAll = applyToAll;
// we have to analyze configs here to determine whether to forward it
to master
for (String key : this.configs.keySet()) {
if (ConfigBase.checkIsMasterOnly(key)) {
redirectStatus = RedirectStatus.FORWARD_NO_SYNC;
+ this.applyToAll = false;
}
}
}
@@ -66,6 +70,10 @@ public class AdminSetConfigStmt extends DdlStmt {
return configs;
}
+ public boolean isApplyToAll() {
+ return applyToAll;
+ }
+
@Override
public void analyze(Analyzer analyzer) throws AnalysisException,
UserException {
super.analyze(analyzer);
@@ -87,4 +95,13 @@ public class AdminSetConfigStmt extends DdlStmt {
public RedirectStatus getRedirectStatus() {
return redirectStatus;
}
+
+ public OriginStatement getLocalSetStmt() {
+ OriginStatement stmt = this.getOrigStmt();
+ Object[] keyArr = configs.keySet().toArray();
+ String sql = String.format("ADMIN SET FRONTEND CONFIG (\"%s\" =
\"%s\");",
+ keyArr[0].toString(), configs.get(keyArr[0].toString()));
+
+ return new OriginStatement(sql, stmt.idx);
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 380eb361c00..6f8cd9e727d 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -233,6 +233,7 @@ import org.apache.doris.plugin.PluginMgr;
import org.apache.doris.policy.PolicyMgr;
import org.apache.doris.qe.AuditEventProcessor;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.FEOpExecutor;
import org.apache.doris.qe.GlobalVariable;
import org.apache.doris.qe.JournalObservable;
import org.apache.doris.qe.QueryCancelWorker;
@@ -5386,7 +5387,7 @@ public class Env {
globalFunctionMgr.replayDropFunction(functionSearchDesc);
}
- public void setConfig(AdminSetConfigStmt stmt) throws DdlException {
+ public void setConfig(AdminSetConfigStmt stmt) throws Exception {
Map<String, String> configs = stmt.getConfigs();
Preconditions.checkState(configs.size() == 1);
@@ -5397,6 +5398,22 @@ public class Env {
throw new DdlException(e.getMessage());
}
}
+
+ if (stmt.isApplyToAll()) {
+ for (Frontend fe : Env.getCurrentEnv().getFrontends(null /* all
*/)) {
+ if (!fe.isAlive() ||
fe.getHost().equals(Env.getCurrentEnv().getSelfNode().getHost())) {
+ continue;
+ }
+
+ TNetworkAddress feAddr = new TNetworkAddress(fe.getHost(),
fe.getRpcPort());
+ FEOpExecutor executor = new FEOpExecutor(feAddr,
stmt.getLocalSetStmt(), ConnectContext.get(), false);
+ executor.execute();
+ if (executor.getStatusCode() != TStatusCode.OK.getValue()) {
+ throw new DdlException(String.format("failed to apply to
fe %s:%s, error message: %s",
+ fe.getHost(), fe.getRpcPort(),
executor.getErrMsg()));
+ }
+ }
+ }
}
public void replayBackendReplicasInfo(BackendReplicasInfo
backendReplicasInfo) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java
new file mode 100644
index 00000000000..162bc0623be
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/FEOpExecutor.java
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.qe;
+
+import org.apache.doris.analysis.LiteralExpr;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.ErrorCode;
+import org.apache.doris.thrift.FrontendService;
+import org.apache.doris.thrift.TExpr;
+import org.apache.doris.thrift.TExprNode;
+import org.apache.doris.thrift.TMasterOpRequest;
+import org.apache.doris.thrift.TMasterOpResult;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TUniqueId;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
+
+import java.util.Map;
+
+public class FEOpExecutor {
+ private static final Logger LOG = LogManager.getLogger(FEOpExecutor.class);
+
+ private static final float RPC_TIMEOUT_COEFFICIENT = 1.2f;
+
+ private final OriginStatement originStmt;
+ private final ConnectContext ctx;
+ private TMasterOpResult result;
+ private TNetworkAddress feAddr;
+
+ // the total time of thrift connectTime, readTime and writeTime
+ private int thriftTimeoutMs;
+
+ private boolean shouldNotRetry;
+
+ public FEOpExecutor(TNetworkAddress feAddress, OriginStatement originStmt,
ConnectContext ctx, boolean isQuery) {
+ this.feAddr = feAddress;
+ this.originStmt = originStmt;
+ this.ctx = ctx;
+ this.thriftTimeoutMs = (int) (ctx.getExecTimeout() * 1000 *
RPC_TIMEOUT_COEFFICIENT);
+ // if isQuery=false, we shouldn't retry twice when catch exception
because of Idempotency
+ this.shouldNotRetry = !isQuery;
+ }
+
+ public void execute() throws Exception {
+ result = forward(feAddr, buildStmtForwardParams());
+ }
+
+ public void cancel() throws Exception {
+ TUniqueId queryId = ctx.queryId();
+ if (queryId == null) {
+ return;
+ }
+ Preconditions.checkNotNull(feAddr, "query with id %s is not forwarded
to fe", queryId);
+ TMasterOpRequest request = new TMasterOpRequest();
+ request.setCancelQeury(true);
+ request.setQueryId(queryId);
+ request.setDb(ctx.getDatabase());
+ request.setUser(ctx.getQualifiedUser());
+ request.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
+ request.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
+ // just make the protocol happy
+ request.setSql("");
+ result = forward(feAddr, request);
+ }
+
+ // Send request to specific fe
+ private TMasterOpResult forward(TNetworkAddress thriftAddress,
TMasterOpRequest params) throws Exception {
+ ctx.getEnv().checkReadyOrThrow();
+
+ FrontendService.Client client;
+ try {
+ client = ClientPool.frontendPool.borrowObject(thriftAddress,
thriftTimeoutMs);
+ } catch (Exception e) {
+ // may throw NullPointerException. add err msg
+ throw new Exception("Failed to get fe client: " +
thriftAddress.toString(), e);
+ }
+ final StringBuilder forwardMsg = new StringBuilder("forward to FE " +
thriftAddress.toString());
+ forwardMsg.append(", statement id: ").append(ctx.getStmtId());
+ LOG.info(forwardMsg.toString());
+
+ boolean isReturnToPool = false;
+ try {
+ final TMasterOpResult result = client.forward(params);
+ isReturnToPool = true;
+ return result;
+ } catch (TTransportException e) {
+ // wrap the raw exception.
+ forwardMsg.append(" : failed");
+ Exception exception = new
ForwardToFEException(forwardMsg.toString(), e);
+
+ boolean ok = ClientPool.frontendPool.reopen(client,
thriftTimeoutMs);
+ if (!ok) {
+ throw exception;
+ }
+ if (shouldNotRetry || e.getType() ==
TTransportException.TIMED_OUT) {
+ throw exception;
+ } else {
+ LOG.warn(forwardMsg.append(" twice").toString(), e);
+ try {
+ TMasterOpResult result = client.forward(params);
+ isReturnToPool = true;
+ return result;
+ } catch (TException ex) {
+ throw exception;
+ }
+ }
+ } finally {
+ if (isReturnToPool) {
+ ClientPool.frontendPool.returnObject(thriftAddress, client);
+ } else {
+ ClientPool.frontendPool.invalidateObject(thriftAddress,
client);
+ }
+ }
+ }
+
+ private TMasterOpRequest buildStmtForwardParams() {
+ TMasterOpRequest params = new TMasterOpRequest();
+ // node ident
+ params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
+ params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
+ params.setSql(originStmt.originStmt);
+ params.setStmtIdx(originStmt.idx);
+ params.setUser(ctx.getQualifiedUser());
+ params.setDefaultCatalog(ctx.getDefaultCatalog());
+ params.setDefaultDatabase(ctx.getDatabase());
+ params.setDb(ctx.getDatabase());
+ params.setUserIp(ctx.getRemoteIP());
+ params.setStmtId(ctx.getStmtId());
+ params.setCurrentUserIdent(ctx.getCurrentUserIdentity().toThrift());
+ params.setCluster(String.valueOf(ctx.getEnv().getClusterId()));
+
+ // query options
+
params.setQueryOptions(ctx.getSessionVariable().getQueryOptionVariables());
+ // session variables
+
params.setSessionVariables(ctx.getSessionVariable().getForwardVariables());
+ params.setUserVariables(getForwardUserVariables(ctx.getUserVars()));
+ if (null != ctx.queryId()) {
+ params.setQueryId(ctx.queryId());
+ }
+ return params;
+ }
+
+ public int getStatusCode() {
+ if (result == null || !result.isSetStatusCode()) {
+ return ErrorCode.ERR_UNKNOWN_ERROR.getCode();
+ }
+ return result.getStatusCode();
+ }
+
+ public String getErrMsg() {
+ if (result == null) {
+ return ErrorCode.ERR_UNKNOWN_ERROR.getErrorMsg();
+ }
+ if (!result.isSetErrMessage()) {
+ return "";
+ }
+ return result.getErrMessage();
+ }
+
+ private Map<String, TExprNode> getForwardUserVariables(Map<String,
LiteralExpr> userVariables) {
+ Map<String, TExprNode> forwardVariables = Maps.newHashMap();
+ for (Map.Entry<String, LiteralExpr> entry : userVariables.entrySet()) {
+ LiteralExpr literalExpr = entry.getValue();
+ TExpr tExpr = literalExpr.treeToThrift();
+ TExprNode tExprNode = tExpr.nodes.get(0);
+ forwardVariables.put(entry.getKey(), tExprNode);
+ }
+ return forwardVariables;
+ }
+
+ public static class ForwardToFEException extends RuntimeException {
+
+ private static final Map<Integer, String> TYPE_MSG_MAP =
+ ImmutableMap.<Integer, String>builder()
+ .put(TTransportException.UNKNOWN, "Unknown exception")
+ .put(TTransportException.NOT_OPEN, "Connection is not
open")
+ .put(TTransportException.ALREADY_OPEN, "Connection has
already opened up")
+ .put(TTransportException.TIMED_OUT, "Connection
timeout")
+ .put(TTransportException.END_OF_FILE, "EOF")
+ .put(TTransportException.CORRUPTED_DATA, "Corrupted
data")
+ .build();
+
+ private final String msg;
+
+ public ForwardToFEException(String msg, TTransportException exception)
{
+ this.msg = msg + ", cause: " +
TYPE_MSG_MAP.get(exception.getType()) + ", " + exception.getMessage();
+ }
+
+ @Override
+ public String getMessage() {
+ return msg;
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org