This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 70439e70580 branch-4.1: [fix](fe) Clear warm-up error message after successful retry #64813 (#64870)
70439e70580 is described below
commit 70439e70580a725e72055085641cd951a89576cc
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Jun 26 14:48:51 2026 +0800
branch-4.1: [fix](fe) Clear warm-up error message after successful retry #64813 (#64870)
Cherry-picked from #64813
Co-authored-by: bobhan1 <[email protected]>
---
.../org/apache/doris/cloud/CloudWarmUpJob.java | 14 ++
.../org/apache/doris/cloud/CloudWarmUpJobTest.java | 232 +++++++++++++++++++++
2 files changed, 246 insertions(+)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java
index c4f47ed9269..1d9ee1a821c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/CloudWarmUpJob.java
@@ -524,6 +524,14 @@ public class CloudWarmUpJob implements Writable {
this.errMsg = msg;
}
+ private boolean resetErrMsg() {
+ if (StringUtils.isEmpty(errMsg)) {
+ return false;
+ }
+ this.errMsg = "";
+ return true;
+ }
+
public void setFinishedTimeMs(long timeMs) {
this.finishedTimeMs = timeMs;
}
@@ -969,6 +977,7 @@ public class CloudWarmUpJob implements Writable {
}
private void runEventDrivenJob() throws Exception {
+ boolean hasError = false;
try {
refreshEventDrivenBeToThriftAddress();
initClients();
@@ -990,6 +999,7 @@ public class CloudWarmUpJob implements Writable {
hasTableFilter() ? getCurrentTableIdNames().size() :
"all");
TWarmUpTabletsResponse response =
entry.getValue().warmUpTablets(request);
if (response.getStatus().getStatusCode() != TStatusCode.OK) {
+ hasError = true;
if (!response.getStatus().getErrorMsgs().isEmpty()) {
errMsg = response.getStatus().getErrorMsgs().get(0);
}
@@ -997,6 +1007,9 @@ public class CloudWarmUpJob implements Writable {
jobId, syncEvent, errMsg);
}
}
+ if (!hasError && resetErrMsg()) {
+ Env.getCurrentEnv().getEditLog().logModifyCloudWarmUpJob(this);
+ }
} catch (Exception e) {
errMsg = e.getMessage();
LOG.warn("send warm up request job_id={} failed with exception {}",
@@ -1105,6 +1118,7 @@ public class CloudWarmUpJob implements Writable {
}
if (allBatchesDone) {
clearJobOnBEs();
+ resetErrMsg();
this.finishedTimeMs = System.currentTimeMillis();
if (this.isPeriodic()) {
// wait for next schedule
diff --git a/fe/fe-core/src/test/java/org/apache/doris/cloud/CloudWarmUpJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/cloud/CloudWarmUpJobTest.java
index 3da09461138..800dd89c6b2 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/cloud/CloudWarmUpJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/cloud/CloudWarmUpJobTest.java
@@ -22,12 +22,17 @@ import org.apache.doris.cloud.CloudWarmUpJob.JobState;
import org.apache.doris.cloud.CloudWarmUpJob.JobType;
import org.apache.doris.cloud.CloudWarmUpJob.SyncEvent;
import org.apache.doris.cloud.CloudWarmUpJob.SyncMode;
+import org.apache.doris.cloud.catalog.CloudEnv;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.FeConstants;
import org.apache.doris.common.GenericPool;
+import org.apache.doris.persist.EditLog;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TStatus;
+import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TWarmUpTabletsRequest;
import org.apache.doris.thrift.TWarmUpTabletsRequestType;
import org.apache.doris.thrift.TWarmUpTabletsResponse;
@@ -40,20 +45,31 @@ import org.mockito.ArgumentCaptor;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.lang.reflect.Method;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
public class CloudWarmUpJobTest {
+ private static final int COL_ERR_MSG = 11;
+
private GenericPool<BackendService.Client> originalBackendPool;
private GenericPool<BackendService.Client> mockBackendPool;
+ private boolean originalRunningUnitTest;
@SuppressWarnings("unchecked")
@Before
public void setUp() {
+ originalRunningUnitTest = FeConstants.runningUnitTest;
+ FeConstants.runningUnitTest = true;
originalBackendPool = ClientPool.backendPool;
mockBackendPool = Mockito.mock(GenericPool.class);
ClientPool.backendPool = mockBackendPool;
@@ -62,6 +78,7 @@ public class CloudWarmUpJobTest {
@After
public void tearDown() {
ClientPool.backendPool = originalBackendPool;
+ FeConstants.runningUnitTest = originalRunningUnitTest;
}
@Test
@@ -140,6 +157,174 @@ public class CloudWarmUpJobTest {
Mockito.verify(mockBackendPool).invalidateObject(unavailableAddress,
null);
}
+ @Test
+ public void testPendingRetryKeepsErrMsgWhenJobStarts() throws Exception {
+ CloudWarmUpJob job = new CloudWarmUpJob.Builder()
+ .setJobId(200L)
+ .setSrcClusterName("source_cluster")
+ .setDstClusterName("target_cluster")
+ .setJobType(JobType.CLUSTER)
+ .setSyncMode(SyncMode.PERIODIC)
+ .setSyncInterval(60L)
+ .build();
+ job.setErrMsg("previous failure");
+
+ CloudEnv cloudEnv = Mockito.mock(CloudEnv.class);
+ CacheHotspotManager cacheHotspotManager =
Mockito.mock(CacheHotspotManager.class);
+ EditLog editLog = Mockito.mock(EditLog.class);
+
Mockito.when(cloudEnv.getCacheHotspotMgr()).thenReturn(cacheHotspotManager);
+ Mockito.when(cloudEnv.getEditLog()).thenReturn(editLog);
+
Mockito.when(cacheHotspotManager.tryRegisterRunningJob(job)).thenReturn(true);
+
+ try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class)) {
+ mockedEnv.when(Env::getCurrentEnv).thenReturn(cloudEnv);
+ invokeRunPendingJob(job);
+ }
+
+ Assert.assertEquals(JobState.RUNNING, job.getJobState());
+ Assert.assertEquals("previous failure",
job.getJobInfo(null).get(COL_ERR_MSG));
+ Mockito.verify(editLog).logModifyCloudWarmUpJob(job);
+ }
+
+ @Test
+ public void testEventDrivenSuccessfulRetryClearsErrMsg() throws Exception {
+ CloudSystemInfoService cloudSystemInfoService =
Mockito.mock(CloudSystemInfoService.class);
+ Backend backend = new Backend(1L, "host1", 9050);
+ backend.setBePort(9060);
+
Mockito.when(cloudSystemInfoService.getBackendsByClusterName("source_cluster"))
+ .thenReturn(Arrays.asList(backend));
+
+ TNetworkAddress address = new TNetworkAddress("host1", 9060);
+ BackendService.Client client =
Mockito.mock(BackendService.Client.class);
+ Mockito.when(mockBackendPool.borrowObject(address)).thenReturn(client);
+
Mockito.when(client.warmUpTablets(Mockito.any(TWarmUpTabletsRequest.class)))
+ .thenReturn(okWarmUpResponse());
+ CloudEnv cloudEnv = Mockito.mock(CloudEnv.class);
+ EditLog editLog = Mockito.mock(EditLog.class);
+ Mockito.when(cloudEnv.getEditLog()).thenReturn(editLog);
+
+ CloudWarmUpJob job = new CloudWarmUpJob.Builder()
+ .setJobId(201L)
+ .setSrcClusterName("source_cluster")
+ .setDstClusterName("target_cluster")
+ .setJobType(JobType.CLUSTER)
+ .setSyncMode(SyncMode.EVENT_DRIVEN)
+ .setSyncEvent(SyncEvent.LOAD)
+ .build();
+ job.setJobState(JobState.RUNNING);
+ setStartTimeMs(job, System.currentTimeMillis());
+ job.setErrMsg("previous failure");
+
+ boolean runningUnitTest = FeConstants.runningUnitTest;
+ FeConstants.runningUnitTest = false;
+ try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class)) {
+
mockedEnv.when(Env::getCurrentSystemInfo).thenReturn(cloudSystemInfoService);
+ mockedEnv.when(Env::getCurrentEnv).thenReturn(cloudEnv);
+ job.run();
+ job.run();
+ } finally {
+ FeConstants.runningUnitTest = runningUnitTest;
+ }
+
+ Assert.assertEquals("", job.getJobInfo(null).get(COL_ERR_MSG));
+ ArgumentCaptor<CloudWarmUpJob> jobCaptor =
ArgumentCaptor.forClass(CloudWarmUpJob.class);
+ Mockito.verify(editLog,
Mockito.times(1)).logModifyCloudWarmUpJob(jobCaptor.capture());
+ CloudWarmUpJob replayedJob = copyBySerialization(jobCaptor.getValue());
+ Assert.assertEquals("", replayedJob.getJobInfo(null).get(COL_ERR_MSG));
+ Mockito.verify(client,
Mockito.times(2)).warmUpTablets(Mockito.any(TWarmUpTabletsRequest.class));
+ Mockito.verify(mockBackendPool,
Mockito.times(2)).returnObject(address, client);
+ }
+
+ @Test
+ public void testEventDrivenFailedRetryKeepsErrMsg() throws Exception {
+ CloudSystemInfoService cloudSystemInfoService =
Mockito.mock(CloudSystemInfoService.class);
+ Backend backend = new Backend(1L, "host1", 9050);
+ backend.setBePort(9060);
+
Mockito.when(cloudSystemInfoService.getBackendsByClusterName("source_cluster"))
+ .thenReturn(Arrays.asList(backend));
+
+ TNetworkAddress address = new TNetworkAddress("host1", 9060);
+ BackendService.Client client =
Mockito.mock(BackendService.Client.class);
+ Mockito.when(mockBackendPool.borrowObject(address)).thenReturn(client);
+
Mockito.when(client.warmUpTablets(Mockito.any(TWarmUpTabletsRequest.class)))
+ .thenReturn(failedWarmUpResponse());
+
+ CloudWarmUpJob job = new CloudWarmUpJob.Builder()
+ .setJobId(203L)
+ .setSrcClusterName("source_cluster")
+ .setDstClusterName("target_cluster")
+ .setJobType(JobType.CLUSTER)
+ .setSyncMode(SyncMode.EVENT_DRIVEN)
+ .setSyncEvent(SyncEvent.LOAD)
+ .build();
+ job.setJobState(JobState.RUNNING);
+ setStartTimeMs(job, System.currentTimeMillis());
+ job.setErrMsg("previous failure");
+
+ boolean runningUnitTest = FeConstants.runningUnitTest;
+ FeConstants.runningUnitTest = false;
+ try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class)) {
+
mockedEnv.when(Env::getCurrentSystemInfo).thenReturn(cloudSystemInfoService);
+ job.run();
+ } finally {
+ FeConstants.runningUnitTest = runningUnitTest;
+ }
+
+ Assert.assertEquals("previous failure",
job.getJobInfo(null).get(COL_ERR_MSG));
+
Mockito.verify(client).warmUpTablets(Mockito.any(TWarmUpTabletsRequest.class));
+ Mockito.verify(mockBackendPool).returnObject(address, client);
+ }
+
+ @Test
+ public void testRunningRetryClearsErrMsgWhenJobFinishes() throws Exception
{
+ TNetworkAddress address = new TNetworkAddress("127.0.0.1", 9050);
+ BackendService.Client client =
Mockito.mock(BackendService.Client.class);
+ Mockito.when(mockBackendPool.borrowObject(address)).thenReturn(client);
+
Mockito.when(client.warmUpTablets(Mockito.any(TWarmUpTabletsRequest.class)))
+ .thenReturn(okWarmUpResponse());
+
+ CloudWarmUpJob job = new CloudWarmUpJob.Builder()
+ .setJobId(202L)
+ .setSrcClusterName("source_cluster")
+ .setDstClusterName("target_cluster")
+ .setJobType(JobType.CLUSTER)
+ .setSyncMode(SyncMode.PERIODIC)
+ .setSyncInterval(60L)
+ .build();
+ job.setJobState(JobState.RUNNING);
+ setStartTimeMs(job, System.currentTimeMillis());
+ setSetJobDone(job, true);
+ job.setErrMsg("previous failure");
+
+ Map<Long, List<List<Long>>> beToTabletIdBatches = new HashMap<>();
+ beToTabletIdBatches.put(1L, Collections.emptyList());
+ job.setBeToTabletIdBatches(beToTabletIdBatches);
+ Map<Long, String> beToThriftAddress = new HashMap<>();
+ beToThriftAddress.put(1L, address.getHostname() + ":" +
address.getPort());
+ job.setBeToThriftAddress(beToThriftAddress);
+
+ CloudEnv cloudEnv = Mockito.mock(CloudEnv.class);
+ CacheHotspotManager cacheHotspotManager =
Mockito.mock(CacheHotspotManager.class);
+ EditLog editLog = Mockito.mock(EditLog.class);
+
Mockito.when(cloudEnv.getCacheHotspotMgr()).thenReturn(cacheHotspotManager);
+ Mockito.when(cloudEnv.getEditLog()).thenReturn(editLog);
+
+ boolean runningUnitTest = FeConstants.runningUnitTest;
+ FeConstants.runningUnitTest = false;
+ try (MockedStatic<Env> mockedEnv = Mockito.mockStatic(Env.class)) {
+ mockedEnv.when(Env::getCurrentEnv).thenReturn(cloudEnv);
+ invokeRunRunningJob(job);
+ } finally {
+ FeConstants.runningUnitTest = runningUnitTest;
+ }
+
+ Assert.assertEquals(JobState.PENDING, job.getJobState());
+ Assert.assertEquals("", job.getJobInfo(null).get(COL_ERR_MSG));
+ Mockito.verify(cacheHotspotManager).notifyJobStop(job);
+ Mockito.verify(editLog,
Mockito.atLeastOnce()).logModifyCloudWarmUpJob(job);
+ Mockito.verify(mockBackendPool).returnObject(address, client);
+ }
+
private CloudWarmUpJob createRunningJob(long jobId, TNetworkAddress
firstAddress,
TNetworkAddress secondAddress) {
CloudWarmUpJob job = new CloudWarmUpJob.Builder()
@@ -160,4 +345,51 @@ public class CloudWarmUpJobTest {
method.setAccessible(true);
method.invoke(job);
}
+
+ private void invokeRunPendingJob(CloudWarmUpJob job) throws Exception {
+ Method method =
CloudWarmUpJob.class.getDeclaredMethod("runPendingJob");
+ method.setAccessible(true);
+ method.invoke(job);
+ }
+
+ private void invokeRunRunningJob(CloudWarmUpJob job) throws Exception {
+ Method method =
CloudWarmUpJob.class.getDeclaredMethod("runRunningJob");
+ method.setAccessible(true);
+ method.invoke(job);
+ }
+
+ private void setStartTimeMs(CloudWarmUpJob job, long startTimeMs) throws
Exception {
+ java.lang.reflect.Field field =
CloudWarmUpJob.class.getDeclaredField("startTimeMs");
+ field.setAccessible(true);
+ field.setLong(job, startTimeMs);
+ }
+
+ private void setSetJobDone(CloudWarmUpJob job, boolean setJobDone) throws
Exception {
+ java.lang.reflect.Field field =
CloudWarmUpJob.class.getDeclaredField("setJobDone");
+ field.setAccessible(true);
+ field.setBoolean(job, setJobDone);
+ }
+
+ private CloudWarmUpJob copyBySerialization(CloudWarmUpJob job) throws
Exception {
+ ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+ try (DataOutputStream out = new DataOutputStream(bytes)) {
+ job.write(out);
+ }
+ try (DataInputStream in = new DataInputStream(new
ByteArrayInputStream(bytes.toByteArray()))) {
+ return CloudWarmUpJob.read(in);
+ }
+ }
+
+ private TWarmUpTabletsResponse okWarmUpResponse() {
+ TWarmUpTabletsResponse response = new TWarmUpTabletsResponse();
+ response.setStatus(new TStatus(TStatusCode.OK));
+ return response;
+ }
+
+ private TWarmUpTabletsResponse failedWarmUpResponse() {
+ TWarmUpTabletsResponse response = new TWarmUpTabletsResponse();
+ response.setStatus(new TStatus(TStatusCode.INTERNAL_ERROR));
+ response.getStatus().setErrorMsgs(Collections.emptyList());
+ return response;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]