mymeiyi commented on code in PR #35558:
URL: https://github.com/apache/doris/pull/35558#discussion_r1628713104


##########
fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java:
##########
@@ -163,4 +177,76 @@ public long getWalQueueSize(Backend backend, 
PGetWalQueueSizeRequest request) {
         return size;
     }
 
+    public Backend selectBackendForGroupCommit(long tableId, ConnectContext 
context) throws LoadException {
+        if (!Env.getCurrentEnv().isMaster()) {
+            try {
+                long backendId = new 
MasterOpExecutor(context).getGroupCommitLoadBeId(tableId);
+                return Env.getCurrentSystemInfo().getBackend(backendId);
+            } catch (Exception e) {
+                throw new LoadException(e.getMessage());
+            }
+        } else {
+            return 
Env.getCurrentSystemInfo().getBackend(selectBackendForGroupCommitInternal(tableId));
+        }
+    }
+
+    public long selectBackendForGroupCommitInternal(long tableId) throws 
LoadException {
+        LOG.info("group commit new strategy select be, tableToBeMap {}, 
bePressureMap {}", tableToBeMap.toString(),
+                bePressureMap.toString());
+        if (tableToBeMap.containsKey(tableId)) {
+            // todo(lyk): change 1000 to table.groupCommitSize
+            if (bePressureMap.get(tableToBeMap.get(tableId)).get() < 1000) {
+                return tableToBeMap.get(tableId);
+            } else {
+                tableToBeMap.remove(tableId);
+            }
+        }
+        Random random = new Random();
+        List<Long> backendIds = 
Env.getCurrentSystemInfo().getAllBackendIds(true);
+
+        int size = backendIds.size();
+
+        while (size > 0) {
+            Long backendId = backendIds.get(random.nextInt(size));
+            Backend candidateBe = 
Env.getCurrentSystemInfo().getBackend(backendId);
+
+            if (!candidateBe.isDecommissioned()) {
+                tableToBeMap.put(tableId, backendId);
+                // todo(lyk): change 10 to table.groupTime
+                bePressureMap.put(backendId, new SlidingWindowCounter(10));
+                return backendId;
+            }
+
+            backendIds.remove(backendId);
+            size = backendIds.size();
+        }
+        throw new 
LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG);
+    }
+
+    public void updateLoadData(long backendId, long receiveData) {
+        if (backendId == -1) {
+            LOG.warn("invalid backend id: " + backendId);
+        }
+        if (!Env.getCurrentEnv().isMaster()) {
+            ConnectContext ctx = new ConnectContext();
+            ctx.setEnv(Env.getCurrentEnv());
+            ctx.setThreadLocalInfo();
+            // set user to ADMIN_USER, so that we can get the proper resource 
tag
+            ctx.setQualifiedUser(Auth.ADMIN_USER);
+            ctx.setThreadLocalInfo();
+            try {
+                new MasterOpExecutor(ctx).updateLoadData(backendId, 
receiveData);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        } else {
+            updateLoadDataInternal(backendId, receiveData);
+        }
+    }
+
+    public void updateLoadDataInternal(long backendId, long receiveData) {
+        bePressureMap.get(backendId).add(receiveData);

Review Comment:
   the map is not thread safe



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to