This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new edd501b5b1a [streamload](redirect) Support redirect-policy for 
streamload which is used by audit plugin (#35840)
edd501b5b1a is described below

commit edd501b5b1a5cda2dc43d02008b3a79f7be02c8a
Author: Gavin Chou <[email protected]>
AuthorDate: Wed Jun 5 08:51:16 2024 +0800

    [streamload](redirect) Support redirect-policy for streamload which is used 
by audit plugin (#35840)
    
    The audit log plugin uses streamload to save audit data. In cloud mode,
    the streamload initiated by the audit-log component may be forwarded to
    a Load Balancer (LB) attached to a Backend (BE). This can cause the
    request to fail due to network issues.
    
    This PR adds a new streamload redirect policy to get rid of the LB
    issue.
---
 be/src/common/utils.h                              |  3 +-
 be/src/http/utils.cpp                              | 35 ++++++++++---------
 .../org/apache/doris/httpv2/rest/LoadAction.java   | 40 ++++++++++++++++++----
 .../org/apache/doris/load/StreamLoadHandler.java   |  7 ++++
 .../doris/plugin/audit/AuditStreamLoader.java      |  6 ++--
 5 files changed, 64 insertions(+), 27 deletions(-)

diff --git a/be/src/common/utils.h b/be/src/common/utils.h
index b03e41260e7..46df44a40f2 100644
--- a/be/src/common/utils.h
+++ b/be/src/common/utils.h
@@ -37,18 +37,17 @@ struct AuthInfo {
 
 template <class T>
 void set_request_auth(T* req, const AuthInfo& auth) {
+    req->user = auth.user; // always set user, because it may be used by FE
     if (auth.auth_code != -1) {
         // if auth_code is set, no need to set other info
         req->__set_auth_code(auth.auth_code);
         // user name and passwd is unused, but they are required field.
         // so they have to be set.
-        req->user = "";
         req->passwd = "";
     } else if (auth.token != "") {
         req->__isset.token = true;
         req->token = auth.token;
     } else {
-        req->user = auth.user;
         req->passwd = auth.passwd;
         if (!auth.cluster.empty()) {
             req->__set_cluster(auth.cluster);
diff --git a/be/src/http/utils.cpp b/be/src/http/utils.cpp
index b03017c12a7..fbbc1cd93bf 100644
--- a/be/src/http/utils.cpp
+++ b/be/src/http/utils.cpp
@@ -77,30 +77,31 @@ bool parse_basic_auth(const HttpRequest& req, std::string* 
user, std::string* pa
 bool parse_basic_auth(const HttpRequest& req, AuthInfo* auth) {
     const auto& token = req.header("token");
     const auto& auth_code = req.header(HTTP_AUTH_CODE);
+
+    std::tuple<std::string, std::string, std::string> tmp;
+    auto& [user, pass, cluster] = tmp;
+    bool valid_basic_auth = parse_basic_auth(req, &user, &pass);
+    if (valid_basic_auth) { // always set the basic auth, the user may be 
useful
+        auto pos = user.find('@');
+        if (pos != std::string::npos) {
+            cluster.assign(user.c_str() + pos + 1);
+            user.assign(user.c_str(), pos); // user is updated
+        }
+        auth->user = user;
+        auth->passwd = pass;
+        auth->cluster = cluster;
+    }
+
     if (!token.empty()) {
         auth->token = token;
     } else if (!auth_code.empty()) {
         auth->auth_code = std::stoll(auth_code);
-    } else {
-        std::string full_user;
-        if (!parse_basic_auth(req, &full_user, &auth->passwd)) {
-            return false;
-        }
-        auto pos = full_user.find('@');
-        if (pos != std::string::npos) {
-            auth->user.assign(full_user.data(), pos);
-            auth->cluster.assign(full_user.data() + pos + 1);
-        } else {
-            auth->user = full_user;
-        }
+    } else if (!valid_basic_auth) {
+        return false;
     }
 
     // set user ip
-    if (req.remote_host() != nullptr) {
-        auth->user_ip.assign(req.remote_host());
-    } else {
-        auth->user_ip.assign("");
-    }
+    auth->user_ip.assign(req.remote_host() != nullptr ? req.remote_host() : 
"");
 
     return true;
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index ca69ba13c08..de15a2816f0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -57,6 +57,7 @@ import org.springframework.web.servlet.view.RedirectView;
 
 import java.net.InetAddress;
 import java.net.URI;
+import java.util.Enumeration;
 import java.util.List;
 import java.util.Set;
 import javax.servlet.http.HttpServletRequest;
@@ -69,6 +70,11 @@ public class LoadAction extends RestBaseController {
 
     public static final String SUB_LABEL_NAME_PARAM = "sub_label";
 
+    public static final String HEADER_REDIRECT_POLICY = "redirect-policy";
+
+    public static final String REDIRECT_POLICY_PUBLIC_PRIVATE = 
"public-private";
+    public static final String REDIRECT_POLICY_RANDOM_BE = "random-be";
+
     private ExecuteEnv execEnv = ExecuteEnv.getInstance();
 
     private int lastSelectedBackendIndex = 0;
@@ -94,6 +100,7 @@ public class LoadAction extends RestBaseController {
     public Object streamLoad(HttpServletRequest request,
                              HttpServletResponse response,
                              @PathVariable(value = DB_KEY) String db, 
@PathVariable(value = TABLE_KEY) String table) {
+        LOG.info("streamload action, db: {}, tbl: {}, headers: {}", db, table, 
 getAllHeaders(request));
         boolean groupCommit = false;
         String groupCommitStr = request.getHeader("group_commit");
         if (groupCommitStr != null && 
groupCommitStr.equalsIgnoreCase("async_mode")) {
@@ -209,6 +216,7 @@ public class LoadAction extends RestBaseController {
     public Object streamLoad2PC(HttpServletRequest request,
                                    HttpServletResponse response,
                                    @PathVariable(value = DB_KEY) String db) {
+        LOG.info("streamload action 2PC, db: {}, headers: {}", db, 
getAllHeaders(request));
         if (needRedirect(request.getScheme())) {
             return redirectToHttps(request);
         }
@@ -222,6 +230,7 @@ public class LoadAction extends RestBaseController {
                                       HttpServletResponse response,
                                       @PathVariable(value = DB_KEY) String db,
                                       @PathVariable(value = TABLE_KEY) String 
table) {
+        LOG.info("streamload action 2PC, db: {}, tbl: {}, headers: {}", db, 
table, getAllHeaders(request));
         if (needRedirect(request.getScheme())) {
             return redirectToHttps(request);
         }
@@ -348,8 +357,7 @@ public class LoadAction extends RestBaseController {
             if (Strings.isNullOrEmpty(cloudClusterName)) {
                 throw new LoadException("No cloud cluster name selected.");
             }
-            String reqHostStr = 
request.getHeader(HttpHeaderNames.HOST.toString());
-            return selectCloudRedirectBackend(cloudClusterName, reqHostStr, 
groupCommit);
+            return selectCloudRedirectBackend(cloudClusterName, request, 
groupCommit);
         } else {
             return selectLocalRedirectBackend(groupCommit);
         }
@@ -391,10 +399,18 @@ public class LoadAction extends RestBaseController {
         return new TNetworkAddress(backend.getHost(), backend.getHttpPort());
     }
 
-    private TNetworkAddress selectCloudRedirectBackend(String clusterName, 
String reqHostStr, boolean groupCommit)
+    private TNetworkAddress selectCloudRedirectBackend(String clusterName, 
HttpServletRequest req, boolean groupCommit)
             throws LoadException {
         Backend backend = StreamLoadHandler.selectBackend(clusterName, 
groupCommit);
 
+        String redirectPolicy = 
req.getHeader(LoadAction.HEADER_REDIRECT_POLICY);
+        // User specified redirect policy
+        if (redirectPolicy != null && 
redirectPolicy.equalsIgnoreCase(REDIRECT_POLICY_RANDOM_BE)) {
+            return new TNetworkAddress(backend.getHost(), 
backend.getHttpPort());
+        }
+        redirectPolicy = redirectPolicy == null || redirectPolicy.isEmpty()
+            ? Config.streamload_redirect_policy : redirectPolicy;
+
         Pair<String, Integer> publicHostPort = null;
         Pair<String, Integer> privateHostPort = null;
         try {
@@ -413,6 +429,7 @@ public class LoadAction extends RestBaseController {
             throw new LoadException(e.getMessage());
         }
 
+        String reqHostStr = req.getHeader(HttpHeaderNames.HOST.toString());
         reqHostStr = reqHostStr.replaceAll("\\s+", "");
         if (reqHostStr.isEmpty()) {
             LOG.info("Invalid header host: {}", reqHostStr);
@@ -430,8 +447,8 @@ public class LoadAction extends RestBaseController {
             throw new LoadException("Invalid header host: " + reqHost);
         }
 
-        if 
(Config.streamload_redirect_policy.equalsIgnoreCase("public-private")) {
-            // ip
+        if (redirectPolicy != null && 
redirectPolicy.equalsIgnoreCase(REDIRECT_POLICY_PUBLIC_PRIVATE)) {
+            // redirect with ip
             if (InetAddressValidator.getInstance().isValid(reqHost)) {
                 InetAddress addr;
                 try {
@@ -451,7 +468,7 @@ public class LoadAction extends RestBaseController {
                 }
             }
 
-            // domin
+            // redirect with domain
             if (publicHostPort != null && 
reqHost.toLowerCase().contains("public")) {
                 return new TNetworkAddress(publicHostPort.first, 
publicHostPort.second);
             } else if (privateHostPort != null) {
@@ -585,4 +602,15 @@ public class LoadAction extends RestBaseController {
             ConnectContext.remove();
         }
     }
+
+    private String getAllHeaders(HttpServletRequest request) {
+        StringBuilder headers = new StringBuilder();
+        Enumeration<String> headerNames = request.getHeaderNames();
+        while (headerNames.hasMoreElements()) {
+            String headerName = headerNames.nextElement();
+            String headerValue = request.getHeader(headerName);
+            
headers.append(headerName).append(":").append(headerValue).append(", ");
+        }
+        return headers.toString();
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java
index 2029f96da3b..f2f17914a15 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java
@@ -80,6 +80,13 @@ public class StreamLoadHandler {
         this.clientAddr = clientAddr;
     }
 
+    /**
+     * Select a random backend in the given cloud cluster.
+     *
+     * @param clusterName cloud cluster name
+     * @param groupCommit if this selection is for group commit
+     * @throws LoadException if there is no available backend
+     */
     public static Backend selectBackend(String clusterName, boolean 
groupCommit) throws LoadException {
         List<Backend> backends = ((CloudSystemInfoService) 
Env.getCurrentSystemInfo())
                 .getBackendsByClusterName(clusterName)
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java
index b29b3dfbe3b..3765872810d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditStreamLoader.java
@@ -58,7 +58,7 @@ public class AuditStreamLoader {
         conn.setInstanceFollowRedirects(false);
         conn.setRequestMethod("PUT");
         conn.setRequestProperty("token", clusterToken);
-        conn.setRequestProperty("Authorization", "Basic ");
+        conn.setRequestProperty("Authorization", "Basic YWRtaW46"); // admin
         conn.addRequestProperty("Expect", "100-continue");
         conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
         conn.addRequestProperty("label", label);
@@ -67,6 +67,7 @@ public class AuditStreamLoader {
         conn.addRequestProperty("columns",
                 InternalSchema.AUDIT_SCHEMA.stream().map(c -> 
c.getName()).collect(
                         Collectors.joining(",")));
+        conn.addRequestProperty("redirect-policy", "random-be");
         conn.setDoOutput(true);
         conn.setDoInput(true);
         return conn;
@@ -75,13 +76,14 @@ public class AuditStreamLoader {
     private String toCurl(HttpURLConnection conn) {
         StringBuilder sb = new StringBuilder("curl -v ");
         sb.append("-X ").append(conn.getRequestMethod()).append(" \\\n  ");
-        sb.append("-H \"").append("Authorization\":").append("\"Basic 
").append("\" \\\n  ");
+        sb.append("-H \"").append("Authorization\":").append("\"Basic 
YWRtaW46").append("\" \\\n  ");
         sb.append("-H \"").append("Expect\":").append("\"100-continue\" \\\n  
");
         sb.append("-H \"").append("Content-Type\":").append("\"text/plain; 
charset=UTF-8\" \\\n  ");
         sb.append("-H \"").append("max_filter_ratio\":").append("\"1.0\" \\\n  
");
         sb.append("-H \"").append("columns\":")
                 .append("\"" + InternalSchema.AUDIT_SCHEMA.stream().map(c -> 
c.getName()).collect(
                         Collectors.joining(",")) + "\" \\\n  ");
+        sb.append("-H 
\"").append("redirect-policy\":").append("\"random-be").append("\" \\\n  ");
         sb.append("\"").append(conn.getURL()).append("\"");
         return sb.toString();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to