This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 451356030dd branch-4.0: [fix](cloud) refresh event warmup backends #62839 (#62887)
451356030dd is described below

commit 451356030ddbd033865b58f5f7fe7c075dd5a21e
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sun May 10 10:26:19 2026 +0800

    branch-4.0: [fix](cloud) refresh event warmup backends #62839 (#62887)
    
    Cherry-picked from #62839
    
    Co-authored-by: Xin Liao <[email protected]>
---
 .../org/apache/doris/cloud/CloudWarmUpJob.java     | 17 +++++
 .../org/apache/doris/cloud/CloudWarmUpJobTest.java | 73 ++++++++++++++++++++++
 2 files changed, 90 insertions(+)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java
index 90f61fbfb75..593e68ef8d8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java
@@ -227,6 +227,22 @@ public class CloudWarmUpJob implements Writable {
         }
     }
 
+    // Event-driven jobs only: re-run fetchBeToThriftAddress(), log a changed map, null out beToClient/beToAddr.
+    void refreshEventDrivenBeToThriftAddress() {
+        if (isEventDriven()) {
+            Map<Long, String> oldAddresses = this.beToThriftAddress;
+            fetchBeToThriftAddress();
+            if (oldAddresses != null && !oldAddresses.equals(this.beToThriftAddress)) {
+                LOG.info("refresh event-driven warm up job {} BE address count from {} to {}",
+                        jobId, oldAddresses.size(), this.beToThriftAddress.size());
+                LOG.debug("refresh event-driven warm up job {} BE addresses from {} to {}",
+                        jobId, oldAddresses, this.beToThriftAddress);
+            }
+            this.beToClient = null;
+            this.beToAddr = null;
+        }
+    }
+
     public CloudWarmUpJob(long jobId, String srcClusterName, String 
dstClusterName,
                                 Map<Long, List<List<Long>>> 
beToTabletIdBatches, JobType jobType) {
         this.jobId = jobId;
@@ -689,6 +705,7 @@ public class CloudWarmUpJob implements Writable {
 
     private void runEventDrivenJob() throws Exception {
         try {
+            refreshEventDrivenBeToThriftAddress();
             initClients();
             for (Map.Entry<Long, Client> entry : beToClient.entrySet()) {
                 TWarmUpTabletsRequest request = new TWarmUpTabletsRequest();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/cloud/CloudWarmUpJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/cloud/CloudWarmUpJobTest.java
new file mode 100644
index 00000000000..add89a0c51c
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/cloud/CloudWarmUpJobTest.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.CloudWarmUpJob.JobType;
+import org.apache.doris.cloud.CloudWarmUpJob.SyncEvent;
+import org.apache.doris.cloud.CloudWarmUpJob.SyncMode;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
+import org.apache.doris.system.Backend;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class CloudWarmUpJobTest {
+    // A refresh must look up the source cluster again and pick up backends missing from the stale map.
+    @Test
+    public void testEventDrivenRefreshesSourceBackends() {
+        AtomicReference<String> queriedCluster = new AtomicReference<>();
+        CloudSystemInfoService systemInfo = Mockito.mock(CloudSystemInfoService.class);
+        Mockito.when(systemInfo.getBackendsByClusterName(Mockito.anyString())).thenAnswer(call -> {
+            queriedCluster.set(call.getArgument(0));
+            Backend first = new Backend(1L, "host1", 9050);
+            first.setBePort(9060);
+            Backend second = new Backend(2L, "host2", 9050);
+            second.setBePort(9061);
+            return Arrays.asList(first, second);
+        });
+
+        CloudWarmUpJob job = new CloudWarmUpJob.Builder()
+                .setJobId(100L)
+                .setSrcClusterName("src_cluster").setDstClusterName("dst_cluster")
+                .setJobType(JobType.CLUSTER)
+                .setSyncMode(SyncMode.EVENT_DRIVEN)
+                .setSyncEvent(SyncEvent.LOAD)
+                .build();
+        Map<Long, String> staleAddresses = new HashMap<>();
+        staleAddresses.put(1L, "host1:9060");
+        job.setBeToThriftAddress(staleAddresses);
+
+        try (MockedStatic<Env> envMock = Mockito.mockStatic(Env.class)) {
+            envMock.when(Env::getCurrentSystemInfo).thenReturn(systemInfo);
+            job.refreshEventDrivenBeToThriftAddress();
+        }
+
+        Assert.assertEquals("src_cluster", queriedCluster.get());
+        Assert.assertEquals(2, job.getBeToThriftAddress().size());
+        Assert.assertEquals("host1:9060", job.getBeToThriftAddress().get(1L));
+        Assert.assertEquals("host2:9061", job.getBeToThriftAddress().get(2L));
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to