This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c7c239afba2 [fix](cloud) modify primary cluster bes to be in
CloudReplica to reduce memory (#59932)
c7c239afba2 is described below
commit c7c239afba2be6a28ef2c6be2b88784db6539e1b
Author: meiyi <[email protected]>
AuthorDate: Wed Jan 21 01:28:28 2026 +0800
[fix](cloud) modify primary cluster bes to be in CloudReplica to reduce
memory (#59932)
---
.../org/apache/doris/catalog/LocalReplica.java | 31 ++++++++++++
.../java/org/apache/doris/catalog/Replica.java | 26 ++--------
.../apache/doris/cloud/catalog/CloudReplica.java | 56 +++++++++++++---------
3 files changed, 67 insertions(+), 46 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalReplica.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalReplica.java
index 3cf0a35b74a..53223658a0d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalReplica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalReplica.java
@@ -441,6 +441,37 @@ public class LocalReplica extends Replica {
this.lastSuccessVersion = version;
}
+ /*
+ * Check whether the replica's version has caught up with the expected version.
+ * If ignoreAlter is true, the state is ALTER, and the replica's version is
+ * PARTITION_INIT_VERSION, return true and ignore the version.
+ * This handles table alteration: the newly created replica's version is
+ * PARTITION_INIT_VERSION, but it must be treated as a "normal" replica
+ * whose version is considered caught up.
+ * If the state is ALTER but the version is larger than PARTITION_INIT_VERSION,
+ * the replica has already been updated by a load, so its version is checked.
+ * If expectedVersion itself is PARTITION_INIT_VERSION, no data has been loaded,
+ * so the replica is considered caught up.
+ */
+ @Override
+ public boolean checkVersionCatchUp(long expectedVersion, boolean
ignoreAlter) {
+ if (ignoreAlter && getState() == ReplicaState.ALTER && version ==
Partition.PARTITION_INIT_VERSION) {
+ return true;
+ }
+
+ if (expectedVersion == Partition.PARTITION_INIT_VERSION) {
+ // no data is loaded into this replica, just return true
+ return true;
+ }
+
+ if (this.version < expectedVersion) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("replica version does not catch up with version: {}.
replica: {}",
+ expectedVersion, this);
+ }
+ return false;
+ }
+ return true;
+ }
+
@Override
public boolean tooBigVersionCount() {
return visibleVersionCount >=
Config.min_version_count_indicate_replica_compaction_too_slow;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
index b6ac45f0484..d087d944c1d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
@@ -33,7 +33,7 @@ import java.util.Comparator;
/**
* This class represents the olap replica related metadata.
*/
-public class Replica {
+public abstract class Replica {
private static final Logger LOG = LogManager.getLogger(Replica.class);
public static final LastSuccessVersionComparator<Replica>
LAST_SUCCESS_VERSION_COMPARATOR =
new LastSuccessVersionComparator<Replica>();
@@ -167,9 +167,7 @@ public class Replica {
}
}
- public long getBackendId() throws UserException {
- return -1L;
- }
+ public abstract long getBackendId() throws UserException;
protected long getBackendIdValue() {
return -1L;
@@ -384,25 +382,7 @@ public class Replica {
* But if state is ALTER but version larger than
PARTITION_INIT_VERSION, which means this replica
* is already updated by load process, so we need to consider its
version.
*/
- public boolean checkVersionCatchUp(long expectedVersion, boolean
ignoreAlter) {
- if (ignoreAlter && state == ReplicaState.ALTER && version ==
Partition.PARTITION_INIT_VERSION) {
- return true;
- }
-
- if (expectedVersion == Partition.PARTITION_INIT_VERSION) {
- // no data is loaded into this replica, just return true
- return true;
- }
-
- if (this.version < expectedVersion) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("replica version does not catch up with version: {}.
replica: {}",
- expectedVersion, this);
- }
- return false;
- }
- return true;
- }
+ public abstract boolean checkVersionCatchUp(long expectedVersion, boolean
ignoreAlter);
public void setState(ReplicaState replicaState) {
this.state = replicaState;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
index 8f2c6127e25..86863170f90 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
@@ -29,11 +29,11 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.DebugPointUtil;
+import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.system.Backend;
import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;
import com.google.gson.annotations.SerializedName;
@@ -41,6 +41,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -48,14 +49,15 @@ import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
-public class CloudReplica extends Replica {
+public class CloudReplica extends Replica implements GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(CloudReplica.class);
- // In the future, a replica may be mapped to multiple BEs in a cluster,
- // so this value is be list
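+ // Legacy "bes" field (cluster id -> list of BE ids), kept to deserialize old metadata.
+ // gsonPostProcess() copies the first BE of each cluster into primaryClusterToBackend
+ // and then sets this field to null. A replica maps to exactly one BE per cluster,
+ // so use primaryClusterToBackend instead.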
+ @Deprecated
@SerializedName(value = "bes")
- private ConcurrentHashMap<String, List<Long>> primaryClusterToBackends
- = new ConcurrentHashMap<String, List<Long>>();
+ private ConcurrentHashMap<String, List<Long>> primaryClusterToBackends =
null;
+ @SerializedName(value = "be")
+ private ConcurrentHashMap<String, Long> primaryClusterToBackend = new
ConcurrentHashMap<>();
@SerializedName(value = "dbId")
private long dbId = -1;
@SerializedName(value = "tableId")
@@ -205,12 +207,7 @@ public class CloudReplica extends Replica {
}
}
- List<Long> backendIds = primaryClusterToBackends.get(clusterId);
- if (backendIds != null && !backendIds.isEmpty()) {
- return backendIds.get(0);
- }
-
- return -1L;
+ return primaryClusterToBackend.getOrDefault(clusterId, -1L);
}
private String getCurrentClusterId() throws ComputeGroupException {
@@ -320,8 +317,8 @@ public class CloudReplica extends Replica {
backendId = memClusterToBackends.get(clusterId).get(indexRand);
}
- if (!replicaEnough && !allowColdRead &&
primaryClusterToBackends.containsKey(clusterId)) {
- backendId = primaryClusterToBackends.get(clusterId).get(0);
+ if (!replicaEnough && !allowColdRead &&
primaryClusterToBackend.containsKey(clusterId)) {
+ backendId = primaryClusterToBackend.get(clusterId);
}
if (backendId > 0) {
@@ -346,7 +343,7 @@ public class CloudReplica extends Replica {
}
}
- // use primaryClusterToBackends, if find be normal
+ // fall back to this cluster's primary BE (primaryClusterToBackend) if it is available for queries
Backend be = getPrimaryBackend(clusterId, false);
if (be != null && be.isQueryAvailable()) {
return be.getId();
@@ -589,7 +586,7 @@ public class CloudReplica extends Replica {
}
public void updateClusterToPrimaryBe(String cluster, long beId) {
- primaryClusterToBackends.put(cluster, Lists.newArrayList(beId));
+ primaryClusterToBackend.put(cluster, beId);
secondaryClusterToBackends.remove(cluster);
}
@@ -603,7 +600,7 @@ public class CloudReplica extends Replica {
}
public void clearClusterToBe(String cluster) {
- primaryClusterToBackends.remove(cluster);
+ primaryClusterToBackend.remove(cluster);
secondaryClusterToBackends.remove(cluster);
}
@@ -628,12 +625,7 @@ public class CloudReplica extends Replica {
public List<Backend> getAllPrimaryBes() {
List<Backend> result = new ArrayList<Backend>();
- primaryClusterToBackends.keySet().forEach(clusterId -> {
- List<Long> backendIds = primaryClusterToBackends.get(clusterId);
- if (backendIds == null || backendIds.isEmpty()) {
- return;
- }
- Long beId = backendIds.get(0);
+ primaryClusterToBackend.forEach((clusterId, beId) -> {
if (beId != -1) {
Backend backend = Env.getCurrentSystemInfo().getBackend(beId);
result.add(backend);
@@ -661,4 +653,22 @@ public class CloudReplica extends Replica {
public void setRowsetCount(long rowsetCount) {
this.rowsetCount = rowsetCount;
}
+
+ @Override
+ public void gsonPostProcess() throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("convert CloudReplica: {}, primaryClusterToBackends: {},
primaryClusterToBackend: {}",
+ this.getId(), this.primaryClusterToBackends,
this.primaryClusterToBackend);
+ }
+ if (primaryClusterToBackends != null) {
+ for (Map.Entry<String, List<Long>> entry :
primaryClusterToBackends.entrySet()) {
+ String clusterId = entry.getKey();
+ List<Long> beIds = entry.getValue();
+ if (beIds != null && !beIds.isEmpty()) {
+ primaryClusterToBackend.put(clusterId, beIds.get(0));
+ }
+ }
+ this.primaryClusterToBackends = null;
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]