This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new c31bf414008 branch-4.0: [fix](cloud) fix correct pipeline task number calculation in cloud mode #57129 (#57262)
c31bf414008 is described below

commit c31bf4140087a33ed0b5e0b91cae825a2707ac9d
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Oct 23 17:26:45 2025 +0800

    branch-4.0: [fix](cloud) fix correct pipeline task number calculation in cloud mode #57129 (#57262)
    
    Cherry-picked from #57129
    
    Co-authored-by: Xin Liao <[email protected]>
---
 .../doris/cloud/system/CloudSystemInfoService.java |  11 +-
 .../org/apache/doris/system/SystemInfoService.java |  14 +-
 .../cloud/system/CloudSystemInfoServiceTest.java   | 645 ++++++++++++++++++++-
 .../apache/doris/system/SystemInfoServiceTest.java |  61 ++
 4 files changed, 720 insertions(+), 11 deletions(-)
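
    Note for readers skimming the diffstat: the core change in the hunks below replaces a manual loop with a stream-based minimum over per-backend pipeline executor sizes. A small standalone sketch of that logic (simplified, hypothetical names, not the actual Doris classes), assuming only that sizes <= 0 are treated as invalid:

        // Illustrative only: mirrors the filter/min/orElse pattern in the patch.
        import java.util.List;

        class MinExecutorSizeSketch {
            // Smallest positive size, or 1 when the list is empty or has no
            // positive entries -- the same fallback the patched code uses.
            static int minPipelineExecutorSize(List<Integer> sizes) {
                return sizes.stream()
                        .mapToInt(Integer::intValue)
                        .filter(size -> size > 0)
                        .min()
                        .orElse(1);
            }

            public static void main(String[] args) {
                System.out.println(minPipelineExecutorSize(List.of(12, 6, 10))); // 6
                System.out.println(minPipelineExecutorSize(List.of(0, -1)));     // 1
            }
        }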

diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
index 91033a9dd6d..62ab6e3b9ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
@@ -774,8 +774,15 @@ public class CloudSystemInfoService extends SystemInfoService {
                 && Strings.isNullOrEmpty(clusterName)) {
             return 1;
         }
-
-        return super.getMinPipelineExecutorSize();
+        List<Backend> currentBackends = getBackendsByClusterName(clusterName);
+        if (currentBackends == null || currentBackends.isEmpty()) {
+            return 1;
+        }
+        return currentBackends.stream()
+                .mapToInt(Backend::getPipelineExecutorSize)
+                .filter(size -> size > 0)
+                .min()
+                .orElse(1);
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index 316a53b1ce0..1e41eb8d501 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -1143,14 +1143,12 @@ public class SystemInfoService {
         if (currentBackends.size() == 0) {
             return 1;
         }
-        int minPipelineExecutorSize = Integer.MAX_VALUE;
-        for (Backend be : currentBackends) {
-            int size = be.getPipelineExecutorSize();
-            if (size > 0) {
-                minPipelineExecutorSize = Math.min(minPipelineExecutorSize, size);
-            }
-        }
-        return minPipelineExecutorSize;
+
+        return currentBackends.stream()
+                .mapToInt(Backend::getPipelineExecutorSize)
+                .filter(size -> size > 0)
+                .min()
+                .orElse(1);
     }
 
     // CloudSystemInfoService override
diff --git a/fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java b/fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java
index 445ec828ba2..85d14585f21 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java
@@ -17,8 +17,11 @@
 
 package org.apache.doris.cloud.system;
 
+import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.cloud.catalog.ComputeGroup;
+import org.apache.doris.common.Config;
+import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.resource.Tag;
 import org.apache.doris.system.Backend;
 
@@ -35,7 +38,8 @@ public class CloudSystemInfoServiceTest {
 
     @Before
     public void setUp() {
-        //infoService = new CloudSystemInfoService();
+        // Enable cloud mode for testing
+        Config.cloud_unique_id = "test_cloud_unique_id";
     }
 
     @Test
@@ -348,4 +352,643 @@ public class CloudSystemInfoServiceTest {
         res = infoService.isStandByComputeGroup(pcgName3);
         Assert.assertFalse(res);
     }
+
+    // Test for getMinPipelineExecutorSize method
+    @Test
+    public void testGetMinPipelineExecutorSizeWithEmptyCluster() {
+        infoService = new CloudSystemInfoService();
+        String clusterName = "test_cluster";
+        String clusterId = "test_cluster_id";
+
+        // Mock an empty cluster (no backends)
+        ComputeGroup cg = new ComputeGroup(clusterId, clusterName, ComputeGroup.ComputeTypeEnum.COMPUTE);
+        infoService.addComputeGroup(clusterId, cg);
+
+        // Set ConnectContext to select the cluster
+        createTestConnectContext(clusterName);
+
+        try {
+            // Since there are no backends in the cluster, should return 1
+            int result = infoService.getMinPipelineExecutorSize();
+            Assert.assertEquals(1, result);
+        } finally {
+            ConnectContext.remove();
+        }
+    }
+
+    @Test
+    public void testGetMinPipelineExecutorSizeWithSingleBackend() {
+        infoService = new CloudSystemInfoService();
+        String clusterName = "test_cluster";
+        String clusterId = "test_cluster_id";
+
+        // Setup cluster
+        ComputeGroup cg = new ComputeGroup(clusterId, clusterName, ComputeGroup.ComputeTypeEnum.COMPUTE);
+        infoService.addComputeGroup(clusterId, cg);
+
+        // Add a backend with pipeline executor size = 8
+        List<Backend> toAdd = new ArrayList<>();
+        Backend backend = new Backend(Env.getCurrentEnv().getNextId(), "127.0.0.1", 9050);
+        Map<String, String> tagMap = Tag.DEFAULT_BACKEND_TAG.toMap();
+        tagMap.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+        tagMap.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+        backend.setTagMap(tagMap);
+        backend.setPipelineExecutorSize(8);
+        toAdd.add(backend);
+
+        infoService.updateCloudClusterMapNoLock(toAdd, new ArrayList<>());
+
+        // Set ConnectContext to select the cluster
+        createTestConnectContext(clusterName);
+
+        try {
+            // Should return the pipeline executor size of the single backend
+            int result = infoService.getMinPipelineExecutorSize();
+            Assert.assertEquals(8, result);
+        } finally {
+            ConnectContext.remove();
+        }
+    }
+
+    @Test
+    public void testGetMinPipelineExecutorSizeWithMultipleBackends() {
+        infoService = new CloudSystemInfoService();
+        String clusterName = "test_cluster";
+        String clusterId = "test_cluster_id";
+
+        // Setup cluster
+        ComputeGroup cg = new ComputeGroup(clusterId, clusterName, ComputeGroup.ComputeTypeEnum.COMPUTE);
+        infoService.addComputeGroup(clusterId, cg);
+
+        // Add multiple backends with different pipeline executor sizes
+        List<Backend> toAdd = new ArrayList<>();
+
+        Backend backend1 = new Backend(Env.getCurrentEnv().getNextId(), "127.0.0.1", 9050);
+        Map<String, String> tagMap1 = Tag.DEFAULT_BACKEND_TAG.toMap();
+        tagMap1.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+        tagMap1.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+        backend1.setTagMap(tagMap1);
+        backend1.setPipelineExecutorSize(12);
+        toAdd.add(backend1);
+
+        Backend backend2 = new Backend(Env.getCurrentEnv().getNextId(), "127.0.0.2", 9050);
+        Map<String, String> tagMap2 = Tag.DEFAULT_BACKEND_TAG.toMap();
+        tagMap2.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+        tagMap2.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+        backend2.setTagMap(tagMap2);
+        backend2.setPipelineExecutorSize(6); // This should be the minimum
+        toAdd.add(backend2);
+
+        Backend backend3 = new Backend(Env.getCurrentEnv().getNextId(), "127.0.0.3", 9050);
+        Map<String, String> tagMap3 = Tag.DEFAULT_BACKEND_TAG.toMap();
+        tagMap3.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+        tagMap3.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+        backend3.setTagMap(tagMap3);
+        backend3.setPipelineExecutorSize(10);
+        toAdd.add(backend3);
+
+        infoService.updateCloudClusterMapNoLock(toAdd, new ArrayList<>());
+
+        // Set ConnectContext to select the cluster
+        createTestConnectContext(clusterName);
+
+        try {
+            // Should return the minimum pipeline executor size (6)
+            int result = infoService.getMinPipelineExecutorSize();
+            Assert.assertEquals(6, result);
+        } finally {
+            ConnectContext.remove();
+        }
+    }
+
+    @Test
+    public void testGetMinPipelineExecutorSizeWithZeroSizeBackends() {
+        infoService = new CloudSystemInfoService();
+        String clusterName = "test_cluster";
+        String clusterId = "test_cluster_id";
+
+        // Setup cluster
+        ComputeGroup cg = new ComputeGroup(clusterId, clusterName, ComputeGroup.ComputeTypeEnum.COMPUTE);
+        infoService.addComputeGroup(clusterId, cg);
+
+        // Add backends with zero and positive pipeline executor sizes
+        List<Backend> toAdd = new ArrayList<>();
+
+        Backend backend1 = new Backend(Env.getCurrentEnv().getNextId(), "127.0.0.1", 9050);
+        Map<String, String> tagMap1 = Tag.DEFAULT_BACKEND_TAG.toMap();
+        tagMap1.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+        tagMap1.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+        backend1.setTagMap(tagMap1);
+        backend1.setPipelineExecutorSize(0); // Should be ignored
+        toAdd.add(backend1);
+
+        Backend backend2 = new Backend(Env.getCurrentEnv().getNextId(), "127.0.0.2", 9050);
+        Map<String, String> tagMap2 = Tag.DEFAULT_BACKEND_TAG.toMap();
+        tagMap2.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+        tagMap2.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+        backend2.setTagMap(tagMap2);
+        backend2.setPipelineExecutorSize(4); // This should be the minimum
+        toAdd.add(backend2);
+
+        Backend backend3 = new Backend(Env.getCurrentEnv().getNextId(), "127.0.0.3", 9050);
+        Map<String, String> tagMap3 = Tag.DEFAULT_BACKEND_TAG.toMap();
+        tagMap3.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+        tagMap3.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+        backend3.setTagMap(tagMap3);
+        backend3.setPipelineExecutorSize(-1); // Should be ignored
+        toAdd.add(backend3);
+
+        infoService.updateCloudClusterMapNoLock(toAdd, new ArrayList<>());
+
+        // Set ConnectContext to select the cluster
+        createTestConnectContext(clusterName);
+
+        try {
+            // Should return the minimum positive pipeline executor size (4)
+            int result = infoService.getMinPipelineExecutorSize();
+            Assert.assertEquals(4, result);
+        } finally {
+            ConnectContext.remove();
+        }
+    }
+
+    @Test
+    public void testGetMinPipelineExecutorSizeWithAllZeroSizeBackends() {
+        infoService = new CloudSystemInfoService();
+        String clusterName = "test_cluster";
+        String clusterId = "test_cluster_id";
+
+        // Setup cluster
+        ComputeGroup cg = new ComputeGroup(clusterId, clusterName, ComputeGroup.ComputeTypeEnum.COMPUTE);
+        infoService.addComputeGroup(clusterId, cg);
+
+        // Add backends with only zero or negative pipeline executor sizes
+        List<Backend> toAdd = new ArrayList<>();
+
+        Backend backend1 = new Backend(Env.getCurrentEnv().getNextId(), "127.0.0.1", 9050);
+        Map<String, String> tagMap1 = Tag.DEFAULT_BACKEND_TAG.toMap();
+        tagMap1.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+        tagMap1.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+        backend1.setTagMap(tagMap1);
+        backend1.setPipelineExecutorSize(0);
+        toAdd.add(backend1);
+
+        Backend backend2 = new Backend(Env.getCurrentEnv().getNextId(), "127.0.0.2", 9050);
+        Map<String, String> tagMap2 = Tag.DEFAULT_BACKEND_TAG.toMap();
+        tagMap2.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+        tagMap2.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+        backend2.setTagMap(tagMap2);
+        backend2.setPipelineExecutorSize(-1);
+        toAdd.add(backend2);
+
+        infoService.updateCloudClusterMapNoLock(toAdd, new ArrayList<>());
+
+        // Set ConnectContext to select the cluster
+        createTestConnectContext(clusterName);
+
+        try {
+            // Should return 1 when no valid pipeline executor sizes are
+            // found
+            int result = infoService.getMinPipelineExecutorSize();
+            Assert.assertEquals(1, result);
+        } finally {
+            ConnectContext.remove();
+        }
+    }
+
+    // Test for error handling when ConnectContext has no cluster set
+    @Test
+    public void testGetMinPipelineExecutorSizeWithNoClusterInContext() {
+        infoService = new CloudSystemInfoService();
+
+        // Create ConnectContext but don't set any cluster (empty cluster name)
+        createTestConnectContext(null);
+        try {
+            // Should return 1 when no cluster is set in ConnectContext
+            int result = infoService.getMinPipelineExecutorSize();
+            Assert.assertEquals(1, result);
+        } finally {
+            ConnectContext.remove();
+        }
+    }
+
+    @Test
+    public void testGetMinPipelineExecutorSizeWithMixedValidInvalidBackends() {
+        infoService = new CloudSystemInfoService();
+        String clusterName = "mixed_cluster";
+        String clusterId = "mixed_cluster_id";
+
+        // Setup cluster
+        ComputeGroup cg = new ComputeGroup(clusterId, clusterName, ComputeGroup.ComputeTypeEnum.COMPUTE);
+        infoService.addComputeGroup(clusterId, cg);
+
+        // Add backends with mixed valid and invalid pipeline executor sizes
+        List<Backend> toAdd = new ArrayList<>();
+
+        // Backend with valid size
+        Backend backend1 = new Backend(Env.getCurrentEnv().getNextId(), "127.0.0.1", 9050);
+        Map<String, String> tagMap1 = Tag.DEFAULT_BACKEND_TAG.toMap();
+        tagMap1.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+        tagMap1.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+        backend1.setTagMap(tagMap1);
+        backend1.setPipelineExecutorSize(16);
+        toAdd.add(backend1);
+
+        // Backend with zero size (should be ignored)
+        Backend backend2 = new Backend(Env.getCurrentEnv().getNextId(), "127.0.0.2", 9050);
+        Map<String, String> tagMap2 = Tag.DEFAULT_BACKEND_TAG.toMap();
+        tagMap2.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+        tagMap2.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+        backend2.setTagMap(tagMap2);
+        backend2.setPipelineExecutorSize(0);
+        toAdd.add(backend2);
+
+        // Backend with valid size (smaller than first)
+        Backend backend3 = new Backend(Env.getCurrentEnv().getNextId(), "127.0.0.3", 9050);
+        Map<String, String> tagMap3 = Tag.DEFAULT_BACKEND_TAG.toMap();
+        tagMap3.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+        tagMap3.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+        backend3.setTagMap(tagMap3);
+        backend3.setPipelineExecutorSize(8); // This should be the minimum
+        toAdd.add(backend3);
+
+        // Backend with negative size (should be ignored)
+        Backend backend4 = new Backend(Env.getCurrentEnv().getNextId(), "127.0.0.4", 9050);
+        Map<String, String> tagMap4 = Tag.DEFAULT_BACKEND_TAG.toMap();
+        tagMap4.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+        tagMap4.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+        backend4.setTagMap(tagMap4);
+        backend4.setPipelineExecutorSize(-5);
+        toAdd.add(backend4);
+
+        infoService.updateCloudClusterMapNoLock(toAdd, new ArrayList<>());
+
+        // Set ConnectContext to select the cluster
+        createTestConnectContext(clusterName);
+
+        try {
+            // Should return 8 (minimum valid size)
+            int result = infoService.getMinPipelineExecutorSize();
+            Assert.assertEquals(8, result);
+        } finally {
+            ConnectContext.remove();
+        }
+    }
+
+    @Test
+    public void testGetMinPipelineExecutorSizeWithLargeValues() {
+        infoService = new CloudSystemInfoService();
+        String clusterName = "large_cluster";
+        String clusterId = "large_cluster_id";
+
+        // Setup cluster
+        ComputeGroup cg = new ComputeGroup(clusterId, clusterName, ComputeGroup.ComputeTypeEnum.COMPUTE);
+        infoService.addComputeGroup(clusterId, cg);
+
+        // Add backends with large pipeline executor sizes
+        List<Backend> toAdd = new ArrayList<>();
+
+        Backend backend1 = new Backend(Env.getCurrentEnv().getNextId(), "127.0.0.1", 9050);
+        Map<String, String> tagMap1 = Tag.DEFAULT_BACKEND_TAG.toMap();
+        tagMap1.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+        tagMap1.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+        backend1.setTagMap(tagMap1);
+        backend1.setPipelineExecutorSize(1024);
+        toAdd.add(backend1);
+
+        Backend backend2 = new Backend(Env.getCurrentEnv().getNextId(), "127.0.0.2", 9050);
+        Map<String, String> tagMap2 = Tag.DEFAULT_BACKEND_TAG.toMap();
+        tagMap2.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+        tagMap2.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+        backend2.setTagMap(tagMap2);
+        backend2.setPipelineExecutorSize(2048);
+        toAdd.add(backend2);
+
+        Backend backend3 = new Backend(Env.getCurrentEnv().getNextId(), "127.0.0.3", 9050);
+        Map<String, String> tagMap3 = Tag.DEFAULT_BACKEND_TAG.toMap();
+        tagMap3.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+        tagMap3.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+        backend3.setTagMap(tagMap3);
+        backend3.setPipelineExecutorSize(512); // This should be the minimum
+        toAdd.add(backend3);
+
+        infoService.updateCloudClusterMapNoLock(toAdd, new ArrayList<>());
+
+        // Set ConnectContext to select the cluster
+        createTestConnectContext(clusterName);
+
+        try {
+            // Should return 512 (minimum among large values)
+            int result = infoService.getMinPipelineExecutorSize();
+            Assert.assertEquals(512, result);
+        } finally {
+            ConnectContext.remove();
+        }
+    }
+
+    @Test
+    public void testGetMinPipelineExecutorSizeConsistency() {
+        infoService = new CloudSystemInfoService();
+        String clusterName = "consistency_cluster";
+        String clusterId = "consistency_cluster_id";
+
+        // Setup cluster
+        ComputeGroup cg = new ComputeGroup(clusterId, clusterName, ComputeGroup.ComputeTypeEnum.COMPUTE);
+        infoService.addComputeGroup(clusterId, cg);
+
+        // Add backends with same pipeline executor sizes
+        List<Backend> toAdd = new ArrayList<>();
+        for (int i = 0; i < 5; i++) {
+            Backend backend = new Backend(Env.getCurrentEnv().getNextId(), "127.0.0." + (i + 1), 9050);
+            Map<String, String> tagMap = Tag.DEFAULT_BACKEND_TAG.toMap();
+            tagMap.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+            tagMap.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+            backend.setTagMap(tagMap);
+            backend.setPipelineExecutorSize(32); // All backends have same size
+            toAdd.add(backend);
+        }
+
+        infoService.updateCloudClusterMapNoLock(toAdd, new ArrayList<>());
+
+        // Set ConnectContext to select the cluster
+        createTestConnectContext(clusterName);
+
+        try {
+            // Should return 32 (consistent across all backends)
+            int result = infoService.getMinPipelineExecutorSize();
+            Assert.assertEquals(32, result);
+        } finally {
+            ConnectContext.remove();
+        }
+    }
+
+    // Test for multiple compute groups - should only use current cluster
+    @Test
+    public void testGetMinPipelineExecutorSizeWithMultipleComputeGroups() {
+        infoService = new CloudSystemInfoService();
+
+        // Setup multiple clusters with different pipeline executor sizes
+        String cluster1Name = "cluster1";
+        String cluster1Id = "cluster1_id";
+        String cluster2Name = "cluster2";
+        String cluster2Id = "cluster2_id";
+
+        // Setup cluster1
+        ComputeGroup cg1 = new ComputeGroup(cluster1Id, cluster1Name, ComputeGroup.ComputeTypeEnum.COMPUTE);
+        infoService.addComputeGroup(cluster1Id, cg1);
+
+        // Setup cluster2
+        ComputeGroup cg2 = new ComputeGroup(cluster2Id, cluster2Name, ComputeGroup.ComputeTypeEnum.COMPUTE);
+        infoService.addComputeGroup(cluster2Id, cg2);
+
+        // Add backends to cluster1 with smaller pipeline executor sizes
+        List<Backend> cluster1Backends = new ArrayList<>();
+        Backend cluster1Backend1 = new Backend(Env.getCurrentEnv().getNextId(), "10.0.1.1", 9050);
+        Map<String, String> tagMap1 = Tag.DEFAULT_BACKEND_TAG.toMap();
+        tagMap1.put(Tag.CLOUD_CLUSTER_NAME, cluster1Name);
+        tagMap1.put(Tag.CLOUD_CLUSTER_ID, cluster1Id);
+        cluster1Backend1.setTagMap(tagMap1);
+        cluster1Backend1.setPipelineExecutorSize(4); // Smaller than current cluster
+        cluster1Backends.add(cluster1Backend1);
+
+        Backend cluster1Backend2 = new Backend(Env.getCurrentEnv().getNextId(), "10.0.1.2", 9050);
+        Map<String, String> tagMap2 = Tag.DEFAULT_BACKEND_TAG.toMap();
+        tagMap2.put(Tag.CLOUD_CLUSTER_NAME, cluster1Name);
+        tagMap2.put(Tag.CLOUD_CLUSTER_ID, cluster1Id);
+        cluster1Backend2.setTagMap(tagMap2);
+        cluster1Backend2.setPipelineExecutorSize(2); // Smallest overall
+        cluster1Backends.add(cluster1Backend2);
+
+        infoService.updateCloudClusterMapNoLock(cluster1Backends, new ArrayList<>());
+
+        // Add backends to cluster2
+        List<Backend> cluster2Backends = new ArrayList<>();
+        Backend cluster2Backend1 = new Backend(Env.getCurrentEnv().getNextId(), "10.0.2.1", 9050);
+        Map<String, String> tagMap3 = Tag.DEFAULT_BACKEND_TAG.toMap();
+        tagMap3.put(Tag.CLOUD_CLUSTER_NAME, cluster2Name);
+        tagMap3.put(Tag.CLOUD_CLUSTER_ID, cluster2Id);
+        cluster2Backend1.setTagMap(tagMap3);
+        cluster2Backend1.setPipelineExecutorSize(8);
+        cluster2Backends.add(cluster2Backend1);
+
+        Backend cluster2Backend2 = new Backend(Env.getCurrentEnv().getNextId(), "10.0.2.2", 9050);
+        Map<String, String> tagMap4 = Tag.DEFAULT_BACKEND_TAG.toMap();
+        tagMap4.put(Tag.CLOUD_CLUSTER_NAME, cluster2Name);
+        tagMap4.put(Tag.CLOUD_CLUSTER_ID, cluster2Id);
+        cluster2Backend2.setTagMap(tagMap4);
+        cluster2Backend2.setPipelineExecutorSize(12);
+        cluster2Backends.add(cluster2Backend2);
+
+        infoService.updateCloudClusterMapNoLock(cluster2Backends, new ArrayList<>());
+
+        // Set ConnectContext to cluster2 to test that only cluster2 backends are used
+        createTestConnectContext(cluster2Name);
+
+        try {
+            // Should return 8 (minimum from current cluster2), not 2 (global minimum from cluster1)
+            int result = infoService.getMinPipelineExecutorSize();
+            Assert.assertEquals(8, result);
+        } finally {
+            ConnectContext.remove();
+        }
+    }
+
+    @Test
+    public void testGetMinPipelineExecutorSizeWithVirtualComputeGroup() {
+        infoService = new CloudSystemInfoService();
+
+        // Setup virtual and physical clusters
+        String virtualClusterName = "virtual_cluster";
+        String virtualClusterId = "virtual_cluster_id";
+        String physicalClusterName = "physical_cluster";
+        String physicalClusterId = "physical_cluster_id";
+        String otherClusterName = "other_cluster";
+        String otherClusterId = "other_cluster_id";
+
+        // Setup virtual cluster
+        ComputeGroup virtualCg = new ComputeGroup(virtualClusterId, virtualClusterName,
+                ComputeGroup.ComputeTypeEnum.VIRTUAL);
+        ComputeGroup.Policy policy = new ComputeGroup.Policy();
+        policy.setActiveComputeGroup(physicalClusterName);
+        virtualCg.setPolicy(policy);
+        infoService.addComputeGroup(virtualClusterId, virtualCg);
+
+        // Setup physical cluster
+        ComputeGroup physicalCg = new ComputeGroup(physicalClusterId, physicalClusterName,
+                ComputeGroup.ComputeTypeEnum.COMPUTE);
+        infoService.addComputeGroup(physicalClusterId, physicalCg);
+
+        // Setup other cluster
+        ComputeGroup otherCg = new ComputeGroup(otherClusterId, otherClusterName, ComputeGroup.ComputeTypeEnum.COMPUTE);
+        infoService.addComputeGroup(otherClusterId, otherCg);
+
+        // Add backends to physical cluster
+        List<Backend> physicalBackends = new ArrayList<>();
+        Backend physicalBackend1 = new Backend(Env.getCurrentEnv().getNextId(), "172.16.1.1", 9050);
+        Map<String, String> tagMap1 = Tag.DEFAULT_BACKEND_TAG.toMap();
+        tagMap1.put(Tag.CLOUD_CLUSTER_NAME, physicalClusterName);
+        tagMap1.put(Tag.CLOUD_CLUSTER_ID, physicalClusterId);
+        physicalBackend1.setTagMap(tagMap1);
+        physicalBackend1.setPipelineExecutorSize(32); // Min in physical cluster
+        physicalBackends.add(physicalBackend1);
+
+        Backend physicalBackend2 = new Backend(Env.getCurrentEnv().getNextId(), "172.16.1.2", 9050);
+        Map<String, String> tagMap2 = Tag.DEFAULT_BACKEND_TAG.toMap();
+        tagMap2.put(Tag.CLOUD_CLUSTER_NAME, physicalClusterName);
+        tagMap2.put(Tag.CLOUD_CLUSTER_ID, physicalClusterId);
+        physicalBackend2.setTagMap(tagMap2);
+        physicalBackend2.setPipelineExecutorSize(48);
+        physicalBackends.add(physicalBackend2);
+
+        infoService.updateCloudClusterMapNoLock(physicalBackends, new ArrayList<>());
+
+        // Add backends to other cluster with smaller values
+        List<Backend> otherBackends = new ArrayList<>();
+        Backend otherBackend = new Backend(Env.getCurrentEnv().getNextId(), "10.0.3.1", 9050);
+        Map<String, String> tagMap3 = Tag.DEFAULT_BACKEND_TAG.toMap();
+        tagMap3.put(Tag.CLOUD_CLUSTER_NAME, otherClusterName);
+        tagMap3.put(Tag.CLOUD_CLUSTER_ID, otherClusterId);
+        otherBackend.setTagMap(tagMap3);
+        otherBackend.setPipelineExecutorSize(8); // Smaller than virtual cluster's physical cluster
+        otherBackends.add(otherBackend);
+
+        infoService.updateCloudClusterMapNoLock(otherBackends, new ArrayList<>());
+
+        // Create ConnectContext and set it to select virtual cluster
+        ConnectContext ctx = createTestConnectContext(virtualClusterName);
+
+        try {
+            // Should return 32 (minimum from virtual cluster's physical cluster), not 8
+            // (from other cluster)
+            int result = infoService.getMinPipelineExecutorSize();
+            Assert.assertEquals(32, result);
+
+            // Switch to other cluster
+            ctx.setCloudCluster(otherClusterName);
+
+            // Should return 8 (from other cluster)
+            result = infoService.getMinPipelineExecutorSize();
+            Assert.assertEquals(8, result);
+
+        } finally {
+            // Clean up ConnectContext
+            ConnectContext.remove();
+        }
+    }
+
+    @Test
+    public void testGetMinPipelineExecutorSizeWithConnectContextNoCluster() {
+        infoService = new CloudSystemInfoService();
+
+        // Create ConnectContext but don't set any cluster
+        createTestConnectContext(null); // null to test no cluster scenario
+
+        try {
+            // Should return 1 because no cluster is set (will catch AnalysisException)
+            int result = infoService.getMinPipelineExecutorSize();
+            Assert.assertEquals(1, result);
+
+        } finally {
+            // Clean up ConnectContext
+            ConnectContext.remove();
+        }
+    }
+
+    // Test using real ConnectContext to select compute group
+    @Test
+    public void testGetMinPipelineExecutorSizeWithConnectContext() {
+        infoService = new CloudSystemInfoService();
+
+        // Setup multiple clusters with different pipeline executor sizes
+        String cluster1Name = "ctx_cluster1";
+        String cluster1Id = "ctx_cluster1_id";
+        String cluster2Name = "ctx_cluster2";
+        String cluster2Id = "ctx_cluster2_id";
+
+        // Setup cluster1
+        ComputeGroup cg1 = new ComputeGroup(cluster1Id, cluster1Name, ComputeGroup.ComputeTypeEnum.COMPUTE);
+        infoService.addComputeGroup(cluster1Id, cg1);
+
+        // Setup cluster2
+        ComputeGroup cg2 = new ComputeGroup(cluster2Id, cluster2Name, ComputeGroup.ComputeTypeEnum.COMPUTE);
+        infoService.addComputeGroup(cluster2Id, cg2);
+
+        // Add backends to cluster1 with smaller pipeline executor sizes
+        List<Backend> cluster1Backends = new ArrayList<>();
+        Backend cluster1Backend1 = new Backend(Env.getCurrentEnv().getNextId(), "10.0.1.1", 9050);
+        Map<String, String> tagMap1 = Tag.DEFAULT_BACKEND_TAG.toMap();
+        tagMap1.put(Tag.CLOUD_CLUSTER_NAME, cluster1Name);
+        tagMap1.put(Tag.CLOUD_CLUSTER_ID, cluster1Id);
+        cluster1Backend1.setTagMap(tagMap1);
+        cluster1Backend1.setPipelineExecutorSize(4);
+        cluster1Backends.add(cluster1Backend1);
+
+        Backend cluster1Backend2 = new Backend(Env.getCurrentEnv().getNextId(), "10.0.1.2", 9050);
+        Map<String, String> tagMap2 = Tag.DEFAULT_BACKEND_TAG.toMap();
+        tagMap2.put(Tag.CLOUD_CLUSTER_NAME, cluster1Name);
+        tagMap2.put(Tag.CLOUD_CLUSTER_ID, cluster1Id);
+        cluster1Backend2.setTagMap(tagMap2);
+        cluster1Backend2.setPipelineExecutorSize(2); // Smallest in cluster1
+        cluster1Backends.add(cluster1Backend2);
+
+        infoService.updateCloudClusterMapNoLock(cluster1Backends, new ArrayList<>());
+
+        // Add backends to cluster2 with larger pipeline executor sizes
+        List<Backend> cluster2Backends = new ArrayList<>();
+        Backend cluster2Backend1 = new Backend(Env.getCurrentEnv().getNextId(), "10.0.2.1", 9050);
+        Map<String, String> tagMap3 = Tag.DEFAULT_BACKEND_TAG.toMap();
+        tagMap3.put(Tag.CLOUD_CLUSTER_NAME, cluster2Name);
+        tagMap3.put(Tag.CLOUD_CLUSTER_ID, cluster2Id);
+        cluster2Backend1.setTagMap(tagMap3);
+        cluster2Backend1.setPipelineExecutorSize(16); // Smallest in cluster2
+        cluster2Backends.add(cluster2Backend1);
+
+        Backend cluster2Backend2 = new Backend(Env.getCurrentEnv().getNextId(), "10.0.2.2", 9050);
+        Map<String, String> tagMap4 = Tag.DEFAULT_BACKEND_TAG.toMap();
+        tagMap4.put(Tag.CLOUD_CLUSTER_NAME, cluster2Name);
+        tagMap4.put(Tag.CLOUD_CLUSTER_ID, cluster2Id);
+        cluster2Backend2.setTagMap(tagMap4);
+        cluster2Backend2.setPipelineExecutorSize(24);
+        cluster2Backends.add(cluster2Backend2);
+
+        infoService.updateCloudClusterMapNoLock(cluster2Backends, new ArrayList<>());
+
+        // Create ConnectContext and set it to select cluster1
+        ConnectContext ctx = createTestConnectContext(cluster1Name);
+
+        try {
+            // Should return 2 (minimum from cluster1), not 16 (minimum from cluster2)
+            int result = infoService.getMinPipelineExecutorSize();
+            Assert.assertEquals(2, result);
+
+            // Now switch to cluster2
+            ctx.setCloudCluster(cluster2Name);
+
+            // Should return 16 (minimum from cluster2), not 2 (minimum from cluster1)
+            result = infoService.getMinPipelineExecutorSize();
+            Assert.assertEquals(16, result);
+        } finally {
+            // Clean up ConnectContext
+            ConnectContext.remove();
+        }
+    }
+
+    /**
+     * Helper method to create a test ConnectContext with specific cluster name
+     */
+    private ConnectContext createTestConnectContext(String clusterName) {
+        try {
+            ConnectContext ctx = new ConnectContext();
+            ctx.setCurrentUserIdentity(UserIdentity.ROOT);
+            ctx.setRemoteIP("127.0.0.1");
+            ctx.setEnv(Env.getCurrentEnv());
+            if (clusterName != null) {
+                ctx.setCloudCluster(clusterName);
+            }
+            ctx.setThreadLocalInfo();
+            return ctx;
+        } catch (Throwable t) {
+            throw new IllegalStateException("Cannot create test connect context", t);
+        }
+    }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
index 62ade50c919..0ad09096626 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
@@ -444,4 +444,65 @@ public class SystemInfoServiceTest {
         be.setTagMap(tagMap);
     }
 
+    @Test
+    public void testGetMinPipelineExecutorSize() {
+        // Test case 1: No backends
+        int result = infoService.getMinPipelineExecutorSize();
+        Assert.assertEquals(1, result);
+
+        // Test case 2: Single backend with pipeline executor size = 8
+        addBackend(20001, "192.168.2.1", 9050);
+        Backend be1 = infoService.getBackend(20001);
+        be1.setPipelineExecutorSize(8);
+        be1.setAlive(true);
+
+        result = infoService.getMinPipelineExecutorSize();
+        Assert.assertEquals(8, result);
+
+        // Test case 3: Multiple backends with different pipeline executor sizes
+        addBackend(20002, "192.168.2.2", 9050);
+        Backend be2 = infoService.getBackend(20002);
+        be2.setPipelineExecutorSize(4); // This should be the minimum
+        be2.setAlive(true);
+
+        addBackend(20003, "192.168.2.3", 9050);
+        Backend be3 = infoService.getBackend(20003);
+        be3.setPipelineExecutorSize(12);
+        be3.setAlive(true);
+
+        result = infoService.getMinPipelineExecutorSize();
+        Assert.assertEquals(4, result);
+
+        // Test case 4: Backends with zero and negative pipeline executor sizes (should
+        // be ignored)
+        addBackend(20004, "192.168.2.4", 9050);
+        Backend be4 = infoService.getBackend(20004);
+        be4.setPipelineExecutorSize(0); // Should be ignored
+        be4.setAlive(true);
+
+        addBackend(20005, "192.168.2.5", 9050);
+        Backend be5 = infoService.getBackend(20005);
+        be5.setPipelineExecutorSize(-1); // Should be ignored
+        be5.setAlive(true);
+
+        result = infoService.getMinPipelineExecutorSize();
+        Assert.assertEquals(4, result); // Still should be 4 from be2
+
+        // Test case 5: All backends have zero or negative pipeline executor sizes
+        be1.setPipelineExecutorSize(0);
+        be2.setPipelineExecutorSize(-5);
+        be3.setPipelineExecutorSize(0);
+
+        result = infoService.getMinPipelineExecutorSize();
+        Assert.assertEquals(1, result); // Should return default value 1
+
+        // Test case 6: Mix of positive and non-positive values
+        be1.setPipelineExecutorSize(16);
+        be2.setPipelineExecutorSize(0); // ignored
+        be3.setPipelineExecutorSize(6); // This should be the minimum
+
+        result = infoService.getMinPipelineExecutorSize();
+        Assert.assertEquals(6, result);
+    }
+
 }
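
    As a footnote to the tests above: the cloud override first resolves the backends of the currently selected compute group and only then takes the minimum, so one cluster's small executor size cannot leak into another cluster's plan. A hypothetical simplification (the map below stands in for the real cluster-to-backend lookup in CloudSystemInfoService):

        import java.util.List;
        import java.util.Map;

        class CloudMinSizeSketch {
            // Returns the minimum positive executor size of the named cluster,
            // falling back to 1 for a missing/empty cluster or no cluster name.
            static int minSizeForCluster(Map<String, List<Integer>> sizesByCluster, String clusterName) {
                if (clusterName == null || clusterName.isEmpty()) {
                    return 1;
                }
                List<Integer> sizes = sizesByCluster.get(clusterName);
                if (sizes == null || sizes.isEmpty()) {
                    return 1;
                }
                return sizes.stream().mapToInt(Integer::intValue).filter(s -> s > 0).min().orElse(1);
            }

            public static void main(String[] args) {
                Map<String, List<Integer>> clusters = Map.of(
                        "cluster1", List.of(4, 2),
                        "cluster2", List.of(16, 24));
                System.out.println(minSizeForCluster(clusters, "cluster2")); // 16, not the global minimum 2
            }
        }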


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

