This is an automated email from the ASF dual-hosted git repository.
w41ter pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 0118bfd27ee [fix](task) Abort creating replica task if sending RPC failed #42276 (#42962)
0118bfd27ee is described below
commit 0118bfd27eea00d61d95ce37a9417b21d24c10d2
Author: walter <[email protected]>
AuthorDate: Thu Oct 31 14:40:04 2024 +0800
[fix](task) Abort creating replica task if sending RPC failed #42276 (#42962)
cherry pick from #42276
---
.../java/org/apache/doris/alter/SchemaChangeJobV2.java | 2 +-
.../main/java/org/apache/doris/backup/RestoreJob.java | 11 ++++++++---
.../org/apache/doris/common/MarkedCountDownLatch.java | 14 ++++++++++++++
.../main/java/org/apache/doris/task/AgentBatchTask.java | 14 ++++++++------
.../src/main/java/org/apache/doris/task/AgentTask.java | 4 ++++
.../java/org/apache/doris/task/CreateReplicaTask.java | 17 +++++++++++++++++
6 files changed, 52 insertions(+), 10 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 70799f0b4b6..d5fb729905d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -320,7 +320,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
ok = false;
}
- if (!ok) {
+ if (!ok || !countDownLatch.getStatus().ok()) {
// create replicas failed. just cancel the job
// clear tasks and show the failed replicas to user
AgentTaskQueue.removeBatchTask(batchTask, TTaskType.CREATE);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 4abf4d3aa3c..173af1c25f3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -952,7 +952,7 @@ public class RestoreJob extends AbstractJob {
ok = true;
}
- if (ok) {
+ if (ok && latch.getStatus().ok()) {
if (LOG.isDebugEnabled()) {
LOG.debug("finished to create all restored replicas. {}",
this);
}
@@ -1016,8 +1016,13 @@ public class RestoreJob extends AbstractJob {
.map(item -> "(backendId = " + item.getKey() + ", tabletId
= " + item.getValue() + ")")
.collect(Collectors.toList());
String idStr = Joiner.on(", ").join(subList);
- status = new Status(ErrCode.COMMON_ERROR,
- "Failed to create replicas for restore. unfinished marks:
" + idStr);
+ String reason = "TIMEDOUT";
+ if (!latch.getStatus().ok()) {
+ reason = latch.getStatus().getErrorMsg();
+ }
+ String errMsg = String.format(
+ "Failed to create replicas for restore: %s, unfinished
marks: %s", reason, idStr);
+ status = new Status(ErrCode.COMMON_ERROR, errMsg);
return;
}
LOG.info("finished to prepare meta. {}", this);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/MarkedCountDownLatch.java
b/fe/fe-core/src/main/java/org/apache/doris/common/MarkedCountDownLatch.java
index 14641d501d7..e1431c4d729 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/MarkedCountDownLatch.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/MarkedCountDownLatch.java
@@ -47,6 +47,20 @@ public class MarkedCountDownLatch<K, V> extends
CountDownLatch {
return false;
}
+ public synchronized boolean markedCountDownWithStatus(K key, V value,
Status status) {
+ // update status first before countDown.
+ // so that the waiting thread will get the correct status.
+ if (st.ok()) {
+ st = status;
+ }
+
+ if (marks.remove(key, value)) {
+ super.countDown();
+ return true;
+ }
+ return false;
+ }
+
public synchronized List<Entry<K, V>> getLeftMarks() {
return Lists.newArrayList(marks.entries());
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
index aa654fcd21a..848211f9413 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
@@ -153,9 +153,11 @@ public class AgentBatchTask implements Runnable {
BackendService.Client client = null;
TNetworkAddress address = null;
boolean ok = false;
+ String errMsg = "";
try {
Backend backend =
Env.getCurrentSystemInfo().getBackend(backendId);
if (backend == null || !backend.isAlive()) {
+ errMsg = String.format("backend %d is not alive",
backendId);
continue;
}
List<AgentTask> tasks = this.backendIdToTasks.get(backendId);
@@ -165,12 +167,7 @@ public class AgentBatchTask implements Runnable {
client = ClientPool.backendPool.borrowObject(address);
List<TAgentTaskRequest> agentTaskRequests = new
LinkedList<TAgentTaskRequest>();
for (AgentTask task : tasks) {
- try {
- agentTaskRequests.add(toAgentTaskRequest(task));
- } catch (Exception e) {
- task.failed();
- throw e;
- }
+ agentTaskRequests.add(toAgentTaskRequest(task));
}
client.submitTasks(agentTaskRequests);
if (LOG.isDebugEnabled()) {
@@ -182,11 +179,16 @@ public class AgentBatchTask implements Runnable {
ok = true;
} catch (Exception e) {
LOG.warn("task exec error. backend[{}]", backendId, e);
+ errMsg = String.format("task exec error: %s. backend[%d]",
e.getMessage(), backendId);
} finally {
if (ok) {
ClientPool.backendPool.returnObject(address, client);
} else {
ClientPool.backendPool.invalidateObject(address, client);
+ List<AgentTask> tasks =
this.backendIdToTasks.get(backendId);
+ for (AgentTask task : tasks) {
+ task.failedWithMsg(errMsg);
+ }
}
}
} // end for backend
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java
index f878fc521f5..0ba998b3808 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentTask.java
@@ -110,6 +110,10 @@ public abstract class AgentTask {
++this.failedTimes;
}
+ public void failedWithMsg(String errMsg) {
+ failed();
+ }
+
public int getFailedTimes() {
return this.failedTimes;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
index 7ea5c951ca6..6947867b389 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
@@ -220,6 +220,23 @@ public class CreateReplicaTask extends AgentTask {
}
}
+ @Override
+ public void failedWithMsg(String errMsg) {
+ super.failedWithMsg(errMsg);
+
+ // CreateReplicaTask will not trigger a retry in ReportTask. Therefore, it needs to
+ // be marked as failed here and all threads waiting for the result of
+ // CreateReplicaTask need to be awakened.
+ if (this.latch != null) {
+ Status s = new Status(TStatusCode.CANCELLED, errMsg);
+ latch.markedCountDownWithStatus(getBackendId(), getTabletId(), s);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("CreateReplicaTask failed with msg: {}, tablet: {},
backend: {}",
+ errMsg, getTabletId(), getBackendId());
+ }
+ }
+ }
+
public void setLatch(MarkedCountDownLatch<Long, Long> latch) {
this.latch = latch;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]