This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 451356030dd branch-4.0: [fix](cloud) refresh event warmup backends #62839 (#62887)
451356030dd is described below
commit 451356030ddbd033865b58f5f7fe7c075dd5a21e
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sun May 10 10:26:19 2026 +0800
branch-4.0: [fix](cloud) refresh event warmup backends #62839 (#62887)
Cherry-picked from #62839
Co-authored-by: Xin Liao <[email protected]>
---
.../org/apache/doris/cloud/CloudWarmUpJob.java | 17 +++++
.../org/apache/doris/cloud/CloudWarmUpJobTest.java | 73 ++++++++++++++++++++++
2 files changed, 90 insertions(+)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java
index 90f61fbfb75..593e68ef8d8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java
@@ -227,6 +227,22 @@ public class CloudWarmUpJob implements Writable {
}
}
+ void refreshEventDrivenBeToThriftAddress() {
+ if (!isEventDriven()) {
+ return;
+ }
+ Map<Long, String> previousBeToThriftAddress = this.beToThriftAddress;
+ fetchBeToThriftAddress();
+ if (previousBeToThriftAddress != null && !previousBeToThriftAddress.equals(this.beToThriftAddress)) {
+ LOG.info("refresh event-driven warm up job {} BE address count from {} to {}",
+ jobId, previousBeToThriftAddress.size(), this.beToThriftAddress.size());
+ LOG.debug("refresh event-driven warm up job {} BE addresses from {} to {}",
+ jobId, previousBeToThriftAddress, this.beToThriftAddress);
+ }
+ this.beToClient = null;
+ this.beToAddr = null;
+ }
+
public CloudWarmUpJob(long jobId, String srcClusterName, String dstClusterName,
Map<Long, List<List<Long>>> beToTabletIdBatches, JobType jobType) {
this.jobId = jobId;
@@ -689,6 +705,7 @@ public class CloudWarmUpJob implements Writable {
private void runEventDrivenJob() throws Exception {
try {
+ refreshEventDrivenBeToThriftAddress();
initClients();
for (Map.Entry<Long, Client> entry : beToClient.entrySet()) {
TWarmUpTabletsRequest request = new TWarmUpTabletsRequest();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/cloud/CloudWarmUpJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/cloud/CloudWarmUpJobTest.java
new file mode 100644
index 00000000000..add89a0c51c
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/cloud/CloudWarmUpJobTest.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.CloudWarmUpJob.JobType;
+import org.apache.doris.cloud.CloudWarmUpJob.SyncEvent;
+import org.apache.doris.cloud.CloudWarmUpJob.SyncMode;
+import org.apache.doris.cloud.system.CloudSystemInfoService;
+import org.apache.doris.system.Backend;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class CloudWarmUpJobTest {
+ @Test
+ public void testEventDrivenRefreshesSourceBackends() {
+ CloudSystemInfoService cloudSystemInfoService = Mockito.mock(CloudSystemInfoService.class);
+ AtomicReference<String> requestedCluster = new AtomicReference<>();
+ Mockito.when(cloudSystemInfoService.getBackendsByClusterName(Mockito.anyString())).thenAnswer(invocation -> {
+ requestedCluster.set(invocation.getArgument(0));
+ Backend backend1 = new Backend(1L, "host1", 9050);
+ backend1.setBePort(9060);
+ Backend backend2 = new Backend(2L, "host2", 9050);
+ backend2.setBePort(9061);
+ return Arrays.asList(backend1, backend2);
+ });
+
+ CloudWarmUpJob warmUpJob = new CloudWarmUpJob.Builder()
+ .setJobId(100L)
+ .setSrcClusterName("src_cluster")
+ .setDstClusterName("dst_cluster")
+ .setJobType(JobType.CLUSTER)
+ .setSyncMode(SyncMode.EVENT_DRIVEN)
+ .setSyncEvent(SyncEvent.LOAD)
+ .build();
+ Map<Long, String> staleBeToThriftAddress = new HashMap<>();
+ staleBeToThriftAddress.put(1L, "host1:9060");
+ warmUpJob.setBeToThriftAddress(staleBeToThriftAddress);
+
+ try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class)) {
+ mockedEnv.when(Env::getCurrentSystemInfo).thenReturn(cloudSystemInfoService);
+ warmUpJob.refreshEventDrivenBeToThriftAddress();
+ }
+
+ Assert.assertEquals("src_cluster", requestedCluster.get());
+ Assert.assertEquals(2, warmUpJob.getBeToThriftAddress().size());
+ Assert.assertEquals("host1:9060", warmUpJob.getBeToThriftAddress().get(1L));
+ Assert.assertEquals("host2:9061", warmUpJob.getBeToThriftAddress().get(2L));
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]