This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c7c239afba2 [fix](cloud) modify primary cluster bes to be in CloudReplica to reduce memory (#59932)
c7c239afba2 is described below

commit c7c239afba2be6a28ef2c6be2b88784db6539e1b
Author: meiyi <[email protected]>
AuthorDate: Wed Jan 21 01:28:28 2026 +0800

    [fix](cloud) modify primary cluster bes to be in CloudReplica to reduce memory (#59932)
---
 .../org/apache/doris/catalog/LocalReplica.java     | 31 ++++++++++++
 .../java/org/apache/doris/catalog/Replica.java     | 26 ++--------
 .../apache/doris/cloud/catalog/CloudReplica.java   | 56 +++++++++++++---------
 3 files changed, 67 insertions(+), 46 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalReplica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalReplica.java
index 3cf0a35b74a..53223658a0d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalReplica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalReplica.java
@@ -441,6 +441,37 @@ public class LocalReplica extends Replica {
         this.lastSuccessVersion = version;
     }
 
+    /*
+     * Check whether the replica's version catch up with the expected version.
+     * If ignoreAlter is true, and state is ALTER, and replica's version is
+     *  PARTITION_INIT_VERSION, just return true, ignore the version.
+     *      This is for the case that when altering table,
+     *      the newly created replica's version is PARTITION_INIT_VERSION,
+     *      but we need to treat it as a "normal" replica which version is supposed to be "catch-up".
+     *      But if state is ALTER but version larger than PARTITION_INIT_VERSION, which means this replica
+     *      is already updated by load process, so we need to consider its version.
+     */
+    @Override
+    public boolean checkVersionCatchUp(long expectedVersion, boolean 
ignoreAlter) {
+        if (ignoreAlter && getState() == ReplicaState.ALTER && version == 
Partition.PARTITION_INIT_VERSION) {
+            return true;
+        }
+
+        if (expectedVersion == Partition.PARTITION_INIT_VERSION) {
+            // no data is loaded into this replica, just return true
+            return true;
+        }
+
+        if (this.version < expectedVersion) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("replica version does not catch up with version: {}. 
replica: {}",
+                        expectedVersion, this);
+            }
+            return false;
+        }
+        return true;
+    }
+
     @Override
     public boolean tooBigVersionCount() {
         return visibleVersionCount >= 
Config.min_version_count_indicate_replica_compaction_too_slow;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
index b6ac45f0484..d087d944c1d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
@@ -33,7 +33,7 @@ import java.util.Comparator;
 /**
  * This class represents the olap replica related metadata.
  */
-public class Replica {
+public abstract class Replica {
     private static final Logger LOG = LogManager.getLogger(Replica.class);
     public static final LastSuccessVersionComparator<Replica> 
LAST_SUCCESS_VERSION_COMPARATOR =
             new LastSuccessVersionComparator<Replica>();
@@ -167,9 +167,7 @@ public class Replica {
         }
     }
 
-    public long getBackendId() throws UserException {
-        return -1L;
-    }
+    public abstract long getBackendId() throws UserException;
 
     protected long getBackendIdValue() {
         return -1L;
@@ -384,25 +382,7 @@ public class Replica {
      *      But if state is ALTER but version larger than PARTITION_INIT_VERSION, which means this replica
      *      is already updated by load process, so we need to consider its version.
      */
-    public boolean checkVersionCatchUp(long expectedVersion, boolean 
ignoreAlter) {
-        if (ignoreAlter && state == ReplicaState.ALTER && version == 
Partition.PARTITION_INIT_VERSION) {
-            return true;
-        }
-
-        if (expectedVersion == Partition.PARTITION_INIT_VERSION) {
-            // no data is loaded into this replica, just return true
-            return true;
-        }
-
-        if (this.version < expectedVersion) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("replica version does not catch up with version: {}. 
replica: {}",
-                          expectedVersion, this);
-            }
-            return false;
-        }
-        return true;
-    }
+    public abstract boolean checkVersionCatchUp(long expectedVersion, boolean 
ignoreAlter);
 
     public void setState(ReplicaState replicaState) {
         this.state = replicaState;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
index 8f2c6127e25..86863170f90 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java
@@ -29,11 +29,11 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.util.DebugPointUtil;
+import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.system.Backend;
 
 import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
 import com.google.common.hash.HashCode;
 import com.google.common.hash.Hashing;
 import com.google.gson.annotations.SerializedName;
@@ -41,6 +41,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -48,14 +49,15 @@ import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
-public class CloudReplica extends Replica {
+public class CloudReplica extends Replica implements GsonPostProcessable {
     private static final Logger LOG = LogManager.getLogger(CloudReplica.class);
 
-    // In the future, a replica may be mapped to multiple BEs in a cluster,
-    // so this value is be list
+    // a replica is mapped to one BE in a cluster, use primaryClusterToBackend instead of primaryClusterToBackends
+    @Deprecated
     @SerializedName(value = "bes")
-    private ConcurrentHashMap<String, List<Long>> primaryClusterToBackends
-            = new ConcurrentHashMap<String, List<Long>>();
+    private ConcurrentHashMap<String, List<Long>> primaryClusterToBackends = 
null;
+    @SerializedName(value = "be")
+    private ConcurrentHashMap<String, Long> primaryClusterToBackend = new 
ConcurrentHashMap<>();
     @SerializedName(value = "dbId")
     private long dbId = -1;
     @SerializedName(value = "tableId")
@@ -205,12 +207,7 @@ public class CloudReplica extends Replica {
             }
         }
 
-        List<Long> backendIds = primaryClusterToBackends.get(clusterId);
-        if (backendIds != null && !backendIds.isEmpty()) {
-            return backendIds.get(0);
-        }
-
-        return -1L;
+        return primaryClusterToBackend.getOrDefault(clusterId, -1L);
     }
 
     private String getCurrentClusterId() throws ComputeGroupException {
@@ -320,8 +317,8 @@ public class CloudReplica extends Replica {
                 backendId = memClusterToBackends.get(clusterId).get(indexRand);
             }
 
-            if (!replicaEnough && !allowColdRead && 
primaryClusterToBackends.containsKey(clusterId)) {
-                backendId = primaryClusterToBackends.get(clusterId).get(0);
+            if (!replicaEnough && !allowColdRead && 
primaryClusterToBackend.containsKey(clusterId)) {
+                backendId = primaryClusterToBackend.get(clusterId);
             }
 
             if (backendId > 0) {
@@ -346,7 +343,7 @@ public class CloudReplica extends Replica {
             }
         }
 
-        // use primaryClusterToBackends, if find be normal
+        // use primaryClusterToBackend, if find be normal
         Backend be = getPrimaryBackend(clusterId, false);
         if (be != null && be.isQueryAvailable()) {
             return be.getId();
@@ -589,7 +586,7 @@ public class CloudReplica extends Replica {
     }
 
     public void updateClusterToPrimaryBe(String cluster, long beId) {
-        primaryClusterToBackends.put(cluster, Lists.newArrayList(beId));
+        primaryClusterToBackend.put(cluster, beId);
         secondaryClusterToBackends.remove(cluster);
     }
 
@@ -603,7 +600,7 @@ public class CloudReplica extends Replica {
     }
 
     public void clearClusterToBe(String cluster) {
-        primaryClusterToBackends.remove(cluster);
+        primaryClusterToBackend.remove(cluster);
         secondaryClusterToBackends.remove(cluster);
     }
 
@@ -628,12 +625,7 @@ public class CloudReplica extends Replica {
 
     public List<Backend> getAllPrimaryBes() {
         List<Backend> result = new ArrayList<Backend>();
-        primaryClusterToBackends.keySet().forEach(clusterId -> {
-            List<Long> backendIds = primaryClusterToBackends.get(clusterId);
-            if (backendIds == null || backendIds.isEmpty()) {
-                return;
-            }
-            Long beId = backendIds.get(0);
+        primaryClusterToBackend.forEach((clusterId, beId) -> {
             if (beId != -1) {
                 Backend backend = Env.getCurrentSystemInfo().getBackend(beId);
                 result.add(backend);
@@ -661,4 +653,22 @@ public class CloudReplica extends Replica {
     public void setRowsetCount(long rowsetCount) {
         this.rowsetCount = rowsetCount;
     }
+
+    @Override
+    public void gsonPostProcess() throws IOException {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("convert CloudReplica: {}, primaryClusterToBackends: {}, 
primaryClusterToBackend: {}",
+                    this.getId(), this.primaryClusterToBackends, 
this.primaryClusterToBackend);
+        }
+        if (primaryClusterToBackends != null) {
+            for (Map.Entry<String, List<Long>> entry : 
primaryClusterToBackends.entrySet()) {
+                String clusterId = entry.getKey();
+                List<Long> beIds = entry.getValue();
+                if (beIds != null && !beIds.isEmpty()) {
+                    primaryClusterToBackend.put(clusterId, beIds.get(0));
+                }
+            }
+            this.primaryClusterToBackends = null;
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to