This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 70439e70580 branch-4.1: [fix](fe) Clear warm-up error message after successful retry #64813 (#64870)
70439e70580 is described below

commit 70439e70580a725e72055085641cd951a89576cc
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Jun 26 14:48:51 2026 +0800

    branch-4.1: [fix](fe) Clear warm-up error message after successful retry #64813 (#64870)
    
    Cherry-picked from #64813
    
    Co-authored-by: bobhan1 <[email protected]>
---
 .../org/apache/doris/cloud/CloudWarmUpJob.java     |  14 ++
 .../org/apache/doris/cloud/CloudWarmUpJobTest.java | 232 +++++++++++++++++++++
 2 files changed, 246 insertions(+)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java
index c4f47ed9269..1d9ee1a821c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java
@@ -524,6 +524,14 @@ public class CloudWarmUpJob implements Writable {
         this.errMsg = msg;
     }
 
+    private boolean resetErrMsg() {
+        if (StringUtils.isEmpty(errMsg)) {
+            return false;
+        }
+        this.errMsg = "";
+        return true;
+    }
+
     public void setFinishedTimeMs(long timeMs) {
         this.finishedTimeMs = timeMs;
     }
@@ -969,6 +977,7 @@ public class CloudWarmUpJob implements Writable {
     }
 
     private void runEventDrivenJob() throws Exception {
+        boolean hasError = false;
         try {
             refreshEventDrivenBeToThriftAddress();
             initClients();
@@ -990,6 +999,7 @@ public class CloudWarmUpJob implements Writable {
                         hasTableFilter() ? getCurrentTableIdNames().size() : 
"all");
                 TWarmUpTabletsResponse response = 
entry.getValue().warmUpTablets(request);
                 if (response.getStatus().getStatusCode() != TStatusCode.OK) {
+                    hasError = true;
                     if (!response.getStatus().getErrorMsgs().isEmpty()) {
                         errMsg = response.getStatus().getErrorMsgs().get(0);
                     }
@@ -997,6 +1007,9 @@ public class CloudWarmUpJob implements Writable {
                             jobId, syncEvent, errMsg);
                 }
             }
+            if (!hasError && resetErrMsg()) {
+                Env.getCurrentEnv().getEditLog().logModifyCloudWarmUpJob(this);
+            }
         } catch (Exception e) {
             errMsg = e.getMessage();
             LOG.warn("send warm up request job_id={} failed with exception {}",
@@ -1105,6 +1118,7 @@ public class CloudWarmUpJob implements Writable {
                     }
                     if (allBatchesDone) {
                         clearJobOnBEs();
+                        resetErrMsg();
                         this.finishedTimeMs = System.currentTimeMillis();
                         if (this.isPeriodic()) {
                             // wait for next schedule
diff --git a/fe/fe-core/src/test/java/org/apache/doris/cloud/CloudWarmUpJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/cloud/CloudWarmUpJobTest.java
index 3da09461138..800dd89c6b2 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/cloud/CloudWarmUpJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/cloud/CloudWarmUpJobTest.java
@@ -22,12 +22,17 @@ import org.apache.doris.cloud.CloudWarmUpJob.JobState;
 import org.apache.doris.cloud.CloudWarmUpJob.JobType;
 import org.apache.doris.cloud.CloudWarmUpJob.SyncEvent;
 import org.apache.doris.cloud.CloudWarmUpJob.SyncMode;
+import org.apache.doris.cloud.catalog.CloudEnv;
 import org.apache.doris.cloud.system.CloudSystemInfoService;
 import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.GenericPool;
+import org.apache.doris.persist.EditLog;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.BackendService;
 import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatus;
+import org.apache.doris.thrift.TStatusCode;
 import org.apache.doris.thrift.TWarmUpTabletsRequest;
 import org.apache.doris.thrift.TWarmUpTabletsRequestType;
 import org.apache.doris.thrift.TWarmUpTabletsResponse;
@@ -40,20 +45,31 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.lang.reflect.Method;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class CloudWarmUpJobTest {
+    private static final int COL_ERR_MSG = 11;
+
     private GenericPool<BackendService.Client> originalBackendPool;
     private GenericPool<BackendService.Client> mockBackendPool;
+    private boolean originalRunningUnitTest;
 
     @SuppressWarnings("unchecked")
     @Before
     public void setUp() {
+        originalRunningUnitTest = FeConstants.runningUnitTest;
+        FeConstants.runningUnitTest = true;
         originalBackendPool = ClientPool.backendPool;
         mockBackendPool = Mockito.mock(GenericPool.class);
         ClientPool.backendPool = mockBackendPool;
@@ -62,6 +78,7 @@ public class CloudWarmUpJobTest {
     @After
     public void tearDown() {
         ClientPool.backendPool = originalBackendPool;
+        FeConstants.runningUnitTest = originalRunningUnitTest;
     }
 
     @Test
@@ -140,6 +157,174 @@ public class CloudWarmUpJobTest {
         Mockito.verify(mockBackendPool).invalidateObject(unavailableAddress, 
null);
     }
 
+    @Test
+    public void testPendingRetryKeepsErrMsgWhenJobStarts() throws Exception {
+        CloudWarmUpJob job = new CloudWarmUpJob.Builder()
+                .setJobId(200L)
+                .setSrcClusterName("source_cluster")
+                .setDstClusterName("target_cluster")
+                .setJobType(JobType.CLUSTER)
+                .setSyncMode(SyncMode.PERIODIC)
+                .setSyncInterval(60L)
+                .build();
+        job.setErrMsg("previous failure");
+
+        CloudEnv cloudEnv = Mockito.mock(CloudEnv.class);
+        CacheHotspotManager cacheHotspotManager = 
Mockito.mock(CacheHotspotManager.class);
+        EditLog editLog = Mockito.mock(EditLog.class);
+        
Mockito.when(cloudEnv.getCacheHotspotMgr()).thenReturn(cacheHotspotManager);
+        Mockito.when(cloudEnv.getEditLog()).thenReturn(editLog);
+        
Mockito.when(cacheHotspotManager.tryRegisterRunningJob(job)).thenReturn(true);
+
+        try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class)) {
+            mockedEnv.when(Env::getCurrentEnv).thenReturn(cloudEnv);
+            invokeRunPendingJob(job);
+        }
+
+        Assert.assertEquals(JobState.RUNNING, job.getJobState());
+        Assert.assertEquals("previous failure", 
job.getJobInfo(null).get(COL_ERR_MSG));
+        Mockito.verify(editLog).logModifyCloudWarmUpJob(job);
+    }
+
+    @Test
+    public void testEventDrivenSuccessfulRetryClearsErrMsg() throws Exception {
+        CloudSystemInfoService cloudSystemInfoService = 
Mockito.mock(CloudSystemInfoService.class);
+        Backend backend = new Backend(1L, "host1", 9050);
+        backend.setBePort(9060);
+        
Mockito.when(cloudSystemInfoService.getBackendsByClusterName("source_cluster"))
+                .thenReturn(Arrays.asList(backend));
+
+        TNetworkAddress address = new TNetworkAddress("host1", 9060);
+        BackendService.Client client = 
Mockito.mock(BackendService.Client.class);
+        Mockito.when(mockBackendPool.borrowObject(address)).thenReturn(client);
+        
Mockito.when(client.warmUpTablets(Mockito.any(TWarmUpTabletsRequest.class)))
+                .thenReturn(okWarmUpResponse());
+        CloudEnv cloudEnv = Mockito.mock(CloudEnv.class);
+        EditLog editLog = Mockito.mock(EditLog.class);
+        Mockito.when(cloudEnv.getEditLog()).thenReturn(editLog);
+
+        CloudWarmUpJob job = new CloudWarmUpJob.Builder()
+                .setJobId(201L)
+                .setSrcClusterName("source_cluster")
+                .setDstClusterName("target_cluster")
+                .setJobType(JobType.CLUSTER)
+                .setSyncMode(SyncMode.EVENT_DRIVEN)
+                .setSyncEvent(SyncEvent.LOAD)
+                .build();
+        job.setJobState(JobState.RUNNING);
+        setStartTimeMs(job, System.currentTimeMillis());
+        job.setErrMsg("previous failure");
+
+        boolean runningUnitTest = FeConstants.runningUnitTest;
+        FeConstants.runningUnitTest = false;
+        try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class)) {
+            
mockedEnv.when(Env::getCurrentSystemInfo).thenReturn(cloudSystemInfoService);
+            mockedEnv.when(Env::getCurrentEnv).thenReturn(cloudEnv);
+            job.run();
+            job.run();
+        } finally {
+            FeConstants.runningUnitTest = runningUnitTest;
+        }
+
+        Assert.assertEquals("", job.getJobInfo(null).get(COL_ERR_MSG));
+        ArgumentCaptor<CloudWarmUpJob> jobCaptor = 
ArgumentCaptor.forClass(CloudWarmUpJob.class);
+        Mockito.verify(editLog, 
Mockito.times(1)).logModifyCloudWarmUpJob(jobCaptor.capture());
+        CloudWarmUpJob replayedJob = copyBySerialization(jobCaptor.getValue());
+        Assert.assertEquals("", replayedJob.getJobInfo(null).get(COL_ERR_MSG));
+        Mockito.verify(client, 
Mockito.times(2)).warmUpTablets(Mockito.any(TWarmUpTabletsRequest.class));
+        Mockito.verify(mockBackendPool, 
Mockito.times(2)).returnObject(address, client);
+    }
+
+    @Test
+    public void testEventDrivenFailedRetryKeepsErrMsg() throws Exception {
+        CloudSystemInfoService cloudSystemInfoService = 
Mockito.mock(CloudSystemInfoService.class);
+        Backend backend = new Backend(1L, "host1", 9050);
+        backend.setBePort(9060);
+        
Mockito.when(cloudSystemInfoService.getBackendsByClusterName("source_cluster"))
+                .thenReturn(Arrays.asList(backend));
+
+        TNetworkAddress address = new TNetworkAddress("host1", 9060);
+        BackendService.Client client = 
Mockito.mock(BackendService.Client.class);
+        Mockito.when(mockBackendPool.borrowObject(address)).thenReturn(client);
+        
Mockito.when(client.warmUpTablets(Mockito.any(TWarmUpTabletsRequest.class)))
+                .thenReturn(failedWarmUpResponse());
+
+        CloudWarmUpJob job = new CloudWarmUpJob.Builder()
+                .setJobId(203L)
+                .setSrcClusterName("source_cluster")
+                .setDstClusterName("target_cluster")
+                .setJobType(JobType.CLUSTER)
+                .setSyncMode(SyncMode.EVENT_DRIVEN)
+                .setSyncEvent(SyncEvent.LOAD)
+                .build();
+        job.setJobState(JobState.RUNNING);
+        setStartTimeMs(job, System.currentTimeMillis());
+        job.setErrMsg("previous failure");
+
+        boolean runningUnitTest = FeConstants.runningUnitTest;
+        FeConstants.runningUnitTest = false;
+        try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class)) {
+            
mockedEnv.when(Env::getCurrentSystemInfo).thenReturn(cloudSystemInfoService);
+            job.run();
+        } finally {
+            FeConstants.runningUnitTest = runningUnitTest;
+        }
+
+        Assert.assertEquals("previous failure", 
job.getJobInfo(null).get(COL_ERR_MSG));
+        
Mockito.verify(client).warmUpTablets(Mockito.any(TWarmUpTabletsRequest.class));
+        Mockito.verify(mockBackendPool).returnObject(address, client);
+    }
+
+    @Test
+    public void testRunningRetryClearsErrMsgWhenJobFinishes() throws Exception 
{
+        TNetworkAddress address = new TNetworkAddress("127.0.0.1", 9050);
+        BackendService.Client client = 
Mockito.mock(BackendService.Client.class);
+        Mockito.when(mockBackendPool.borrowObject(address)).thenReturn(client);
+        
Mockito.when(client.warmUpTablets(Mockito.any(TWarmUpTabletsRequest.class)))
+                .thenReturn(okWarmUpResponse());
+
+        CloudWarmUpJob job = new CloudWarmUpJob.Builder()
+                .setJobId(202L)
+                .setSrcClusterName("source_cluster")
+                .setDstClusterName("target_cluster")
+                .setJobType(JobType.CLUSTER)
+                .setSyncMode(SyncMode.PERIODIC)
+                .setSyncInterval(60L)
+                .build();
+        job.setJobState(JobState.RUNNING);
+        setStartTimeMs(job, System.currentTimeMillis());
+        setSetJobDone(job, true);
+        job.setErrMsg("previous failure");
+
+        Map<Long, List<List<Long>>> beToTabletIdBatches = new HashMap<>();
+        beToTabletIdBatches.put(1L, Collections.emptyList());
+        job.setBeToTabletIdBatches(beToTabletIdBatches);
+        Map<Long, String> beToThriftAddress = new HashMap<>();
+        beToThriftAddress.put(1L, address.getHostname() + ":" + 
address.getPort());
+        job.setBeToThriftAddress(beToThriftAddress);
+
+        CloudEnv cloudEnv = Mockito.mock(CloudEnv.class);
+        CacheHotspotManager cacheHotspotManager = 
Mockito.mock(CacheHotspotManager.class);
+        EditLog editLog = Mockito.mock(EditLog.class);
+        
Mockito.when(cloudEnv.getCacheHotspotMgr()).thenReturn(cacheHotspotManager);
+        Mockito.when(cloudEnv.getEditLog()).thenReturn(editLog);
+
+        boolean runningUnitTest = FeConstants.runningUnitTest;
+        FeConstants.runningUnitTest = false;
+        try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class)) {
+            mockedEnv.when(Env::getCurrentEnv).thenReturn(cloudEnv);
+            invokeRunRunningJob(job);
+        } finally {
+            FeConstants.runningUnitTest = runningUnitTest;
+        }
+
+        Assert.assertEquals(JobState.PENDING, job.getJobState());
+        Assert.assertEquals("", job.getJobInfo(null).get(COL_ERR_MSG));
+        Mockito.verify(cacheHotspotManager).notifyJobStop(job);
+        Mockito.verify(editLog, 
Mockito.atLeastOnce()).logModifyCloudWarmUpJob(job);
+        Mockito.verify(mockBackendPool).returnObject(address, client);
+    }
+
     private CloudWarmUpJob createRunningJob(long jobId, TNetworkAddress 
firstAddress,
             TNetworkAddress secondAddress) {
         CloudWarmUpJob job = new CloudWarmUpJob.Builder()
@@ -160,4 +345,51 @@ public class CloudWarmUpJobTest {
         method.setAccessible(true);
         method.invoke(job);
     }
+
+    private void invokeRunPendingJob(CloudWarmUpJob job) throws Exception {
+        Method method = 
CloudWarmUpJob.class.getDeclaredMethod("runPendingJob");
+        method.setAccessible(true);
+        method.invoke(job);
+    }
+
+    private void invokeRunRunningJob(CloudWarmUpJob job) throws Exception {
+        Method method = 
CloudWarmUpJob.class.getDeclaredMethod("runRunningJob");
+        method.setAccessible(true);
+        method.invoke(job);
+    }
+
+    private void setStartTimeMs(CloudWarmUpJob job, long startTimeMs) throws 
Exception {
+        java.lang.reflect.Field field = 
CloudWarmUpJob.class.getDeclaredField("startTimeMs");
+        field.setAccessible(true);
+        field.setLong(job, startTimeMs);
+    }
+
+    private void setSetJobDone(CloudWarmUpJob job, boolean setJobDone) throws 
Exception {
+        java.lang.reflect.Field field = 
CloudWarmUpJob.class.getDeclaredField("setJobDone");
+        field.setAccessible(true);
+        field.setBoolean(job, setJobDone);
+    }
+
+    private CloudWarmUpJob copyBySerialization(CloudWarmUpJob job) throws 
Exception {
+        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+        try (DataOutputStream out = new DataOutputStream(bytes)) {
+            job.write(out);
+        }
+        try (DataInputStream in = new DataInputStream(new 
ByteArrayInputStream(bytes.toByteArray()))) {
+            return CloudWarmUpJob.read(in);
+        }
+    }
+
+    private TWarmUpTabletsResponse okWarmUpResponse() {
+        TWarmUpTabletsResponse response = new TWarmUpTabletsResponse();
+        response.setStatus(new TStatus(TStatusCode.OK));
+        return response;
+    }
+
+    private TWarmUpTabletsResponse failedWarmUpResponse() {
+        TWarmUpTabletsResponse response = new TWarmUpTabletsResponse();
+        response.setStatus(new TStatus(TStatusCode.INTERNAL_ERROR));
+        response.getStatus().setErrorMsgs(Collections.emptyList());
+        return response;
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to