github-actions[bot] commented on code in PR #61922:
URL: https://github.com/apache/doris/pull/61922#discussion_r3013765894


##########
fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java:
##########
@@ -932,6 +932,9 @@ public void write(JsonWriter out, T value) throws IOException {
                         ((GsonPreProcessable) value).gsonPreProcess();
                     }
                     delegate.write(out, value);
+                    if (value instanceof GsonPreProcessable) {
+                        ((GsonPreProcessable) value).gsonPostSerialize();
+                    }

Review Comment:
   **Minor (Low):** `gsonPostSerialize()` is not protected by `try/finally`. If 
`delegate.write(out, value)` throws an exception, `gsonPostSerialize()` won't 
be called, leaving `primaryClusterToBackend` non-null with stale data. While 
this field is only used during serialization and won't cause incorrect behavior 
(the data source of truth is the central index), it represents a minor memory 
leak on error.
   
   Consider:
   ```java
   if (value instanceof GsonPreProcessable) {
       ((GsonPreProcessable) value).gsonPreProcess();
   }
   try {
       delegate.write(out, value);
   } finally {
       if (value instanceof GsonPreProcessable) {
           ((GsonPreProcessable) value).gsonPostSerialize();
       }
   }
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java:
##########
@@ -610,37 +618,38 @@ public void updateClusterToSecondaryBe(String cluster, long beId) {
             LOG.debug("add to secondary clusterId {}, beId {}, changeTimestamp {}, replica info {}",
                     cluster, beId, changeTimestamp, this);
         }
-        secondaryClusterToBackends.put(cluster, Pair.of(beId, changeTimestamp));
+        getCloudInvertedIndex().setSecondaryBe(cluster, getId(), beId, changeTimestamp);
     }
 
     public void clearClusterToBe(String cluster) {
-        primaryClusterToBackend.remove(cluster);
-        secondaryClusterToBackends.remove(cluster);
+        CloudTabletInvertedIndex idx = getCloudInvertedIndex();
+        idx.removePrimaryBeId(cluster, getId());
+        idx.removeSecondaryBe(cluster, getId());
     }
 
     // ATTN: This func is only used by redundant tablet report clean in bes.
     // Only the master node will do the diff logic,
    // so just only need to clean up secondaryClusterToBackends on the master node.
    public void checkAndClearSecondaryClusterToBe(String clusterId, long expireTimestamp) {
-        Pair<Long, Long> secondBeAndChangeTimestamp = secondaryClusterToBackends.get(clusterId);
-        if (secondBeAndChangeTimestamp == null) {
+        CloudTabletInvertedIndex idx = getCloudInvertedIndex();
+        long beId = idx.getSecondaryBeId(clusterId, getId());
+        if (beId == -1L) {
             return;
         }
-        long beId = secondBeAndChangeTimestamp.key();
-        long changeTimestamp = secondBeAndChangeTimestamp.value();
+        long changeTimestamp = idx.getSecondaryTimestamp(clusterId, getId());
 

Review Comment:
   **TOCTOU race (Medium):** Between `idx.getSecondaryBeId()` (line above) and 
`idx.getSecondaryTimestamp()` here, a concurrent `setSecondaryBe()` call for 
the same (clusterId, replicaId) could update the beId+timestamp pair. The 
`changeTimestamp` read here could come from the new update while `beId` came 
from the old one. In the original code, both values were read atomically from a 
single `Pair` in one `ConcurrentHashMap.get()` call.
   
   In the worst case, this could cause `checkAndClearSecondaryClusterToBe` to 
compare the new (larger) timestamp against `expireTimestamp` and decide NOT to 
remove the secondary entry, even though the beId it retrieved was from the old, 
expired entry. The practical impact is limited since this is only used for 
cleanup, but it's a correctness regression.
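   
   One way to restore the atomicity of the original read (a sketch only — `SecondarySnapshotSketch`, `getSecondarySnapshot`, and the `long[]` encoding are illustrative, not part of this PR) is to have the index return both values from a single map lookup, as the old `Pair`-based code did:
   
   ```java
   import java.util.concurrent.ConcurrentHashMap;
   
   // Illustrative sketch: replicaId -> {beId, changeTimestamp}. The array is
   // built once and replaced wholesale, so a single get() yields a consistent pair.
   class SecondarySnapshotSketch {
       private final ConcurrentHashMap<Long, long[]> secondary = new ConcurrentHashMap<>();
   
       void setSecondaryBe(long replicaId, long beId, long changeTimestamp) {
           secondary.put(replicaId, new long[] {beId, changeTimestamp}); // one atomic put
       }
   
       // Single lookup: beId and changeTimestamp always come from the same update.
       long[] getSecondarySnapshot(long replicaId) {
           return secondary.get(replicaId);
       }
   
       boolean isExpired(long replicaId, long expireTimestamp) {
           long[] snap = getSecondarySnapshot(replicaId);
           // snap[0] (beId) and snap[1] (timestamp) are guaranteed to belong together.
           return snap != null && snap[1] < expireTimestamp;
       }
   }
   ```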



##########
fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletInvertedIndex.java:
##########
@@ -128,5 +149,96 @@ public List<Replica> getReplicasByTabletId(long tabletId) {
     @Override
     protected void innerClear() {
         replicaMetaMap.clear();
+        clusterPrimaryBeMap.clear();
+        clusterSecondaryBeMap.clear();
+        clusterSecondaryTsMap.clear();
+    }
+
+    // ---- Central cluster-to-BE mapping accessors ----
+
+    private ConcurrentLong2LongHashMap getOrCreateClusterMap(
+            ConcurrentHashMap<String, ConcurrentLong2LongHashMap> outer, String clusterId) {
+        return outer.computeIfAbsent(clusterId, k -> new ConcurrentLong2LongHashMap());
+    }
+
+    // -- Primary BE --
+
+    public long getPrimaryBeId(String clusterId, long replicaId) {
+        ConcurrentLong2LongHashMap inner = clusterPrimaryBeMap.get(clusterId);
+        if (inner == null) {
+            return -1L;
+        }
+        return inner.getOrDefault(replicaId, -1L);
+    }
+
+    public void setPrimaryBeId(String clusterId, long replicaId, long beId) {
+        getOrCreateClusterMap(clusterPrimaryBeMap, clusterId).put(replicaId, beId);
+    }
+
+    public void removePrimaryBeId(String clusterId, long replicaId) {
+        ConcurrentLong2LongHashMap inner = clusterPrimaryBeMap.get(clusterId);
+        if (inner != null) {
+            inner.remove(replicaId);
+        }
+    }
+
+    public void clearPrimaryBeForReplica(long replicaId) {
+        for (ConcurrentLong2LongHashMap inner : clusterPrimaryBeMap.values()) {
+            inner.remove(replicaId);
+        }
+    }
+
+    public Map<String, Long> getAllPrimaryClusterBeIds(long replicaId) {
+        Map<String, Long> result = new HashMap<>();
+        for (Map.Entry<String, ConcurrentLong2LongHashMap> entry : clusterPrimaryBeMap.entrySet()) {
+            long beId = entry.getValue().getOrDefault(replicaId, -1L);
+            if (beId != -1L) {
+                result.put(entry.getKey(), beId);
+            }
+        }
+        return result;
+    }
+
+    // -- Secondary BE --
+
+    public long getSecondaryBeId(String clusterId, long replicaId) {
+        ConcurrentLong2LongHashMap inner = clusterSecondaryBeMap.get(clusterId);
+        if (inner == null) {
+            return -1L;
+        }
+        return inner.getOrDefault(replicaId, -1L);
+    }
+
+    public long getSecondaryTimestamp(String clusterId, long replicaId) {
+        ConcurrentLong2LongHashMap inner = clusterSecondaryTsMap.get(clusterId);
+        if (inner == null) {
+            return -1L;
+        }
+        return inner.getOrDefault(replicaId, -1L);
+    }
+
+    public void setSecondaryBe(String clusterId, long replicaId, long beId, long timestamp) {
+        getOrCreateClusterMap(clusterSecondaryBeMap, clusterId).put(replicaId, beId);

Review Comment:
   **Atomicity concern (Medium):** `setSecondaryBe` performs two separate 
`put()` operations on two different maps (`clusterSecondaryBeMap` and 
`clusterSecondaryTsMap`) without any synchronization between them. In the 
original code, `secondaryClusterToBackends.put(cluster, Pair.of(beId, 
changeTimestamp))` was a single atomic `ConcurrentHashMap.put` of an immutable 
`Pair`, so readers always saw a consistent (beId, timestamp) pair.
   
   With this split, a concurrent reader calling `getSecondaryBeId()` followed 
by `getSecondaryTimestamp()` can observe a beId from one `setSecondaryBe` call 
and a timestamp from a different call (if a concurrent update interleaves 
between the two reads). Similarly, `removeSecondaryBe` can remove the beId but 
not yet the timestamp, leaving a stale timestamp briefly visible.
   
   Callers affected:
   - `checkAndClearSecondaryClusterToBe`: reads beId then timestamp separately 
— could see stale timestamp and incorrectly retain or remove the secondary 
entry.
   - `getSecondaryBackend`: reads beId then timestamp — the timestamp is only 
used for debug logging so the impact is low here.
   
   Suggested fix: store both values as a single atomically-replaced entry in one map (e.g., a `ConcurrentHashMap<String, ConcurrentHashMap<Long, long[]>>` whose values are immutable `{beId, timestamp}` arrays, or pack the beId into the upper bits of a single `long` with a reduced-precision timestamp in the lower bits), or add a thin synchronized accessor that retrieves both values atomically.
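   
   For the bit-packing variant, a rough sketch (the field widths here are illustrative assumptions, not from the PR: 23 bits for the beId and 41 bits for a millisecond timestamp, which only works if beIds stay below 2^23 and timestamps below 2^41, roughly the year 2039):
   
   ```java
   // Illustrative packing sketch: fold (beId, timestamp) into one long so a
   // long-to-long map keeps single-put atomicity. Widths are assumptions:
   // beId must fit in 23 bits, timestampMillis in 41 bits.
   class BeEntryPacking {
       static final int TS_BITS = 41;
       static final long TS_MASK = (1L << TS_BITS) - 1;
   
       static long pack(long beId, long timestampMillis) {
           return (beId << TS_BITS) | (timestampMillis & TS_MASK);
       }
   
       static long unpackBeId(long packed) {
           return packed >>> TS_BITS; // upper 23 bits
       }
   
       static long unpackTimestamp(long packed) {
           return packed & TS_MASK; // lower 41 bits
       }
   }
   ```
   
   The packed variant avoids per-entry allocation but trades away range; the `long[]`-per-entry approach is simpler and has no range limits.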



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

