This is an automated email from the ASF dual-hosted git repository.
hellostephen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 48894b90f61 [fix](cloud) Fix NPE in group commit when backend belongs
to a different cluster (#60652)
48894b90f61 is described below
commit 48894b90f618f181cd504e4d78b7f0932d1ebde4
Author: deardeng <[email protected]>
AuthorDate: Tue Feb 24 14:27:04 2026 +0800
[fix](cloud) Fix NPE in group commit when backend belongs to a different
cluster (#60652)
Fix:
- Add cluster membership check in getCachedBackend() to invalidate
cached backends that no longer belong to the expected cluster, which can
happen during auto-scaling.
---
.../org/apache/doris/load/GroupCommitManager.java | 14 ++++++++++--
.../main/java/org/apache/doris/qe/Coordinator.java | 26 ++++++++++++++++++++++
2 files changed, 38 insertions(+), 2 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
index 25e72199d2a..a2234c5366a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/GroupCommitManager.java
@@ -213,7 +213,15 @@ public class GroupCommitManager {
try {
long backendId = new MasterOpExecutor(context)
.getGroupCommitLoadBeId(tableId, clusterName);
- return Env.getCurrentSystemInfo().getBackend(backendId);
+ Backend be = Env.getCurrentSystemInfo().getBackend(backendId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("selectBackendForGroupCommit on non-master: tableId={}, clusterName={},"
+ + " backendId={}, backend={}, backendCluster={}",
+ tableId, clusterName, backendId,
+ be != null ? be.getHost() + ":" + be.getBePort() : "null",
+ be != null ? be.getCloudClusterName() : "null");
+ }
+ return be;
} catch (Exception e) {
throw new LoadException(e.getMessage());
}
@@ -350,7 +358,9 @@ public class GroupCommitManager {
}
Backend backend = Env.getCurrentSystemInfo().getBackend(backendId);
if (backend != null && backend.isAlive() && !backend.isDecommissioned()
- && (!Config.isCloudMode() || !backend.isDecommissioning())) {
+ && (!Config.isCloudMode() || !backend.isDecommissioning())
+ && (!Config.isCloudMode() || cluster == null
+ || cluster.equals(backend.getCloudClusterName()))) {
return backend.getId();
} else {
tableToBeMap.remove(encode(cluster, tableId));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index f6c929b6526..fefd77c7df8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -565,6 +565,26 @@ public class Coordinator implements CoordInterface {
this.idToBackend = Env.getCurrentSystemInfo().getBackendsByCurrentCluster();
+ // Log cluster info and groupCommitBackend for debugging NPE in PipelineExecContext
+ if (groupCommitBackend != null) {
+ String currentCluster = "unknown";
+ try {
+ if (ConnectContext.get() != null) {
+ currentCluster = ConnectContext.get().getCloudCluster();
+ }
+ } catch (Exception e) {
+ LOG.debug("failed to get current cloud cluster for debug log", e);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("query {} prepare: currentCluster={}, idToBackend.size={}, idToBackend.keys={},"
+ + " groupCommitBackend=[id={}, host={}, cluster={}], groupCommitBackendInMap={}",
+ DebugUtil.printId(queryId), currentCluster, idToBackend.size(), idToBackend.keySet(),
+ groupCommitBackend.getId(), groupCommitBackend.getHost(),
+ groupCommitBackend.getCloudClusterName(),
+ idToBackend.containsKey(groupCommitBackend.getId()));
+ }
+ }
+
if (LOG.isDebugEnabled()) {
int backendNum = idToBackend.size();
StringBuilder backendInfos = new StringBuilder("backends info:");
@@ -835,6 +855,12 @@ public class Coordinator implements CoordInterface {
// So that we can use one RPC to send all fragment instances of a BE.
for (Map.Entry<TNetworkAddress, TPipelineFragmentParams> entry : tParams.entrySet()) {
Long backendId = this.addressToBackendID.get(entry.getKey());
+ if (backendId == null) {
+ LOG.warn("query {} sendPipelineCtx: addressToBackendID lookup returned null!"
+ + " address={}, fragmentId={}, addressToBackendID={}",
+ DebugUtil.printId(queryId), entry.getKey(),
+ fragment.getFragmentId(), addressToBackendID);
+ }
backendFragments.add(Pair.of(fragment.getFragmentId(), backendId));
PipelineExecContext pipelineExecContext = new PipelineExecContext(fragment.getFragmentId(),
entry.getValue(), idToBackend.get(backendId), executionProfile, jobId);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]