This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 6d2924668e31616b694f82b11409f42dbdb63c23
Author: Mingyu Chen <[email protected]>
AuthorDate: Tue Mar 12 22:44:30 2024 +0800

    [fix](audit-loader) fix invalid token check logic  (#32095)
    
    The check of the token should be forwarded to Master FE.
    I add a new RPC method `checkToken()` in Frontend for this logic.
    Otherwise, after enable the audit loader, the log from non-master FE can 
not be loaded to audit table
    with `Invalid token` error.
---
 .../org/apache/doris/httpv2/rest/LoadAction.java   |  7 ++-
 .../org/apache/doris/load/loadv2/TokenManager.java | 61 +++++++++++++++++++---
 .../apache/doris/service/FrontendServiceImpl.java  | 28 +++++++---
 gensrc/thrift/FrontendService.thrift               |  2 +
 4 files changed, 81 insertions(+), 17 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index 6952bd37b5c..6be5654a2ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -23,6 +23,7 @@ import org.apache.doris.catalog.Table;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.LoadException;
+import org.apache.doris.common.UserException;
 import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
 import org.apache.doris.httpv2.entity.RestBaseResult;
 import org.apache.doris.httpv2.exception.UnauthorizedException;
@@ -362,7 +363,11 @@ public class LoadAction extends RestBaseController {
     // temporarily addressing the users' needs for audit logs.
     // So this function is not widely tested under general scenario
     private boolean checkClusterToken(String token) {
-        return 
Env.getCurrentEnv().getLoadManager().getTokenManager().checkAuthToken(token);
+        try {
+            return 
Env.getCurrentEnv().getLoadManager().getTokenManager().checkAuthToken(token);
+        } catch (UserException e) {
+            throw new UnauthorizedException(e.getMessage());
+        }
     }
 
     // NOTE: This function can only be used for AuditlogPlugin stream load for 
now.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java
index 6443e6b2322..ca714d66b29 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java
@@ -63,11 +63,6 @@ public class TokenManager {
         return UUID.randomUUID().toString();
     }
 
-    // this method only will be called in master node, since stream load only 
send message to master.
-    public boolean checkAuthToken(String token) {
-        return tokenQueue.contains(token);
-    }
-
     public String acquireToken() throws UserException {
         if (Env.getCurrentEnv().isMaster() || FeConstants.runningUnitTest) {
             return tokenQueue.peek();
@@ -81,9 +76,8 @@ public class TokenManager {
         }
     }
 
-    public String acquireTokenFromMaster() throws TException {
+    private String acquireTokenFromMaster() throws TException {
         TNetworkAddress thriftAddress = getMasterAddress();
-
         FrontendService.Client client = getClient(thriftAddress);
 
         if (LOG.isDebugEnabled()) {
@@ -108,7 +102,7 @@ public class TokenManager {
             } else {
                 TMySqlLoadAcquireTokenResult result = client.acquireToken();
                 if (result.getStatus().getStatusCode() != TStatusCode.OK) {
-                    throw new TException("commit failed.");
+                    throw new TException("acquire token from master failed. " 
+ result.getStatus());
                 }
                 isReturnToPool = true;
                 return result.getToken();
@@ -122,6 +116,57 @@ public class TokenManager {
         }
     }
 
+    /**
+     * Check if the token is valid.
+     * If this is not Master FE, will send the request to Master FE.
+     */
+    public boolean checkAuthToken(String token) throws UserException {
+        if (Env.getCurrentEnv().isMaster() || FeConstants.runningUnitTest) {
+            return tokenQueue.contains(token);
+        } else {
+            try {
+                return checkTokenFromMaster(token);
+            } catch (TException e) {
+                LOG.warn("check token error", e);
+                throw new UserException("Check token from master failed", e);
+            }
+        }
+    }
+
+    private boolean checkTokenFromMaster(String token) throws TException {
+        TNetworkAddress thriftAddress = getMasterAddress();
+        FrontendService.Client client = getClient(thriftAddress);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Send check token to Master {}", thriftAddress);
+        }
+
+        boolean isReturnToPool = false;
+        try {
+            boolean result = client.checkToken(token);
+            isReturnToPool = true;
+            return result;
+        } catch (TTransportException e) {
+            boolean ok = ClientPool.frontendPool.reopen(client, 
thriftTimeoutMs);
+            if (!ok) {
+                throw e;
+            }
+            if (e.getType() == TTransportException.TIMED_OUT) {
+                throw e;
+            } else {
+                boolean result = client.checkToken(token);
+                isReturnToPool = true;
+                return result;
+            }
+        } finally {
+            if (isReturnToPool) {
+                ClientPool.frontendPool.returnObject(thriftAddress, client);
+            } else {
+                ClientPool.frontendPool.invalidateObject(thriftAddress, 
client);
+            }
+        }
+    }
+
 
     private TNetworkAddress getMasterAddress() throws TException {
         Env.getCurrentEnv().checkReadyOrThrowTException();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 97b6f52e331..7f2b0db8c07 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1063,12 +1063,6 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         }
     }
 
-    private void checkToken(String token) throws AuthenticationException {
-        if 
(!Env.getCurrentEnv().getLoadManager().getTokenManager().checkAuthToken(token)) 
{
-            throw new AuthenticationException("Un matched cluster token.");
-        }
-    }
-
     private void checkPassword(String user, String passwd, String clientIp)
             throws AuthenticationException {
         final String fullUserName = ClusterNamespace.getNameFromFullName(user);
@@ -1133,7 +1127,9 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
                     request.getTbl(),
                     request.getUserIp(), PrivPredicate.LOAD);
         } else {
-            checkToken(request.getToken());
+            if (!checkToken(request.getToken())) {
+                throw new AuthenticationException("Invalid token: " + 
request.getToken());
+            }
         }
 
         // check label
@@ -1344,7 +1340,9 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         if (request.isSetAuthCode()) {
             // CHECKSTYLE IGNORE THIS LINE
         } else if (request.isSetToken()) {
-            checkToken(request.getToken());
+            if (!checkToken(request.getToken())) {
+                throw new AuthenticationException("Invalid token: " + 
request.getToken());
+            }
         } else {
             // refactoring it
             if (CollectionUtils.isNotEmpty(request.getTbls())) {
@@ -2406,6 +2404,20 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         return result;
     }
 
+    @Override
+    public boolean checkToken(String token) {
+        String clientAddr = getClientAddrAsString();
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("receive check token request from client: {}", 
clientAddr);
+        }
+        try {
+            return 
Env.getCurrentEnv().getLoadManager().getTokenManager().checkAuthToken(token);
+        } catch (Throwable e) {
+            LOG.warn("catch unknown result.", e);
+            return false;
+        }
+    }
+
     @Override
     public TCheckAuthResult checkAuth(TCheckAuthRequest request) throws 
TException {
         String clientAddr = getClientAddrAsString();
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 176199325c7..17bc432d80a 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1435,6 +1435,8 @@ service FrontendService {
 
     TMySqlLoadAcquireTokenResult acquireToken()
 
+    bool checkToken(1: string token)
+
     TConfirmUnusedRemoteFilesResult confirmUnusedRemoteFiles(1: 
TConfirmUnusedRemoteFilesRequest request)
 
     TCheckAuthResult checkAuth(1: TCheckAuthRequest request)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to