github-actions[bot] commented on code in PR #65173:
URL: https://github.com/apache/doris/pull/65173#discussion_r3518576075


##########
fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java:
##########
@@ -1104,90 +1105,130 @@ public TFetchResourceResult fetchResource() throws 
TException {
 
     @Override
     public TMasterOpResult forward(TMasterOpRequest params) throws TException {
+        Frontend requester = validateForwardRequester(params);
+        TMasterOpResult shortcut = handleForwardShortcut(params);
+        if (shortcut != null) {
+            return shortcut;
+        }
+        logForwardRequest(params);
+        ConnectContext context = createForwardContext(params, requester);
+        ConnectProcessor processor = createForwardProcessor(context);
+        Runnable clearCallback = registerProxyQuery(params, context);
+        try {
+            return executeForward(params, context, processor);
+        } finally {
+            ConnectContext.remove();
+            clearCallback.run();
+        }
+    }
+
+    private Frontend validateForwardRequester(TMasterOpRequest params) throws 
TException {
         Frontend fe = 
Env.getCurrentEnv().checkFeExist(params.getClientNodeHost(), 
params.getClientNodePort());
-        if (fe == null) {
-            LOG.warn("reject request from invalid host. client: {}", 
params.getClientNodeHost());
-            throw new TException("request from invalid host was rejected.");
+        if (fe != null) {
+            return fe;
         }
+        LOG.warn("reject request from invalid host. client: {}", 
params.getClientNodeHost());
+        throw new TException("request from invalid host was rejected.");
+    }
+
+    private TMasterOpResult handleForwardShortcut(TMasterOpRequest params) 
throws TException {
         if (params.isSyncJournalOnly()) {
-            final TMasterOpResult result = new TMasterOpResult();
-            result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId());
-            // just make the protocol happy
-            result.setPacket("".getBytes());
-            return result;
+            return createForwardAckResult();
         }
         if (params.getGroupCommitInfo() != null && 
params.getGroupCommitInfo().isGetGroupCommitLoadBeId()) {
-            final TGroupCommitInfo info = params.getGroupCommitInfo();
-            final TMasterOpResult result = new TMasterOpResult();
-            try {
-                
result.setGroupCommitLoadBeId(Env.getCurrentEnv().getGroupCommitManager()
-                        
.selectBackendForGroupCommitInternal(info.groupCommitLoadTableId, 
info.cluster));
-            } catch (LoadException | DdlException e) {
-                throw new TException(e.getMessage());
-            }
-            // just make the protocol happy
-            result.setPacket("".getBytes());
-            return result;
+            return handleGroupCommitLoadBeId(params.getGroupCommitInfo());
         }
         if (params.getGroupCommitInfo() != null && 
params.getGroupCommitInfo().isUpdateLoadData()) {
-            final TGroupCommitInfo info = params.getGroupCommitInfo();
-            final TMasterOpResult result = new TMasterOpResult();
             Env.getCurrentEnv().getGroupCommitManager()
-                    .updateLoadData(info.tableId, info.receiveData);
-            // just make the protocol happy
-            result.setPacket("".getBytes());
-            return result;
+                    .updateLoadData(params.getGroupCommitInfo().tableId, 
params.getGroupCommitInfo().receiveData);
+            return createForwardAckResult();
         }
-        if (params.isSetCancelQeury() && params.isCancelQeury()) {
-            if (!params.isSetQueryId()) {
-                throw new TException("a query id is needed to cancel a query");
-            }
-            TUniqueId queryId = params.getQueryId();
-            ConnectContext ctx = proxyQueryIdToConnCtx.get(queryId);
-            if (ctx != null) {
-                ctx.cancelQuery(new Status(TStatusCode.CANCELLED, "cancel 
query by forward request."));
-            }
-            final TMasterOpResult result = new TMasterOpResult();
-            result.setStatusCode(0);
-            result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId());
-            // just make the protocol happy
-            result.setPacket("".getBytes());
-            return result;
+        if (!params.isSetCancelQeury() || !params.isCancelQeury()) {
+            return null;
+        }
+        return handleForwardCancel(params);
+    }
+
+    private TMasterOpResult createForwardAckResult() {
+        TMasterOpResult result = new TMasterOpResult();
+        result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId());
+        result.setPacket("".getBytes());
+        return result;
+    }
+
+    private TMasterOpResult handleGroupCommitLoadBeId(TGroupCommitInfo info) {
+        TMasterOpResult result = createForwardAckResult();
+        try {
+            
result.setGroupCommitLoadBeId(Env.getCurrentEnv().getGroupCommitManager()
+                    
.selectBackendForGroupCommitInternal(info.groupCommitLoadTableId, info.cluster,
+                            
ResourceGroupAffinityPolicyFactory.get().forwardedLoadDecision(
+                                    info.getLoadAffinityPreferredGroup(), 
info.getLoadAffinityPolicy())));
+        } catch (LoadException | DdlException e) {
+            // Throwing TException here surfaces on the requesting FE as a 
transport error with a
+            // null message; carry the selection error through the result 
instead.
+            result.setStatusCode(1);
+            result.setErrMessage(e.getMessage() == null ? e.toString() : 
e.getMessage());
         }
+        return result;
+    }
+
+    private TMasterOpResult handleForwardCancel(TMasterOpRequest params) 
throws TException {
+        if (!params.isSetQueryId()) {
+            throw new TException("a query id is needed to cancel a query");
+        }
+        ConnectContext context = 
proxyQueryIdToConnCtx.get(params.getQueryId());
+        if (context != null) {
+            context.cancelQuery(new Status(TStatusCode.CANCELLED, "cancel 
query by forward request."));
+        }
+        TMasterOpResult result = createForwardAckResult();
+        result.setStatusCode(0);
+        return result;
+    }
 
-        // add this log so that we can track this stmt
+    private void logForwardRequest(TMasterOpRequest params) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("receive forwarded stmt {} from FE: {}", 
params.getStmtId(), params.getClientNodeHost());
         }
+    }
+
+    private ConnectContext createForwardContext(TMasterOpRequest params, 
Frontend requester) {
         ConnectContext context = new ConnectContext(null, true, 
params.getSessionId());
         // Set current connected FE to the client address, so that we can know 
where
         // this request come from.
         context.setCurrentConnectedFEIp(params.getClientNodeHost());
+        
context.setConnectingFeLocalResourceGroup(requester.getLocalResourceGroup());

Review Comment:
   This forwarded context can use a stale requester local group. 
`FEOpExecutor.buildStmtForwardParams()` only sends the requester host/port, and 
this line resolves the group from the master-side heartbeat-cached `Frontend` 
object. That field is transient and only refreshed by later OK FE heartbeats, 
so a follower that restarts with a different 
`--local_resource_group`/`DORIS_LOCAL_RESOURCE_GROUP` can forward SQL before 
the heartbeat arrives and default-affinity code will see the old or empty 
group. Please carry the effective local group in `TMasterOpRequest` as an 
optional field and prefer that per-request value here.



##########
fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsTableValuedFunction.java:
##########
@@ -64,6 +64,7 @@ public class FrontendsTableValuedFunction extends 
MetadataTableValuedFunction {
             new Column("ErrMsg", ScalarType.createStringType()),
             new Column("Version", ScalarType.createStringType()),
             new Column("CurrentConnected", ScalarType.createStringType()),
+            new Column("LocalResourceGroup", ScalarType.createStringType()),

Review Comment:
   This schema change leaves `test_frontends_tvf` stale. `frontends()` now 
exposes `LocalResourceGroup` before `LiveSince`, so `select *` returns 21 
columns, but the regression still asserts `table[0].size() == 20` and 
`titleNames[19][0] == "LiveSince"` in 
`regression-test/suites/external_table_p0/tvf/test_frontends_tvf.groovy`. 
Please update the regression expectations and full-column projection to include 
`LocalResourceGroup` at index 19 and `LiveSince` at index 20.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to