This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new c31bf414008 branch-4.0: [fix](cloud) fix correct pipeline task number
calculation in cloud mode #57129 (#57262)
c31bf414008 is described below
commit c31bf4140087a33ed0b5e0b91cae825a2707ac9d
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Oct 23 17:26:45 2025 +0800
branch-4.0: [fix](cloud) fix correct pipeline task number calculation in
cloud mode #57129 (#57262)
Cherry-picked from #57129
Co-authored-by: Xin Liao <[email protected]>
---
.../doris/cloud/system/CloudSystemInfoService.java | 11 +-
.../org/apache/doris/system/SystemInfoService.java | 14 +-
.../cloud/system/CloudSystemInfoServiceTest.java | 645 ++++++++++++++++++++-
.../apache/doris/system/SystemInfoServiceTest.java | 61 ++
4 files changed, 720 insertions(+), 11 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
index 91033a9dd6d..62ab6e3b9ec 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
@@ -774,8 +774,15 @@ public class CloudSystemInfoService extends SystemInfoService {
&& Strings.isNullOrEmpty(clusterName)) {
return 1;
}
-
- return super.getMinPipelineExecutorSize();
+ List<Backend> currentBackends = getBackendsByClusterName(clusterName);
+ if (currentBackends == null || currentBackends.isEmpty()) {
+ return 1;
+ }
+ return currentBackends.stream()
+ .mapToInt(Backend::getPipelineExecutorSize)
+ .filter(size -> size > 0)
+ .min()
+ .orElse(1);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index 316a53b1ce0..1e41eb8d501 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -1143,14 +1143,12 @@ public class SystemInfoService {
if (currentBackends.size() == 0) {
return 1;
}
- int minPipelineExecutorSize = Integer.MAX_VALUE;
- for (Backend be : currentBackends) {
- int size = be.getPipelineExecutorSize();
- if (size > 0) {
- minPipelineExecutorSize = Math.min(minPipelineExecutorSize, size);
- }
- }
- return minPipelineExecutorSize;
+
+ return currentBackends.stream()
+ .mapToInt(Backend::getPipelineExecutorSize)
+ .filter(size -> size > 0)
+ .min()
+ .orElse(1);
}
// CloudSystemInfoService override
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java
b/fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java
index 445ec828ba2..85d14585f21 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java
@@ -17,8 +17,11 @@
package org.apache.doris.cloud.system;
+import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Env;
import org.apache.doris.cloud.catalog.ComputeGroup;
+import org.apache.doris.common.Config;
+import org.apache.doris.qe.ConnectContext;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend;
@@ -35,7 +38,8 @@ public class CloudSystemInfoServiceTest {
@Before
public void setUp() {
- //infoService = new CloudSystemInfoService();
+ // Enable cloud mode for testing
+ Config.cloud_unique_id = "test_cloud_unique_id";
}
@Test
@@ -348,4 +352,643 @@ public class CloudSystemInfoServiceTest {
res = infoService.isStandByComputeGroup(pcgName3);
Assert.assertFalse(res);
}
+
+ // Test for getMinPipelineExecutorSize method
+ @Test
+ public void testGetMinPipelineExecutorSizeWithEmptyCluster() {
+ infoService = new CloudSystemInfoService();
+ String clusterName = "test_cluster";
+ String clusterId = "test_cluster_id";
+
+ // Mock an empty cluster (no backends)
+ ComputeGroup cg = new ComputeGroup(clusterId, clusterName,
ComputeGroup.ComputeTypeEnum.COMPUTE);
+ infoService.addComputeGroup(clusterId, cg);
+
+ // Set ConnectContext to select the cluster
+ createTestConnectContext(clusterName);
+
+ try {
+ // Since there are no backends in the cluster, should return 1
+ int result = infoService.getMinPipelineExecutorSize();
+ Assert.assertEquals(1, result);
+ } finally {
+ ConnectContext.remove();
+ }
+ }
+
+ @Test
+ public void testGetMinPipelineExecutorSizeWithSingleBackend() {
+ infoService = new CloudSystemInfoService();
+ String clusterName = "test_cluster";
+ String clusterId = "test_cluster_id";
+
+ // Setup cluster
+ ComputeGroup cg = new ComputeGroup(clusterId, clusterName,
ComputeGroup.ComputeTypeEnum.COMPUTE);
+ infoService.addComputeGroup(clusterId, cg);
+
+ // Add a backend with pipeline executor size = 8
+ List<Backend> toAdd = new ArrayList<>();
+ Backend backend = new Backend(Env.getCurrentEnv().getNextId(),
"127.0.0.1", 9050);
+ Map<String, String> tagMap = Tag.DEFAULT_BACKEND_TAG.toMap();
+ tagMap.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+ tagMap.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+ backend.setTagMap(tagMap);
+ backend.setPipelineExecutorSize(8);
+ toAdd.add(backend);
+
+ infoService.updateCloudClusterMapNoLock(toAdd, new ArrayList<>());
+
+ // Set ConnectContext to select the cluster
+ createTestConnectContext(clusterName);
+
+ try {
+ // Should return the pipeline executor size of the single backend
+ int result = infoService.getMinPipelineExecutorSize();
+ Assert.assertEquals(8, result);
+ } finally {
+ ConnectContext.remove();
+ }
+ }
+
+ @Test
+ public void testGetMinPipelineExecutorSizeWithMultipleBackends() {
+ infoService = new CloudSystemInfoService();
+ String clusterName = "test_cluster";
+ String clusterId = "test_cluster_id";
+
+ // Setup cluster
+ ComputeGroup cg = new ComputeGroup(clusterId, clusterName,
ComputeGroup.ComputeTypeEnum.COMPUTE);
+ infoService.addComputeGroup(clusterId, cg);
+
+ // Add multiple backends with different pipeline executor sizes
+ List<Backend> toAdd = new ArrayList<>();
+
+ Backend backend1 = new Backend(Env.getCurrentEnv().getNextId(),
"127.0.0.1", 9050);
+ Map<String, String> tagMap1 = Tag.DEFAULT_BACKEND_TAG.toMap();
+ tagMap1.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+ tagMap1.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+ backend1.setTagMap(tagMap1);
+ backend1.setPipelineExecutorSize(12);
+ toAdd.add(backend1);
+
+ Backend backend2 = new Backend(Env.getCurrentEnv().getNextId(),
"127.0.0.2", 9050);
+ Map<String, String> tagMap2 = Tag.DEFAULT_BACKEND_TAG.toMap();
+ tagMap2.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+ tagMap2.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+ backend2.setTagMap(tagMap2);
+ backend2.setPipelineExecutorSize(6); // This should be the minimum
+ toAdd.add(backend2);
+
+ Backend backend3 = new Backend(Env.getCurrentEnv().getNextId(),
"127.0.0.3", 9050);
+ Map<String, String> tagMap3 = Tag.DEFAULT_BACKEND_TAG.toMap();
+ tagMap3.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+ tagMap3.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+ backend3.setTagMap(tagMap3);
+ backend3.setPipelineExecutorSize(10);
+ toAdd.add(backend3);
+
+ infoService.updateCloudClusterMapNoLock(toAdd, new ArrayList<>());
+
+ // Set ConnectContext to select the cluster
+ createTestConnectContext(clusterName);
+
+ try {
+ // Should return the minimum pipeline executor size (6)
+ int result = infoService.getMinPipelineExecutorSize();
+ Assert.assertEquals(6, result);
+ } finally {
+ ConnectContext.remove();
+ }
+ }
+
+ @Test
+ public void testGetMinPipelineExecutorSizeWithZeroSizeBackends() {
+ infoService = new CloudSystemInfoService();
+ String clusterName = "test_cluster";
+ String clusterId = "test_cluster_id";
+
+ // Setup cluster
+ ComputeGroup cg = new ComputeGroup(clusterId, clusterName,
ComputeGroup.ComputeTypeEnum.COMPUTE);
+ infoService.addComputeGroup(clusterId, cg);
+
+ // Add backends with zero and positive pipeline executor sizes
+ List<Backend> toAdd = new ArrayList<>();
+
+ Backend backend1 = new Backend(Env.getCurrentEnv().getNextId(),
"127.0.0.1", 9050);
+ Map<String, String> tagMap1 = Tag.DEFAULT_BACKEND_TAG.toMap();
+ tagMap1.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+ tagMap1.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+ backend1.setTagMap(tagMap1);
+ backend1.setPipelineExecutorSize(0); // Should be ignored
+ toAdd.add(backend1);
+
+ Backend backend2 = new Backend(Env.getCurrentEnv().getNextId(),
"127.0.0.2", 9050);
+ Map<String, String> tagMap2 = Tag.DEFAULT_BACKEND_TAG.toMap();
+ tagMap2.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+ tagMap2.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+ backend2.setTagMap(tagMap2);
+ backend2.setPipelineExecutorSize(4); // This should be the minimum
+ toAdd.add(backend2);
+
+ Backend backend3 = new Backend(Env.getCurrentEnv().getNextId(),
"127.0.0.3", 9050);
+ Map<String, String> tagMap3 = Tag.DEFAULT_BACKEND_TAG.toMap();
+ tagMap3.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+ tagMap3.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+ backend3.setTagMap(tagMap3);
+ backend3.setPipelineExecutorSize(-1); // Should be ignored
+ toAdd.add(backend3);
+
+ infoService.updateCloudClusterMapNoLock(toAdd, new ArrayList<>());
+
+ // Set ConnectContext to select the cluster
+ createTestConnectContext(clusterName);
+
+ try {
+ // Should return the minimum positive pipeline executor size (4)
+ int result = infoService.getMinPipelineExecutorSize();
+ Assert.assertEquals(4, result);
+ } finally {
+ ConnectContext.remove();
+ }
+ }
+
+ @Test
+ public void testGetMinPipelineExecutorSizeWithAllZeroSizeBackends() {
+ infoService = new CloudSystemInfoService();
+ String clusterName = "test_cluster";
+ String clusterId = "test_cluster_id";
+
+ // Setup cluster
+ ComputeGroup cg = new ComputeGroup(clusterId, clusterName,
ComputeGroup.ComputeTypeEnum.COMPUTE);
+ infoService.addComputeGroup(clusterId, cg);
+
+ // Add backends with only zero or negative pipeline executor sizes
+ List<Backend> toAdd = new ArrayList<>();
+
+ Backend backend1 = new Backend(Env.getCurrentEnv().getNextId(),
"127.0.0.1", 9050);
+ Map<String, String> tagMap1 = Tag.DEFAULT_BACKEND_TAG.toMap();
+ tagMap1.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+ tagMap1.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+ backend1.setTagMap(tagMap1);
+ backend1.setPipelineExecutorSize(0);
+ toAdd.add(backend1);
+
+ Backend backend2 = new Backend(Env.getCurrentEnv().getNextId(),
"127.0.0.2", 9050);
+ Map<String, String> tagMap2 = Tag.DEFAULT_BACKEND_TAG.toMap();
+ tagMap2.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+ tagMap2.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+ backend2.setTagMap(tagMap2);
+ backend2.setPipelineExecutorSize(-1);
+ toAdd.add(backend2);
+
+ infoService.updateCloudClusterMapNoLock(toAdd, new ArrayList<>());
+
+ // Set ConnectContext to select the cluster
+ createTestConnectContext(clusterName);
+
+ try {
+ // Should return 1 when no valid pipeline executor sizes are
+ // found
+ int result = infoService.getMinPipelineExecutorSize();
+ Assert.assertEquals(1, result);
+ } finally {
+ ConnectContext.remove();
+ }
+ }
+
+ // Test for error handling when ConnectContext has no cluster set
+ @Test
+ public void testGetMinPipelineExecutorSizeWithNoClusterInContext() {
+ infoService = new CloudSystemInfoService();
+
+ // Create ConnectContext but don't set any cluster (empty cluster name)
+ createTestConnectContext(null);
+ try {
+ // Should return 1 when no cluster is set in ConnectContext
+ int result = infoService.getMinPipelineExecutorSize();
+ Assert.assertEquals(1, result);
+ } finally {
+ ConnectContext.remove();
+ }
+ }
+
+ @Test
+ public void testGetMinPipelineExecutorSizeWithMixedValidInvalidBackends() {
+ infoService = new CloudSystemInfoService();
+ String clusterName = "mixed_cluster";
+ String clusterId = "mixed_cluster_id";
+
+ // Setup cluster
+ ComputeGroup cg = new ComputeGroup(clusterId, clusterName,
ComputeGroup.ComputeTypeEnum.COMPUTE);
+ infoService.addComputeGroup(clusterId, cg);
+
+ // Add backends with mixed valid and invalid pipeline executor sizes
+ List<Backend> toAdd = new ArrayList<>();
+
+ // Backend with valid size
+ Backend backend1 = new Backend(Env.getCurrentEnv().getNextId(),
"127.0.0.1", 9050);
+ Map<String, String> tagMap1 = Tag.DEFAULT_BACKEND_TAG.toMap();
+ tagMap1.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+ tagMap1.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+ backend1.setTagMap(tagMap1);
+ backend1.setPipelineExecutorSize(16);
+ toAdd.add(backend1);
+
+ // Backend with zero size (should be ignored)
+ Backend backend2 = new Backend(Env.getCurrentEnv().getNextId(),
"127.0.0.2", 9050);
+ Map<String, String> tagMap2 = Tag.DEFAULT_BACKEND_TAG.toMap();
+ tagMap2.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+ tagMap2.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+ backend2.setTagMap(tagMap2);
+ backend2.setPipelineExecutorSize(0);
+ toAdd.add(backend2);
+
+ // Backend with valid size (smaller than first)
+ Backend backend3 = new Backend(Env.getCurrentEnv().getNextId(),
"127.0.0.3", 9050);
+ Map<String, String> tagMap3 = Tag.DEFAULT_BACKEND_TAG.toMap();
+ tagMap3.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+ tagMap3.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+ backend3.setTagMap(tagMap3);
+ backend3.setPipelineExecutorSize(8); // This should be the minimum
+ toAdd.add(backend3);
+
+ // Backend with negative size (should be ignored)
+ Backend backend4 = new Backend(Env.getCurrentEnv().getNextId(),
"127.0.0.4", 9050);
+ Map<String, String> tagMap4 = Tag.DEFAULT_BACKEND_TAG.toMap();
+ tagMap4.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+ tagMap4.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+ backend4.setTagMap(tagMap4);
+ backend4.setPipelineExecutorSize(-5);
+ toAdd.add(backend4);
+
+ infoService.updateCloudClusterMapNoLock(toAdd, new ArrayList<>());
+
+ // Set ConnectContext to select the cluster
+ createTestConnectContext(clusterName);
+
+ try {
+ // Should return 8 (minimum valid size)
+ int result = infoService.getMinPipelineExecutorSize();
+ Assert.assertEquals(8, result);
+ } finally {
+ ConnectContext.remove();
+ }
+ }
+
+ @Test
+ public void testGetMinPipelineExecutorSizeWithLargeValues() {
+ infoService = new CloudSystemInfoService();
+ String clusterName = "large_cluster";
+ String clusterId = "large_cluster_id";
+
+ // Setup cluster
+ ComputeGroup cg = new ComputeGroup(clusterId, clusterName,
ComputeGroup.ComputeTypeEnum.COMPUTE);
+ infoService.addComputeGroup(clusterId, cg);
+
+ // Add backends with large pipeline executor sizes
+ List<Backend> toAdd = new ArrayList<>();
+
+ Backend backend1 = new Backend(Env.getCurrentEnv().getNextId(),
"127.0.0.1", 9050);
+ Map<String, String> tagMap1 = Tag.DEFAULT_BACKEND_TAG.toMap();
+ tagMap1.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+ tagMap1.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+ backend1.setTagMap(tagMap1);
+ backend1.setPipelineExecutorSize(1024);
+ toAdd.add(backend1);
+
+ Backend backend2 = new Backend(Env.getCurrentEnv().getNextId(),
"127.0.0.2", 9050);
+ Map<String, String> tagMap2 = Tag.DEFAULT_BACKEND_TAG.toMap();
+ tagMap2.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+ tagMap2.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+ backend2.setTagMap(tagMap2);
+ backend2.setPipelineExecutorSize(2048);
+ toAdd.add(backend2);
+
+ Backend backend3 = new Backend(Env.getCurrentEnv().getNextId(),
"127.0.0.3", 9050);
+ Map<String, String> tagMap3 = Tag.DEFAULT_BACKEND_TAG.toMap();
+ tagMap3.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+ tagMap3.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+ backend3.setTagMap(tagMap3);
+ backend3.setPipelineExecutorSize(512); // This should be the minimum
+ toAdd.add(backend3);
+
+ infoService.updateCloudClusterMapNoLock(toAdd, new ArrayList<>());
+
+ // Set ConnectContext to select the cluster
+ createTestConnectContext(clusterName);
+
+ try {
+ // Should return 512 (minimum among large values)
+ int result = infoService.getMinPipelineExecutorSize();
+ Assert.assertEquals(512, result);
+ } finally {
+ ConnectContext.remove();
+ }
+ }
+
+ @Test
+ public void testGetMinPipelineExecutorSizeConsistency() {
+ infoService = new CloudSystemInfoService();
+ String clusterName = "consistency_cluster";
+ String clusterId = "consistency_cluster_id";
+
+ // Setup cluster
+ ComputeGroup cg = new ComputeGroup(clusterId, clusterName,
ComputeGroup.ComputeTypeEnum.COMPUTE);
+ infoService.addComputeGroup(clusterId, cg);
+
+ // Add backends with same pipeline executor sizes
+ List<Backend> toAdd = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ Backend backend = new Backend(Env.getCurrentEnv().getNextId(),
"127.0.0." + (i + 1), 9050);
+ Map<String, String> tagMap = Tag.DEFAULT_BACKEND_TAG.toMap();
+ tagMap.put(Tag.CLOUD_CLUSTER_NAME, clusterName);
+ tagMap.put(Tag.CLOUD_CLUSTER_ID, clusterId);
+ backend.setTagMap(tagMap);
+ backend.setPipelineExecutorSize(32); // All backends have same size
+ toAdd.add(backend);
+ }
+
+ infoService.updateCloudClusterMapNoLock(toAdd, new ArrayList<>());
+
+ // Set ConnectContext to select the cluster
+ createTestConnectContext(clusterName);
+
+ try {
+ // Should return 32 (consistent across all backends)
+ int result = infoService.getMinPipelineExecutorSize();
+ Assert.assertEquals(32, result);
+ } finally {
+ ConnectContext.remove();
+ }
+ }
+
+ // Test for multiple compute groups - should only use current cluster
+ @Test
+ public void testGetMinPipelineExecutorSizeWithMultipleComputeGroups() {
+ infoService = new CloudSystemInfoService();
+
+ // Setup multiple clusters with different pipeline executor sizes
+ String cluster1Name = "cluster1";
+ String cluster1Id = "cluster1_id";
+ String cluster2Name = "cluster2";
+ String cluster2Id = "cluster2_id";
+
+ // Setup cluster1
+ ComputeGroup cg1 = new ComputeGroup(cluster1Id, cluster1Name,
ComputeGroup.ComputeTypeEnum.COMPUTE);
+ infoService.addComputeGroup(cluster1Id, cg1);
+
+ // Setup cluster2
+ ComputeGroup cg2 = new ComputeGroup(cluster2Id, cluster2Name,
ComputeGroup.ComputeTypeEnum.COMPUTE);
+ infoService.addComputeGroup(cluster2Id, cg2);
+
+ // Add backends to cluster1 with smaller pipeline executor sizes
+ List<Backend> cluster1Backends = new ArrayList<>();
+ Backend cluster1Backend1 = new
Backend(Env.getCurrentEnv().getNextId(), "10.0.1.1", 9050);
+ Map<String, String> tagMap1 = Tag.DEFAULT_BACKEND_TAG.toMap();
+ tagMap1.put(Tag.CLOUD_CLUSTER_NAME, cluster1Name);
+ tagMap1.put(Tag.CLOUD_CLUSTER_ID, cluster1Id);
+ cluster1Backend1.setTagMap(tagMap1);
+ cluster1Backend1.setPipelineExecutorSize(4); // Smaller than current cluster
+ cluster1Backends.add(cluster1Backend1);
+
+ Backend cluster1Backend2 = new
Backend(Env.getCurrentEnv().getNextId(), "10.0.1.2", 9050);
+ Map<String, String> tagMap2 = Tag.DEFAULT_BACKEND_TAG.toMap();
+ tagMap2.put(Tag.CLOUD_CLUSTER_NAME, cluster1Name);
+ tagMap2.put(Tag.CLOUD_CLUSTER_ID, cluster1Id);
+ cluster1Backend2.setTagMap(tagMap2);
+ cluster1Backend2.setPipelineExecutorSize(2); // Smallest overall
+ cluster1Backends.add(cluster1Backend2);
+
+ infoService.updateCloudClusterMapNoLock(cluster1Backends, new
ArrayList<>());
+
+ // Add backends to cluster2
+ List<Backend> cluster2Backends = new ArrayList<>();
+ Backend cluster2Backend1 = new
Backend(Env.getCurrentEnv().getNextId(), "10.0.2.1", 9050);
+ Map<String, String> tagMap3 = Tag.DEFAULT_BACKEND_TAG.toMap();
+ tagMap3.put(Tag.CLOUD_CLUSTER_NAME, cluster2Name);
+ tagMap3.put(Tag.CLOUD_CLUSTER_ID, cluster2Id);
+ cluster2Backend1.setTagMap(tagMap3);
+ cluster2Backend1.setPipelineExecutorSize(8);
+ cluster2Backends.add(cluster2Backend1);
+
+ Backend cluster2Backend2 = new
Backend(Env.getCurrentEnv().getNextId(), "10.0.2.2", 9050);
+ Map<String, String> tagMap4 = Tag.DEFAULT_BACKEND_TAG.toMap();
+ tagMap4.put(Tag.CLOUD_CLUSTER_NAME, cluster2Name);
+ tagMap4.put(Tag.CLOUD_CLUSTER_ID, cluster2Id);
+ cluster2Backend2.setTagMap(tagMap4);
+ cluster2Backend2.setPipelineExecutorSize(12);
+ cluster2Backends.add(cluster2Backend2);
+
+ infoService.updateCloudClusterMapNoLock(cluster2Backends, new
ArrayList<>());
+
+ // Set ConnectContext to cluster2 to test that only cluster2 backends are used
+ createTestConnectContext(cluster2Name);
+
+ try {
+ // Should return 8 (minimum from current cluster2), not 2 (global minimum from cluster1)
+ int result = infoService.getMinPipelineExecutorSize();
+ Assert.assertEquals(8, result);
+ } finally {
+ ConnectContext.remove();
+ }
+ }
+
+ @Test
+ public void testGetMinPipelineExecutorSizeWithVirtualComputeGroup() {
+ infoService = new CloudSystemInfoService();
+
+ // Setup virtual and physical clusters
+ String virtualClusterName = "virtual_cluster";
+ String virtualClusterId = "virtual_cluster_id";
+ String physicalClusterName = "physical_cluster";
+ String physicalClusterId = "physical_cluster_id";
+ String otherClusterName = "other_cluster";
+ String otherClusterId = "other_cluster_id";
+
+ // Setup virtual cluster
+ ComputeGroup virtualCg = new ComputeGroup(virtualClusterId,
virtualClusterName,
+ ComputeGroup.ComputeTypeEnum.VIRTUAL);
+ ComputeGroup.Policy policy = new ComputeGroup.Policy();
+ policy.setActiveComputeGroup(physicalClusterName);
+ virtualCg.setPolicy(policy);
+ infoService.addComputeGroup(virtualClusterId, virtualCg);
+
+ // Setup physical cluster
+ ComputeGroup physicalCg = new ComputeGroup(physicalClusterId,
physicalClusterName,
+ ComputeGroup.ComputeTypeEnum.COMPUTE);
+ infoService.addComputeGroup(physicalClusterId, physicalCg);
+
+ // Setup other cluster
+ ComputeGroup otherCg = new ComputeGroup(otherClusterId,
otherClusterName, ComputeGroup.ComputeTypeEnum.COMPUTE);
+ infoService.addComputeGroup(otherClusterId, otherCg);
+
+ // Add backends to physical cluster
+ List<Backend> physicalBackends = new ArrayList<>();
+ Backend physicalBackend1 = new
Backend(Env.getCurrentEnv().getNextId(), "172.16.1.1", 9050);
+ Map<String, String> tagMap1 = Tag.DEFAULT_BACKEND_TAG.toMap();
+ tagMap1.put(Tag.CLOUD_CLUSTER_NAME, physicalClusterName);
+ tagMap1.put(Tag.CLOUD_CLUSTER_ID, physicalClusterId);
+ physicalBackend1.setTagMap(tagMap1);
+ physicalBackend1.setPipelineExecutorSize(32); // Min in physical cluster
+ physicalBackends.add(physicalBackend1);
+
+ Backend physicalBackend2 = new
Backend(Env.getCurrentEnv().getNextId(), "172.16.1.2", 9050);
+ Map<String, String> tagMap2 = Tag.DEFAULT_BACKEND_TAG.toMap();
+ tagMap2.put(Tag.CLOUD_CLUSTER_NAME, physicalClusterName);
+ tagMap2.put(Tag.CLOUD_CLUSTER_ID, physicalClusterId);
+ physicalBackend2.setTagMap(tagMap2);
+ physicalBackend2.setPipelineExecutorSize(48);
+ physicalBackends.add(physicalBackend2);
+
+ infoService.updateCloudClusterMapNoLock(physicalBackends, new
ArrayList<>());
+
+ // Add backends to other cluster with smaller values
+ List<Backend> otherBackends = new ArrayList<>();
+ Backend otherBackend = new Backend(Env.getCurrentEnv().getNextId(),
"10.0.3.1", 9050);
+ Map<String, String> tagMap3 = Tag.DEFAULT_BACKEND_TAG.toMap();
+ tagMap3.put(Tag.CLOUD_CLUSTER_NAME, otherClusterName);
+ tagMap3.put(Tag.CLOUD_CLUSTER_ID, otherClusterId);
+ otherBackend.setTagMap(tagMap3);
+ otherBackend.setPipelineExecutorSize(8); // Smaller than virtual cluster's physical cluster
+ otherBackends.add(otherBackend);
+
+ infoService.updateCloudClusterMapNoLock(otherBackends, new
ArrayList<>());
+
+ // Create ConnectContext and set it to select virtual cluster
+ ConnectContext ctx = createTestConnectContext(virtualClusterName);
+
+ try {
+ // Should return 32 (minimum from virtual cluster's physical cluster), not 8
+ // (from other cluster)
+ int result = infoService.getMinPipelineExecutorSize();
+ Assert.assertEquals(32, result);
+
+ // Switch to other cluster
+ ctx.setCloudCluster(otherClusterName);
+
+ // Should return 8 (from other cluster)
+ result = infoService.getMinPipelineExecutorSize();
+ Assert.assertEquals(8, result);
+
+ } finally {
+ // Clean up ConnectContext
+ ConnectContext.remove();
+ }
+ }
+
+ @Test
+ public void testGetMinPipelineExecutorSizeWithConnectContextNoCluster() {
+ infoService = new CloudSystemInfoService();
+
+ // Create ConnectContext but don't set any cluster
+ createTestConnectContext(null); // null to test no cluster scenario
+
+ try {
+ // Should return 1 because no cluster is set (will catch AnalysisException)
+ int result = infoService.getMinPipelineExecutorSize();
+ Assert.assertEquals(1, result);
+
+ } finally {
+ // Clean up ConnectContext
+ ConnectContext.remove();
+ }
+ }
+
+ // Test using real ConnectContext to select compute group
+ @Test
+ public void testGetMinPipelineExecutorSizeWithConnectContext() {
+ infoService = new CloudSystemInfoService();
+
+ // Setup multiple clusters with different pipeline executor sizes
+ String cluster1Name = "ctx_cluster1";
+ String cluster1Id = "ctx_cluster1_id";
+ String cluster2Name = "ctx_cluster2";
+ String cluster2Id = "ctx_cluster2_id";
+
+ // Setup cluster1
+ ComputeGroup cg1 = new ComputeGroup(cluster1Id, cluster1Name,
ComputeGroup.ComputeTypeEnum.COMPUTE);
+ infoService.addComputeGroup(cluster1Id, cg1);
+
+ // Setup cluster2
+ ComputeGroup cg2 = new ComputeGroup(cluster2Id, cluster2Name,
ComputeGroup.ComputeTypeEnum.COMPUTE);
+ infoService.addComputeGroup(cluster2Id, cg2);
+
+ // Add backends to cluster1 with smaller pipeline executor sizes
+ List<Backend> cluster1Backends = new ArrayList<>();
+ Backend cluster1Backend1 = new
Backend(Env.getCurrentEnv().getNextId(), "10.0.1.1", 9050);
+ Map<String, String> tagMap1 = Tag.DEFAULT_BACKEND_TAG.toMap();
+ tagMap1.put(Tag.CLOUD_CLUSTER_NAME, cluster1Name);
+ tagMap1.put(Tag.CLOUD_CLUSTER_ID, cluster1Id);
+ cluster1Backend1.setTagMap(tagMap1);
+ cluster1Backend1.setPipelineExecutorSize(4);
+ cluster1Backends.add(cluster1Backend1);
+
+ Backend cluster1Backend2 = new
Backend(Env.getCurrentEnv().getNextId(), "10.0.1.2", 9050);
+ Map<String, String> tagMap2 = Tag.DEFAULT_BACKEND_TAG.toMap();
+ tagMap2.put(Tag.CLOUD_CLUSTER_NAME, cluster1Name);
+ tagMap2.put(Tag.CLOUD_CLUSTER_ID, cluster1Id);
+ cluster1Backend2.setTagMap(tagMap2);
+ cluster1Backend2.setPipelineExecutorSize(2); // Smallest in cluster1
+ cluster1Backends.add(cluster1Backend2);
+
+ infoService.updateCloudClusterMapNoLock(cluster1Backends, new
ArrayList<>());
+
+ // Add backends to cluster2 with larger pipeline executor sizes
+ List<Backend> cluster2Backends = new ArrayList<>();
+ Backend cluster2Backend1 = new
Backend(Env.getCurrentEnv().getNextId(), "10.0.2.1", 9050);
+ Map<String, String> tagMap3 = Tag.DEFAULT_BACKEND_TAG.toMap();
+ tagMap3.put(Tag.CLOUD_CLUSTER_NAME, cluster2Name);
+ tagMap3.put(Tag.CLOUD_CLUSTER_ID, cluster2Id);
+ cluster2Backend1.setTagMap(tagMap3);
+ cluster2Backend1.setPipelineExecutorSize(16); // Smallest in cluster2
+ cluster2Backends.add(cluster2Backend1);
+
+ Backend cluster2Backend2 = new
Backend(Env.getCurrentEnv().getNextId(), "10.0.2.2", 9050);
+ Map<String, String> tagMap4 = Tag.DEFAULT_BACKEND_TAG.toMap();
+ tagMap4.put(Tag.CLOUD_CLUSTER_NAME, cluster2Name);
+ tagMap4.put(Tag.CLOUD_CLUSTER_ID, cluster2Id);
+ cluster2Backend2.setTagMap(tagMap4);
+ cluster2Backend2.setPipelineExecutorSize(24);
+ cluster2Backends.add(cluster2Backend2);
+
+ infoService.updateCloudClusterMapNoLock(cluster2Backends, new
ArrayList<>());
+
+ // Create ConnectContext and set it to select cluster1
+ ConnectContext ctx = createTestConnectContext(cluster1Name);
+
+ try {
+ // Should return 2 (minimum from cluster1), not 16 (minimum from cluster2)
+ int result = infoService.getMinPipelineExecutorSize();
+ Assert.assertEquals(2, result);
+
+ // Now switch to cluster2
+ ctx.setCloudCluster(cluster2Name);
+
+ // Should return 16 (minimum from cluster2), not 2 (minimum from cluster1)
+ result = infoService.getMinPipelineExecutorSize();
+ Assert.assertEquals(16, result);
+ } finally {
+ // Clean up ConnectContext
+ ConnectContext.remove();
+ }
+ }
+
+ /**
+ * Helper method to create a test ConnectContext with specific cluster name
+ */
+ private ConnectContext createTestConnectContext(String clusterName) {
+ try {
+ ConnectContext ctx = new ConnectContext();
+ ctx.setCurrentUserIdentity(UserIdentity.ROOT);
+ ctx.setRemoteIP("127.0.0.1");
+ ctx.setEnv(Env.getCurrentEnv());
+ if (clusterName != null) {
+ ctx.setCloudCluster(clusterName);
+ }
+ ctx.setThreadLocalInfo();
+ return ctx;
+ } catch (Throwable t) {
+ throw new IllegalStateException("Cannot create test connect context", t);
+ }
+ }
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
index 62ade50c919..0ad09096626 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
@@ -444,4 +444,65 @@ public class SystemInfoServiceTest {
be.setTagMap(tagMap);
}
+ @Test
+ public void testGetMinPipelineExecutorSize() {
+ // Test case 1: No backends
+ int result = infoService.getMinPipelineExecutorSize();
+ Assert.assertEquals(1, result);
+
+ // Test case 2: Single backend with pipeline executor size = 8
+ addBackend(20001, "192.168.2.1", 9050);
+ Backend be1 = infoService.getBackend(20001);
+ be1.setPipelineExecutorSize(8);
+ be1.setAlive(true);
+
+ result = infoService.getMinPipelineExecutorSize();
+ Assert.assertEquals(8, result);
+
+ // Test case 3: Multiple backends with different pipeline executor sizes
+ addBackend(20002, "192.168.2.2", 9050);
+ Backend be2 = infoService.getBackend(20002);
+ be2.setPipelineExecutorSize(4); // This should be the minimum
+ be2.setAlive(true);
+
+ addBackend(20003, "192.168.2.3", 9050);
+ Backend be3 = infoService.getBackend(20003);
+ be3.setPipelineExecutorSize(12);
+ be3.setAlive(true);
+
+ result = infoService.getMinPipelineExecutorSize();
+ Assert.assertEquals(4, result);
+
+ // Test case 4: Backends with zero and negative pipeline executor sizes (should
+ // be ignored)
+ addBackend(20004, "192.168.2.4", 9050);
+ Backend be4 = infoService.getBackend(20004);
+ be4.setPipelineExecutorSize(0); // Should be ignored
+ be4.setAlive(true);
+
+ addBackend(20005, "192.168.2.5", 9050);
+ Backend be5 = infoService.getBackend(20005);
+ be5.setPipelineExecutorSize(-1); // Should be ignored
+ be5.setAlive(true);
+
+ result = infoService.getMinPipelineExecutorSize();
+ Assert.assertEquals(4, result); // Still should be 4 from be2
+
+ // Test case 5: All backends have zero or negative pipeline executor sizes
+ be1.setPipelineExecutorSize(0);
+ be2.setPipelineExecutorSize(-5);
+ be3.setPipelineExecutorSize(0);
+
+ result = infoService.getMinPipelineExecutorSize();
+ Assert.assertEquals(1, result); // Should return default value 1
+
+ // Test case 6: Mix of positive and non-positive values
+ be1.setPipelineExecutorSize(16);
+ be2.setPipelineExecutorSize(0); // ignored
+ be3.setPipelineExecutorSize(6); // This should be the minimum
+
+ result = infoService.getMinPipelineExecutorSize();
+ Assert.assertEquals(6, result);
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]