github-actions[bot] commented on code in PR #61922:
URL: https://github.com/apache/doris/pull/61922#discussion_r3013765894
##########
fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java:
##########
@@ -932,6 +932,9 @@ public void write(JsonWriter out, T value) throws IOException {
((GsonPreProcessable) value).gsonPreProcess();
}
delegate.write(out, value);
+ if (value instanceof GsonPreProcessable) {
+ ((GsonPreProcessable) value).gsonPostSerialize();
+ }
Review Comment:
**Minor (Low):** `gsonPostSerialize()` is not protected by `try/finally`. If
`delegate.write(out, value)` throws an exception, `gsonPostSerialize()` won't
be called, leaving `primaryClusterToBackend` non-null with stale data. While
this field is only used during serialization and won't cause incorrect behavior
(the data source of truth is the central index), it represents a minor memory
leak on error.
Consider:
```java
if (value instanceof GsonPreProcessable) {
((GsonPreProcessable) value).gsonPreProcess();
}
try {
delegate.write(out, value);
} finally {
if (value instanceof GsonPreProcessable) {
((GsonPreProcessable) value).gsonPostSerialize();
}
}
```
##########
fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudReplica.java:
##########
@@ -610,37 +618,38 @@ public void updateClusterToSecondaryBe(String cluster, long beId) {
LOG.debug("add to secondary clusterId {}, beId {}, changeTimestamp {}, replica info {}", cluster, beId, changeTimestamp, this);
}
- secondaryClusterToBackends.put(cluster, Pair.of(beId, changeTimestamp));
+ getCloudInvertedIndex().setSecondaryBe(cluster, getId(), beId, changeTimestamp);
}
public void clearClusterToBe(String cluster) {
- primaryClusterToBackend.remove(cluster);
- secondaryClusterToBackends.remove(cluster);
+ CloudTabletInvertedIndex idx = getCloudInvertedIndex();
+ idx.removePrimaryBeId(cluster, getId());
+ idx.removeSecondaryBe(cluster, getId());
}
// ATTN: This func is only used by redundant tablet report clean in bes.
// Only the master node will do the diff logic,
// so just only need to clean up secondaryClusterToBackends on the master node.
public void checkAndClearSecondaryClusterToBe(String clusterId, long expireTimestamp) {
- Pair<Long, Long> secondBeAndChangeTimestamp = secondaryClusterToBackends.get(clusterId);
- if (secondBeAndChangeTimestamp == null) {
+ CloudTabletInvertedIndex idx = getCloudInvertedIndex();
+ long beId = idx.getSecondaryBeId(clusterId, getId());
+ if (beId == -1L) {
return;
}
- long beId = secondBeAndChangeTimestamp.key();
- long changeTimestamp = secondBeAndChangeTimestamp.value();
+ long changeTimestamp = idx.getSecondaryTimestamp(clusterId, getId());
Review Comment:
**TOCTOU race (Medium):** Between `idx.getSecondaryBeId()` (line above) and
`idx.getSecondaryTimestamp()` here, a concurrent `setSecondaryBe()` call for
the same (clusterId, replicaId) could update the beId+timestamp pair. The
`changeTimestamp` read here could come from the new update while `beId` came
from the old one. In the original code, both values were read atomically from a
single `Pair` in one `ConcurrentHashMap.get()` call.
In the worst case, this could cause `checkAndClearSecondaryClusterToBe` to
compare the new (larger) timestamp against `expireTimestamp` and decide NOT to
remove the secondary entry, even though the beId it retrieved was from the old,
expired entry. The practical impact is limited since this is only used for
cleanup, but it's a correctness regression.
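One way to restore the original atomic read (a minimal sketch, not part of this patch: the `SecondaryBeIndex` class and method names below are hypothetical) is to keep beId and timestamp together in one immutable map value, so a single `ConcurrentHashMap.get()` returns a consistent snapshot of both:

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: both values live in a single immutable long[] entry,
// so one ConcurrentHashMap.get() always yields a matched (beId, timestamp) pair.
public class SecondaryBeIndex {
    private final ConcurrentHashMap<Long, long[]> replicaToBeAndTs = new ConcurrentHashMap<>();

    public void set(long replicaId, long beId, long timestamp) {
        // Single atomic publish; readers can never observe a torn pair.
        replicaToBeAndTs.put(replicaId, new long[] {beId, timestamp});
    }

    // Returns {beId, timestamp} taken from the same write, or null if absent.
    public long[] snapshot(long replicaId) {
        return replicaToBeAndTs.get(replicaId);
    }
}
```

With such a snapshot accessor, `checkAndClearSecondaryClusterToBe` would read both fields in one call instead of two, eliminating the window between the two `idx.get...()` calls.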
##########
fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletInvertedIndex.java:
##########
@@ -128,5 +149,96 @@ public List<Replica> getReplicasByTabletId(long tabletId) {
@Override
protected void innerClear() {
replicaMetaMap.clear();
+ clusterPrimaryBeMap.clear();
+ clusterSecondaryBeMap.clear();
+ clusterSecondaryTsMap.clear();
+ }
+
+ // ---- Central cluster-to-BE mapping accessors ----
+
+ private ConcurrentLong2LongHashMap getOrCreateClusterMap(
+ ConcurrentHashMap<String, ConcurrentLong2LongHashMap> outer, String clusterId) {
+ return outer.computeIfAbsent(clusterId, k -> new ConcurrentLong2LongHashMap());
+ }
+
+ // -- Primary BE --
+
+ public long getPrimaryBeId(String clusterId, long replicaId) {
+ ConcurrentLong2LongHashMap inner = clusterPrimaryBeMap.get(clusterId);
+ if (inner == null) {
+ return -1L;
+ }
+ return inner.getOrDefault(replicaId, -1L);
+ }
+
+ public void setPrimaryBeId(String clusterId, long replicaId, long beId) {
+ getOrCreateClusterMap(clusterPrimaryBeMap, clusterId).put(replicaId, beId);
+ }
+
+ public void removePrimaryBeId(String clusterId, long replicaId) {
+ ConcurrentLong2LongHashMap inner = clusterPrimaryBeMap.get(clusterId);
+ if (inner != null) {
+ inner.remove(replicaId);
+ }
+ }
+
+ public void clearPrimaryBeForReplica(long replicaId) {
+ for (ConcurrentLong2LongHashMap inner : clusterPrimaryBeMap.values()) {
+ inner.remove(replicaId);
+ }
+ }
+
+ public Map<String, Long> getAllPrimaryClusterBeIds(long replicaId) {
+ Map<String, Long> result = new HashMap<>();
+ for (Map.Entry<String, ConcurrentLong2LongHashMap> entry : clusterPrimaryBeMap.entrySet()) {
+ long beId = entry.getValue().getOrDefault(replicaId, -1L);
+ if (beId != -1L) {
+ result.put(entry.getKey(), beId);
+ }
+ }
+ return result;
+ }
+
+ // -- Secondary BE --
+
+ public long getSecondaryBeId(String clusterId, long replicaId) {
+ ConcurrentLong2LongHashMap inner = clusterSecondaryBeMap.get(clusterId);
+ if (inner == null) {
+ return -1L;
+ }
+ return inner.getOrDefault(replicaId, -1L);
+ }
+
+ public long getSecondaryTimestamp(String clusterId, long replicaId) {
+ ConcurrentLong2LongHashMap inner = clusterSecondaryTsMap.get(clusterId);
+ if (inner == null) {
+ return -1L;
+ }
+ return inner.getOrDefault(replicaId, -1L);
+ }
+
+ public void setSecondaryBe(String clusterId, long replicaId, long beId, long timestamp) {
+ getOrCreateClusterMap(clusterSecondaryBeMap, clusterId).put(replicaId, beId);
Review Comment:
**Atomicity concern (Medium):** `setSecondaryBe` performs two separate
`put()` operations on two different maps (`clusterSecondaryBeMap` and
`clusterSecondaryTsMap`) without any synchronization between them. In the
original code, `secondaryClusterToBackends.put(cluster, Pair.of(beId,
changeTimestamp))` was a single atomic `ConcurrentHashMap.put` of an immutable
`Pair`, so readers always saw a consistent (beId, timestamp) pair.
With this split, a concurrent reader calling `getSecondaryBeId()` followed
by `getSecondaryTimestamp()` can observe a beId from one `setSecondaryBe` call
and a timestamp from a different call (if a concurrent update interleaves
between the two reads). Similarly, `removeSecondaryBe` can remove the beId but
not yet the timestamp, leaving a stale timestamp briefly visible.
Callers affected:
- `checkAndClearSecondaryClusterToBe`: reads beId then timestamp separately
— could see stale timestamp and incorrectly retain or remove the secondary
entry.
- `getSecondaryBackend`: reads beId then timestamp — the timestamp is only
used for debug logging so the impact is low here.
Suggested fix: Encode both beId and timestamp into a single `long` value in
one map (e.g., use a `ConcurrentHashMap<String, ConcurrentHashMap<Long,
long[]>>` or pack the beId into upper bits and a reduced timestamp into lower
bits), or add a thin synchronized accessor that retrieves both values
atomically.
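A rough sketch of the bit-packing variant follows. It is only an illustration under assumptions the PR does not guarantee: beId must fit in 32 bits and a seconds-resolution timestamp in the remaining 32 bits. The `BePacking` helper name is hypothetical.

```java
// Hypothetical packing helpers, not part of the patch.
// Assumes 0 <= beId < 2^32 and 0 <= timestampSec < 2^32.
public class BePacking {
    // beId in the upper 32 bits, seconds-resolution timestamp in the lower 32.
    public static long pack(long beId, long timestampSec) {
        return (beId << 32) | (timestampSec & 0xFFFFFFFFL);
    }

    public static long unpackBeId(long packed) {
        return packed >>> 32;  // unsigned shift: upper half only
    }

    public static long unpackTimestampSec(long packed) {
        return packed & 0xFFFFFFFFL;  // lower half only
    }
}
```

This keeps the existing `ConcurrentLong2LongHashMap` types: a single `put(replicaId, pack(beId, ts))` and a single `get` make both reads and writes atomic again, at the cost of the range restriction on both fields.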
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]