This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 710d129e828 branch-3.1: [fix](cloud)Fix modify the cluster public and 
private network causing the node to be temporarily offline #52294 (#52656)
710d129e828 is described below

commit 710d129e8281562b40848b703d8863295c2b85da
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jul 2 18:08:44 2025 +0800

    branch-3.1: [fix](cloud)Fix modify the cluster public and private network 
causing the node to be temporarily offline #52294 (#52656)
    
    Cherry-picked from #52294
    
    Co-authored-by: deardeng <[email protected]>
---
 .../doris/cloud/catalog/CloudClusterChecker.java   |  42 ++++++---
 .../doris/regression/suite/SuiteCluster.groovy     |   1 +
 .../multi_cluster/test_change_node_net.groovy      | 102 +++++++++++++++++++++
 3 files changed, 134 insertions(+), 11 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
index 5d404cdc8a3..ca81b165cb9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudClusterChecker.java
@@ -155,7 +155,7 @@ public class CloudClusterChecker extends MasterDaemon {
         );
     }
 
-    private void updateStatus(List<Backend> currentBes, List<Cloud.NodeInfoPB> 
expectedBes) {
+    private void updateStatus(List<Backend> currentBes, List<Cloud.NodeInfoPB> 
expectedBes, ClusterPB remoteClusterPb) {
         Map<String, Backend> currentMap = new HashMap<>();
         for (Backend be : currentBes) {
             String endpoint = be.getHost() + ":" + be.getHeartbeatPort();
@@ -200,6 +200,33 @@ public class CloudClusterChecker extends MasterDaemon {
                 // edit log
                 Env.getCurrentEnv().getEditLog().logBackendStateChange(be);
             }
+            updateIfComputeNodeEndpointChanged(remoteClusterPb, be);
+        }
+    }
+
+    /**
+     * Brings the public/private endpoint tags of a backend in line with the
+     * endpoints carried by the remote cluster description.
+     *
+     * <p>Each tag is rewritten in the backend's tag map only when it differs from
+     * the remote value. If at least one tag was rewritten, a single backend state
+     * change is written to the edit log.
+     *
+     * @param remoteClusterPb cluster description whose public/private endpoints are the expected values
+     * @param be backend whose tag map is compared against and updated from {@code remoteClusterPb}
+     */
+    private void updateIfComputeNodeEndpointChanged(ClusterPB remoteClusterPb, Backend be) {
+        // Set when either endpoint tag is rewritten, so both changes share one edit log entry.
+        boolean netChanged = false;
+        String remotePublicEndpoint = remoteClusterPb.getPublicEndpoint();
+        String localPublicEndpoint = be.getTagMap().get(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT);
+        // The tag map lookup yields null when the backend has no such tag yet; treat that as "changed"
+        // instead of dereferencing null.
+        if (localPublicEndpoint == null || !localPublicEndpoint.equals(remotePublicEndpoint)) {
+            LOG.info("be {} has changed public_endpoint from {} to {}",
+                    be, localPublicEndpoint, remotePublicEndpoint);
+            be.getTagMap().put(Tag.CLOUD_CLUSTER_PUBLIC_ENDPOINT, remotePublicEndpoint);
+            netChanged = true;
+        }
+
+        String remotePrivateEndpoint = remoteClusterPb.getPrivateEndpoint();
+        String localPrivateEndpoint = be.getTagMap().get(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT);
+        // Same null-safe comparison as for the public endpoint.
+        if (localPrivateEndpoint == null || !localPrivateEndpoint.equals(remotePrivateEndpoint)) {
+            LOG.info("be {} has changed private_endpoint from {} to {}",
+                    be, localPrivateEndpoint, remotePrivateEndpoint);
+            be.getTagMap().put(Tag.CLOUD_CLUSTER_PRIVATE_ENDPOINT, remotePrivateEndpoint);
+            netChanged = true;
+        }
+        if (netChanged) {
+            // Record the updated backend in the edit log.
+            Env.getCurrentEnv().getEditLog().logBackendStateChange(be);
         }
     }
 
@@ -278,13 +305,12 @@ public class CloudClusterChecker extends MasterDaemon {
             LOG.info("get cloud cluster, clusterId={} local nodes={} remote 
nodes={}", cid,
                     currentBeEndpoints, remoteBeEndpoints);
 
-            updateStatus(currentBes, expectedBes);
+            updateStatus(currentBes, expectedBes, 
remoteClusterIdToPB.get(cid));
 
             diffNodes(toAdd, toDel, () -> {
                 Map<String, Backend> currentMap = new HashMap<>();
                 for (Backend be : currentBes) {
-                    String endpoint = be.getHost() + ":" + 
be.getHeartbeatPort()
-                            + be.getCloudPublicEndpoint() + 
be.getCloudPrivateEndpoint();
+                    String endpoint = be.getHost() + ":" + 
be.getHeartbeatPort();
                     currentMap.put(endpoint, be);
                 }
                 return currentMap;
@@ -296,9 +322,7 @@ public class CloudClusterChecker extends MasterDaemon {
                         LOG.warn("cant get valid add from ms {}", node);
                         continue;
                     }
-                    String endpoint = host + ":" + node.getHeartbeatPort()
-                            + remoteClusterIdToPB.get(cid).getPublicEndpoint()
-                            + 
remoteClusterIdToPB.get(cid).getPrivateEndpoint();
+                    String endpoint = host + ":" + node.getHeartbeatPort();
                     Backend b = new Backend(Env.getCurrentEnv().getNextId(), 
host, node.getHeartbeatPort());
                     if (node.hasIsSmoothUpgrade()) {
                         b.setSmoothUpgradeDst(node.getIsSmoothUpgrade());
@@ -463,10 +487,6 @@ public class CloudClusterChecker extends MasterDaemon {
                     continue;
                 }
                 Cloud.NodeInfoPB.NodeType type = node.getNodeType();
-                // ATTN: just allow to add follower or observer
-                if (Cloud.NodeInfoPB.NodeType.FE_MASTER.equals(type)) {
-                    LOG.warn("impossible !!!,  get fe node {} type equal 
master from ms", node);
-                }
                 FrontendNodeType role = type == 
Cloud.NodeInfoPB.NodeType.FE_OBSERVER
                         ? FrontendNodeType.OBSERVER :  
FrontendNodeType.FOLLOWER;
                 Frontend fe = new Frontend(role,
diff --git 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
index ec801c47c01..9b948a3c303 100644
--- 
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
+++ 
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteCluster.groovy
@@ -59,6 +59,7 @@ class ClusterOptions {
         'max_sys_mem_available_low_water_mark_bytes=0', //no check mem 
available memory
         'report_disk_state_interval_seconds=2',
         'report_random_wait=false',
+        'enable_java_support=false',
     ]
 
     List<String> msConfigs = []
diff --git 
a/regression-test/suites/cloud_p0/multi_cluster/test_change_node_net.groovy 
b/regression-test/suites/cloud_p0/multi_cluster/test_change_node_net.groovy
new file mode 100644
index 00000000000..3cc6991fa20
--- /dev/null
+++ b/regression-test/suites/cloud_p0/multi_cluster/test_change_node_net.groovy
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import groovy.json.JsonSlurper
+import groovy.json.JsonOutput
+
+suite('test_change_node_net', 'multi_cluster,docker') {
+    def options = new ClusterOptions()
+    options.feConfigs += [
+        'cloud_cluster_check_interval_second=5',
+    ]
+    options.cloudMode = true
+
+    def token = "greedisgood9999"
+    // Issues an httpTest request against the meta service's
+    // /MetaService/http/update_cluster_endpoint API, authenticated with `token`.
+    //   msHttpPort   - value passed to `endpoint` (the caller builds it as "host:port")
+    //   request_body - value passed to `body`
+    //   check_func   - closure registered through `check`
+    def update_cluster_endpoint_api = { msHttpPort, request_body, check_func ->
+        httpTest {
+            endpoint msHttpPort
+            uri "/MetaService/http/update_cluster_endpoint?token=$token"
+            body request_body
+            check check_func
+        }
+    }
+
+    // Runs "show backends" and returns a map of BackendId -> Tag for every row
+    // whose Tag string contains clusterName; the map is also logged.
+    def showClusterBackends = { clusterName ->
+        def tagsByBackendId = [:]
+        for (row in sql_return_maparray("show backends")) {
+            if (row.Tag.contains(clusterName)) {
+                tagsByBackendId[row.BackendId] = row.Tag
+            }
+        }
+        logger.info("Collected BackendId and Tag map: {}", tagsByBackendId)
+        return tagsByBackendId
+    }
+
+    // Test body: changes a cluster's public/private endpoints through the meta service
+    // and checks that the backends pick up the new tags while keeping their backend ids.
+    docker(options) {
+        // Use the first meta service and build its "host:port" HTTP address.
+        def ms = cluster.getAllMetaservices().get(0)
+        def msHttpPort = ms.host + ":" + ms.httpPort
+        logger.info("ms addr={}, port={}, ms endpoint={}", ms.host, 
ms.httpPort, msHttpPort)
+
+        def clusterName = "newcluster1"
+        // Add a new cluster (add_new_cluster)
+        // presumably this starts 3 backends in that cluster - confirm against SuiteCluster.addBackend
+        cluster.addBackend(3, clusterName)
+       
+        def result = sql """show clusters"""
+        logger.info("show cluster1 : {}", result)
+
+        // Snapshot of BackendId -> Tag taken before the endpoints are changed.
+        def beforeBackendMap = showClusterBackends.call(clusterName)
+
+        // Parse the Tag JSON of the first backend to obtain the ids needed by the request.
+        def tag = beforeBackendMap.entrySet().iterator().next().Value
+        assertNotNull(tag)
+        def jsonSlurper = new JsonSlurper()
+        def jsonObject = jsonSlurper.parseText(tag)
+        def cloudUniqueId = jsonObject.cloud_unique_id
+        def clusterId = jsonObject.compute_group_id
+        // NOTE(review): both values below are read before the update and are not used
+        // later in this block; `after_private_endpoint` actually holds the pre-change value.
+        def before_public_endpoint = jsonObject.public_endpoint
+        def after_private_endpoint = jsonObject.private_endpoint
+
+
+        // Request payload: new endpoints for the cluster identified by clusterId.
+        def changeCluster = [cluster_id: "${clusterId}", public_endpoint: 
"test_public_endpoint", private_endpoint: "test_private_endpoint"]
+        def updateClusterEndpointBody = [cloud_unique_id: "${cloudUniqueId}", 
cluster: changeCluster]
+        def jsonOutput = new JsonOutput()
+        def updateClusterEndpointJson = 
jsonOutput.toJson(updateClusterEndpointBody)
+
+        // NOTE(review): the check closure only logs the response; it does not assert on respCode or body.
+        update_cluster_endpoint_api.call(msHttpPort, 
updateClusterEndpointJson) {
+            respCode, body ->
+                def json = parseJson(body)
+                log.info("update cluster endpoint result: ${body} ${respCode} 
${json}".toString())
+        }
+
+        def futrue = thread {
+            // Poll 15 times, sleeping 1s after each poll (about 15s in total).
+            for (def i = 0; i < 15; i++) {
+                def afterBackendMap = showClusterBackends.call(clusterName)
+                if (i > 5) {
+                    // The FE is configured with cloud_cluster_check_interval_second = 5, so the
+                    // new endpoints are only asserted once i > 5 (from the 7th poll onwards).
+                    afterBackendMap.each { key, value ->
+                        assert value.contains("test_public_endpoint") : "Value 
for key ${key} does not contain 'test_public_endpoint'"
+                        assert value.contains("test_private_endpoint") : 
"Value for key ${key} does not contain 'test_private_endpoint'"
+                    }
+                }
+                // On every poll, the set of backend ids must equal the snapshot taken before the change.
+                assertEquals(afterBackendMap.keySet(), 
beforeBackendMap.keySet())
+                sleep(1 * 1000)
+            }
+        }
+        // Wait for the polling thread to finish.
+        futrue.get()
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to