This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new e083dc26a00 [cherry-pick](branch-2.1) Pick "[Fix](group commit) Fix
multiple cluster group commit BE select strategy (#38644)" (#39010)
e083dc26a00 is described below
commit e083dc26a00c3a109de8eb51ee3af31b242edbfa
Author: abmdocrt <[email protected]>
AuthorDate: Wed Aug 7 22:07:30 2024 +0800
[cherry-pick](branch-2.1) Pick "[Fix](group commit) Fix multiple cluster
group commit BE select strategy (#38644)" (#39010)
## Proposed changes
Pick #38644
<!--Describe your changes.-->
---
.../org/apache/doris/load/GroupCommitManager.java | 30 ++++++++++++++--------
1 file changed, 20 insertions(+), 10 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
index 1ec6a06179e..b6cf6cbb0a8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
@@ -55,8 +55,8 @@ public class GroupCommitManager {
// Table id to BE id map. Only for group commit.
private Map<Long, Long> tableToBeMap = new ConcurrentHashMap<>();
- // BE id to pressure map. Only for group commit.
- private Map<Long, SlidingWindowCounter> tablePressureMap = new
ConcurrentHashMap<>();
+ // Table id to pressure map. Only for group commit.
+ private Map<Long, SlidingWindowCounter> tableToPressureMap = new
ConcurrentHashMap<>();
public boolean isBlock(long tableId) {
return blockedTableIds.contains(tableId);
@@ -236,8 +236,8 @@ public class GroupCommitManager {
}
private long selectBackendForLocalGroupCommitInternal(long tableId) throws
LoadException {
- LOG.debug("group commit select be info, tableToBeMap {},
tablePressureMap {}", tableToBeMap.toString(),
- tablePressureMap.toString());
+ LOG.debug("group commit select be info, tableToBeMap {},
tableToPressureMap {}", tableToBeMap.toString(),
+ tableToPressureMap.toString());
Long cachedBackendId = getCachedBackend(tableId);
if (cachedBackendId != null) {
return cachedBackendId;
@@ -264,8 +264,18 @@ public class GroupCommitManager {
private Long getCachedBackend(long tableId) {
OlapTable table = (OlapTable)
Env.getCurrentEnv().getInternalCatalog().getTableByTableId(tableId);
if (tableToBeMap.containsKey(tableId)) {
- if (tablePressureMap.get(tableId).get() <
table.getGroupCommitDataBytes()) {
- Backend backend =
Env.getCurrentSystemInfo().getBackend(tableToBeMap.get(tableId));
+ if (tableToPressureMap.get(tableId).get() <
table.getGroupCommitDataBytes()) {
+ // Several threads may look up the cached backend of one table at once.
+ // One of them may remove the tableId from tableToBeMap in between, so
+ // another thread can pass containsKey() and still get no backend id.
+ // That thread returns null here and then needs to get a random backend.
+ Long backendId = tableToBeMap.get(tableId);
+ Backend backend;
+ if (backendId != null) {
+ backend = Env.getCurrentSystemInfo().getBackend(backendId);
+ } else {
+ return null;
+ }
if (backend.isAlive() && !backend.isDecommissioned()) {
return backend.getId();
} else {
@@ -285,7 +295,7 @@ public class GroupCommitManager {
for (Backend backend : backends) {
if (backend.isAlive() && !backend.isDecommissioned()) {
tableToBeMap.put(tableId, backend.getId());
- tablePressureMap.put(tableId,
+ tableToPressureMap.put(tableId,
new
SlidingWindowCounter(table.getGroupCommitIntervalMs() / 1000 + 1));
return backend.getId();
}
@@ -315,10 +325,10 @@ public class GroupCommitManager {
}
public void updateLoadDataInternal(long tableId, long receiveData) {
- if (tablePressureMap.containsKey(tableId)) {
- tablePressureMap.get(tableId).add(receiveData);
+ if (tableToPressureMap.containsKey(tableId)) {
+ tableToPressureMap.get(tableId).add(receiveData);
LOG.info("Update load data for table{}, receiveData {},
tablePressureMap {}", tableId, receiveData,
- tablePressureMap.toString());
+ tableToPressureMap.toString());
} else {
LOG.warn("can not find backend id: {}", tableId);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]