This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 8d0cd03d534 [Fix](group commit) Fix cloud group commit be select  
strategy (#39986)
8d0cd03d534 is described below

commit 8d0cd03d534ae9a3fa4ec179af0d011bd2b0430c
Author: abmdocrt <[email protected]>
AuthorDate: Wed Aug 28 23:10:44 2024 +0800

    [Fix](group commit) Fix cloud group commit be select  strategy (#39986)
    
    ## Proposed changes
    
    In #35558, we optimized BE selection for group commit. However, we forgot
    to apply this strategy to cloud mode. This PR applies it there.
    
    <!--Describe your changes.-->
---
 .../org/apache/doris/httpv2/rest/LoadAction.java   | 140 +++++++++++++--------
 .../org/apache/doris/load/StreamLoadHandler.java   |   8 +-
 .../apache/doris/load/loadv2/MysqlLoadManager.java |   2 +-
 3 files changed, 92 insertions(+), 58 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index c2d50460ea4..b0a714da149 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -85,7 +85,7 @@ public class LoadAction extends RestBaseController {
 
     @RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + "}/_load", 
method = RequestMethod.PUT)
     public Object load(HttpServletRequest request, HttpServletResponse 
response,
-                       @PathVariable(value = DB_KEY) String db, 
@PathVariable(value = TABLE_KEY) String table) {
+            @PathVariable(value = DB_KEY) String db, @PathVariable(value = 
TABLE_KEY) String table) {
         if (needRedirect(request.getScheme())) {
             return redirectToHttps(request);
         }
@@ -102,21 +102,29 @@ public class LoadAction extends RestBaseController {
 
     @RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + 
"}/_stream_load", method = RequestMethod.PUT)
     public Object streamLoad(HttpServletRequest request,
-                             HttpServletResponse response,
-                             @PathVariable(value = DB_KEY) String db, 
@PathVariable(value = TABLE_KEY) String table) {
-        LOG.info("streamload action, db: {}, tbl: {}, headers: {}", db, table, 
 getAllHeaders(request));
+            HttpServletResponse response,
+            @PathVariable(value = DB_KEY) String db, @PathVariable(value = 
TABLE_KEY) String table) {
+        LOG.info("streamload action, db: {}, tbl: {}, headers: {}", db, table, 
getAllHeaders(request));
         boolean groupCommit = false;
         String groupCommitStr = request.getHeader("group_commit");
-        if (groupCommitStr != null && 
groupCommitStr.equalsIgnoreCase("async_mode")) {
-            groupCommit = true;
-            try {
-                if (isGroupCommitBlock(db, table)) {
-                    String msg = "insert table " + table + 
GroupCommitPlanner.SCHEMA_CHANGE;
-                    return new RestBaseResult(msg);
+        if (groupCommitStr != null) {
+            if (!groupCommitStr.equalsIgnoreCase("async_mode") && 
!groupCommitStr.equalsIgnoreCase("sync_mode")
+                    && !groupCommitStr.equalsIgnoreCase("off_mode")) {
+                return new RestBaseResult("Header `group_commit` can only be 
`sync_mode`, `async_mode` or `off_mode`.");
+            }
+            if (!groupCommitStr.equalsIgnoreCase("off_mode")) {
+                groupCommit = true;
+                if (groupCommitStr.equalsIgnoreCase("async_mode")) {
+                    try {
+                        if (isGroupCommitBlock(db, table)) {
+                            String msg = "insert table " + table + 
GroupCommitPlanner.SCHEMA_CHANGE;
+                            return new RestBaseResult(msg);
+                        }
+                    } catch (Exception e) {
+                        LOG.info("exception:" + e);
+                        return new RestBaseResult(e.getMessage());
+                    }
                 }
-            } catch (Exception e) {
-                LOG.info("exception:" + e);
-                return new RestBaseResult(e.getMessage());
             }
         }
         if (needRedirect(request.getScheme())) {
@@ -147,21 +155,32 @@ public class LoadAction extends RestBaseController {
         boolean groupCommit = false;
         long tableId = -1;
         String groupCommitStr = request.getHeader("group_commit");
-        if (groupCommitStr != null && 
groupCommitStr.equalsIgnoreCase("async_mode")) {
-            groupCommit = true;
-            try {
-                String[] pair = parseDbAndTb(sql);
-                Database db = Env.getCurrentInternalCatalog()
-                        .getDbOrException(pair[0], s -> new 
TException("database is invalid for dbName: " + s));
-                Table tbl = db.getTableOrException(pair[1], s -> new 
TException("table is invalid: " + s));
-                tableId = tbl.getId();
-                if (isGroupCommitBlock(pair[0], pair[1])) {
-                    String msg = "insert table " + pair[1] + 
GroupCommitPlanner.SCHEMA_CHANGE;
-                    return new RestBaseResult(msg);
+        if (groupCommitStr != null) {
+            if (!groupCommitStr.equalsIgnoreCase("async_mode") && 
!groupCommitStr.equalsIgnoreCase("sync_mode")
+                    && !groupCommitStr.equalsIgnoreCase("off_mode")) {
+                return new RestBaseResult("Header `group_commit` can only be 
`sync_mode`, `async_mode` or `off_mode`.");
+            }
+            if (!groupCommitStr.equalsIgnoreCase("off_mode")) {
+                try {
+                    groupCommit = true;
+                    String[] pair = parseDbAndTb(sql);
+                    Database db = Env.getCurrentInternalCatalog()
+                            .getDbOrException(pair[0], s -> new 
TException("database is invalid for dbName: " + s));
+                    Table tbl = db.getTableOrException(pair[1], s -> new 
TException("table is invalid: " + s));
+                    tableId = tbl.getId();
+
+                    // async mode needs to write WAL, we need to block load 
during waiting WAL.
+                    if (groupCommitStr.equalsIgnoreCase("async_mode")) {
+                        if (isGroupCommitBlock(pair[0], pair[1])) {
+                            String msg = "insert table " + pair[1] + 
GroupCommitPlanner.SCHEMA_CHANGE;
+                            return new RestBaseResult(msg);
+                        }
+
+                    }
+                } catch (Exception e) {
+                    LOG.info("exception:" + e);
+                    return new RestBaseResult(e.getMessage());
                 }
-            } catch (Exception e) {
-                LOG.info("exception:" + e);
-                return new RestBaseResult(e.getMessage());
             }
         }
         executeCheckPassword(request, response);
@@ -223,8 +242,8 @@ public class LoadAction extends RestBaseController {
 
     @RequestMapping(path = "/api/{" + DB_KEY + "}/_stream_load_2pc", method = 
RequestMethod.PUT)
     public Object streamLoad2PC(HttpServletRequest request,
-                                   HttpServletResponse response,
-                                   @PathVariable(value = DB_KEY) String db) {
+            HttpServletResponse response,
+            @PathVariable(value = DB_KEY) String db) {
         LOG.info("streamload action 2PC, db: {}, headers: {}", db, 
getAllHeaders(request));
         if (needRedirect(request.getScheme())) {
             return redirectToHttps(request);
@@ -236,9 +255,9 @@ public class LoadAction extends RestBaseController {
 
     @RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + 
"}/_stream_load_2pc", method = RequestMethod.PUT)
     public Object streamLoad2PC_table(HttpServletRequest request,
-                                      HttpServletResponse response,
-                                      @PathVariable(value = DB_KEY) String db,
-                                      @PathVariable(value = TABLE_KEY) String 
table) {
+            HttpServletResponse response,
+            @PathVariable(value = DB_KEY) String db,
+            @PathVariable(value = TABLE_KEY) String table) {
         LOG.info("streamload action 2PC, db: {}, tbl: {}, headers: {}", db, 
table, getAllHeaders(request));
         if (needRedirect(request.getScheme())) {
             return redirectToHttps(request);
@@ -382,7 +401,7 @@ public class LoadAction extends RestBaseController {
             if (Strings.isNullOrEmpty(cloudClusterName)) {
                 throw new LoadException("No cloud cluster name selected.");
             }
-            return selectCloudRedirectBackend(cloudClusterName, request, 
groupCommit);
+            return selectCloudRedirectBackend(cloudClusterName, request, 
groupCommit, tableId);
         } else {
             return selectLocalRedirectBackend(groupCommit, request, tableId);
         }
@@ -409,21 +428,7 @@ public class LoadAction extends RestBaseController {
             throw new 
LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + 
policy);
         }
         if (groupCommit) {
-            ConnectContext ctx = new ConnectContext();
-            ctx.setEnv(Env.getCurrentEnv());
-            ctx.setThreadLocalInfo();
-            ctx.setRemoteIP(request.getRemoteAddr());
-            // We set this variable to fulfill required field 'user' in
-            // TMasterOpRequest(FrontendService.thrift)
-            ctx.setQualifiedUser(Auth.ADMIN_USER);
-            ctx.setThreadLocalInfo();
-
-            try {
-                backend = Env.getCurrentEnv().getGroupCommitManager()
-                        .selectBackendForGroupCommit(tableId, ctx, false);
-            } catch (DdlException e) {
-                throw new RuntimeException(e);
-            }
+            backend = selectBackendForGroupCommit("", request, tableId, false);
         } else {
             backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
         }
@@ -433,9 +438,15 @@ public class LoadAction extends RestBaseController {
         return new TNetworkAddress(backend.getHost(), backend.getHttpPort());
     }
 
-    private TNetworkAddress selectCloudRedirectBackend(String clusterName, 
HttpServletRequest req, boolean groupCommit)
+    private TNetworkAddress selectCloudRedirectBackend(String clusterName, 
HttpServletRequest req, boolean groupCommit,
+            long tableId)
             throws LoadException {
-        Backend backend = StreamLoadHandler.selectBackend(clusterName, 
groupCommit);
+        Backend backend = null;
+        if (groupCommit) {
+            backend = selectBackendForGroupCommit(clusterName, req, tableId, 
true);
+        } else {
+            backend = StreamLoadHandler.selectBackend(clusterName);
+        }
 
         String redirectPolicy = 
req.getHeader(LoadAction.HEADER_REDIRECT_POLICY);
         // User specified redirect policy
@@ -443,7 +454,7 @@ public class LoadAction extends RestBaseController {
             return new TNetworkAddress(backend.getHost(), 
backend.getHttpPort());
         }
         redirectPolicy = redirectPolicy == null || redirectPolicy.isEmpty()
-            ? Config.streamload_redirect_policy : redirectPolicy;
+                ? Config.streamload_redirect_policy : redirectPolicy;
 
         Pair<String, Integer> publicHostPort = null;
         Pair<String, Integer> privateHostPort = null;
@@ -563,7 +574,7 @@ public class LoadAction extends RestBaseController {
     // temporarily addressing the users' needs for audit logs.
     // So this function is not widely tested under general scenario
     private Object executeWithClusterToken(HttpServletRequest request, String 
db,
-                                          String table, boolean isStreamLoad) {
+            String table, boolean isStreamLoad) {
         try {
             ConnectContext ctx = new ConnectContext();
             ctx.setEnv(Env.getCurrentEnv());
@@ -647,4 +658,29 @@ public class LoadAction extends RestBaseController {
         }
         return headers.toString();
     }
+
+    private Backend selectBackendForGroupCommit(String clusterName, 
HttpServletRequest req, long tableId,
+            boolean isCloud)
+            throws LoadException {
+        ConnectContext ctx = new ConnectContext();
+        ctx.setEnv(Env.getCurrentEnv());
+        ctx.setThreadLocalInfo();
+        ctx.setRemoteIP(req.getRemoteAddr());
+        // We set this variable to fulfill required field 'user' in
+        // TMasterOpRequest(FrontendService.thrift)
+        ctx.setQualifiedUser(Auth.ADMIN_USER);
+        ctx.setThreadLocalInfo();
+        if (isCloud) {
+            ctx.setCloudCluster(clusterName);
+        }
+
+        Backend backend = null;
+        try {
+            backend = Env.getCurrentEnv().getGroupCommitManager()
+                    .selectBackendForGroupCommit(tableId, ctx, isCloud);
+        } catch (DdlException e) {
+            throw new LoadException(e.getMessage(), e);
+        }
+        return backend;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java
index 0f44ec3f785..9e51e2cffd0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java
@@ -84,13 +84,12 @@ public class StreamLoadHandler {
      * Select a random backend in the given cloud cluster.
      *
      * @param clusterName cloud cluster name
-     * @param groupCommit if this selection is for group commit
      * @throws LoadException if there is no available backend
      */
-    public static Backend selectBackend(String clusterName, boolean 
groupCommit) throws LoadException {
+    public static Backend selectBackend(String clusterName) throws 
LoadException {
         List<Backend> backends = ((CloudSystemInfoService) 
Env.getCurrentSystemInfo())
                 .getBackendsByClusterName(clusterName)
-                .stream().filter(be -> be.isAlive() && (!groupCommit || 
groupCommit && !be.isDecommissioned()))
+                .stream().filter(Backend::isAlive)
                 .collect(Collectors.toList());
 
         if (backends.isEmpty()) {
@@ -101,8 +100,7 @@ public class StreamLoadHandler {
         // TODO: add a more sophisticated algorithm to select backend
         SecureRandom rand = new SecureRandom();
         int randomIndex = rand.nextInt(backends.size());
-        Backend backend = backends.get(randomIndex);
-        return backend;
+        return backends.get(randomIndex);
     }
 
     public void setCloudCluster() throws UserException {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
index 7b156aeff8f..975b1ca1175 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
@@ -447,7 +447,7 @@ public class MysqlLoadManager {
     private String selectBackendForMySqlLoad(String database, String table) 
throws LoadException {
         Backend backend = null;
         if (Config.isCloudMode()) {
-            backend = 
StreamLoadHandler.selectBackend(ConnectContext.get().getCloudCluster(), false);
+            backend = 
StreamLoadHandler.selectBackend(ConnectContext.get().getCloudCluster());
         } else {
             BeSelectionPolicy policy = new 
BeSelectionPolicy.Builder().needLoadAvailable().build();
             List<Long> backendIds = 
Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to