github-actions[bot] commented on code in PR #65173:
URL: https://github.com/apache/doris/pull/65173#discussion_r3518576075
##########
fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java:
##########
@@ -1104,90 +1105,130 @@ public TFetchResourceResult fetchResource() throws
TException {
@Override
public TMasterOpResult forward(TMasterOpRequest params) throws TException {
+ Frontend requester = validateForwardRequester(params);
+ TMasterOpResult shortcut = handleForwardShortcut(params);
+ if (shortcut != null) {
+ return shortcut;
+ }
+ logForwardRequest(params);
+ ConnectContext context = createForwardContext(params, requester);
+ ConnectProcessor processor = createForwardProcessor(context);
+ Runnable clearCallback = registerProxyQuery(params, context);
+ try {
+ return executeForward(params, context, processor);
+ } finally {
+ ConnectContext.remove();
+ clearCallback.run();
+ }
+ }
+
+ private Frontend validateForwardRequester(TMasterOpRequest params) throws
TException {
Frontend fe =
Env.getCurrentEnv().checkFeExist(params.getClientNodeHost(),
params.getClientNodePort());
- if (fe == null) {
- LOG.warn("reject request from invalid host. client: {}",
params.getClientNodeHost());
- throw new TException("request from invalid host was rejected.");
+ if (fe != null) {
+ return fe;
}
+ LOG.warn("reject request from invalid host. client: {}",
params.getClientNodeHost());
+ throw new TException("request from invalid host was rejected.");
+ }
+
+ private TMasterOpResult handleForwardShortcut(TMasterOpRequest params)
throws TException {
if (params.isSyncJournalOnly()) {
- final TMasterOpResult result = new TMasterOpResult();
- result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId());
- // just make the protocol happy
- result.setPacket("".getBytes());
- return result;
+ return createForwardAckResult();
}
if (params.getGroupCommitInfo() != null &&
params.getGroupCommitInfo().isGetGroupCommitLoadBeId()) {
- final TGroupCommitInfo info = params.getGroupCommitInfo();
- final TMasterOpResult result = new TMasterOpResult();
- try {
-
result.setGroupCommitLoadBeId(Env.getCurrentEnv().getGroupCommitManager()
-
.selectBackendForGroupCommitInternal(info.groupCommitLoadTableId,
info.cluster));
- } catch (LoadException | DdlException e) {
- throw new TException(e.getMessage());
- }
- // just make the protocol happy
- result.setPacket("".getBytes());
- return result;
+ return handleGroupCommitLoadBeId(params.getGroupCommitInfo());
}
if (params.getGroupCommitInfo() != null &&
params.getGroupCommitInfo().isUpdateLoadData()) {
- final TGroupCommitInfo info = params.getGroupCommitInfo();
- final TMasterOpResult result = new TMasterOpResult();
Env.getCurrentEnv().getGroupCommitManager()
- .updateLoadData(info.tableId, info.receiveData);
- // just make the protocol happy
- result.setPacket("".getBytes());
- return result;
+ .updateLoadData(params.getGroupCommitInfo().tableId,
params.getGroupCommitInfo().receiveData);
+ return createForwardAckResult();
}
- if (params.isSetCancelQeury() && params.isCancelQeury()) {
- if (!params.isSetQueryId()) {
- throw new TException("a query id is needed to cancel a query");
- }
- TUniqueId queryId = params.getQueryId();
- ConnectContext ctx = proxyQueryIdToConnCtx.get(queryId);
- if (ctx != null) {
- ctx.cancelQuery(new Status(TStatusCode.CANCELLED, "cancel
query by forward request."));
- }
- final TMasterOpResult result = new TMasterOpResult();
- result.setStatusCode(0);
- result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId());
- // just make the protocol happy
- result.setPacket("".getBytes());
- return result;
+ if (!params.isSetCancelQeury() || !params.isCancelQeury()) {
+ return null;
+ }
+ return handleForwardCancel(params);
+ }
+
+ private TMasterOpResult createForwardAckResult() {
+ TMasterOpResult result = new TMasterOpResult();
+ result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId());
+ result.setPacket("".getBytes());
+ return result;
+ }
+
+ private TMasterOpResult handleGroupCommitLoadBeId(TGroupCommitInfo info) {
+ TMasterOpResult result = createForwardAckResult();
+ try {
+
result.setGroupCommitLoadBeId(Env.getCurrentEnv().getGroupCommitManager()
+
.selectBackendForGroupCommitInternal(info.groupCommitLoadTableId, info.cluster,
+
ResourceGroupAffinityPolicyFactory.get().forwardedLoadDecision(
+ info.getLoadAffinityPreferredGroup(),
info.getLoadAffinityPolicy())));
+ } catch (LoadException | DdlException e) {
+ // Throwing TException here surfaces on the requesting FE as a
transport error with a
+ // null message; carry the selection error through the result
instead.
+ result.setStatusCode(1);
+ result.setErrMessage(e.getMessage() == null ? e.toString() :
e.getMessage());
}
+ return result;
+ }
+
+ private TMasterOpResult handleForwardCancel(TMasterOpRequest params)
throws TException {
+ if (!params.isSetQueryId()) {
+ throw new TException("a query id is needed to cancel a query");
+ }
+ ConnectContext context =
proxyQueryIdToConnCtx.get(params.getQueryId());
+ if (context != null) {
+ context.cancelQuery(new Status(TStatusCode.CANCELLED, "cancel
query by forward request."));
+ }
+ TMasterOpResult result = createForwardAckResult();
+ result.setStatusCode(0);
+ return result;
+ }
- // add this log so that we can track this stmt
+ private void logForwardRequest(TMasterOpRequest params) {
if (LOG.isDebugEnabled()) {
LOG.debug("receive forwarded stmt {} from FE: {}",
params.getStmtId(), params.getClientNodeHost());
}
+ }
+
+ private ConnectContext createForwardContext(TMasterOpRequest params,
Frontend requester) {
ConnectContext context = new ConnectContext(null, true,
params.getSessionId());
// Set current connected FE to the client address, so that we can know
where
// this request come from.
context.setCurrentConnectedFEIp(params.getClientNodeHost());
+
context.setConnectingFeLocalResourceGroup(requester.getLocalResourceGroup());
Review Comment:
This forwarded context can pick up a stale local resource group for the
requester. `FEOpExecutor.buildStmtForwardParams()` sends only the requester
host/port, and this line resolves the group from the master-side,
heartbeat-cached `Frontend` object. That field is transient and is refreshed
only by subsequent successful (OK) FE heartbeats, so a follower that restarts
with a different `--local_resource_group`/`DORIS_LOCAL_RESOURCE_GROUP` can
forward SQL before the next heartbeat arrives, and the default-affinity code
will then see the old or empty group. Please carry the effective local group in
`TMasterOpRequest` as an optional field and prefer that per-request value here.
##########
fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsTableValuedFunction.java:
##########
@@ -64,6 +64,7 @@ public class FrontendsTableValuedFunction extends
MetadataTableValuedFunction {
new Column("ErrMsg", ScalarType.createStringType()),
new Column("Version", ScalarType.createStringType()),
new Column("CurrentConnected", ScalarType.createStringType()),
+ new Column("LocalResourceGroup", ScalarType.createStringType()),
Review Comment:
This schema change leaves `test_frontends_tvf` stale. `frontends()` now
exposes `LocalResourceGroup` before `LiveSince`, so `select *` returns 21
columns, but the regression test still asserts `table[0].size() == 20` and
`titleNames[19][0] == "LiveSince"` in
`regression-test/suites/external_table_p0/tvf/test_frontends_tvf.groovy`.
Please update the regression expectations and the full-column projection to
include `LocalResourceGroup` at index 19 and `LiveSince` at index 20.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]