This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch IOTDB-4619
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/IOTDB-4619 by this push:
new bb70d20e68 Fix some bugs while leadership change
bb70d20e68 is described below
commit bb70d20e6840f9e87b01d2a7731441560addead1
Author: JackieTien97 <[email protected]>
AuthorDate: Sun Oct 23 12:04:11 2022 +0800
Fix some bugs while leadership change
---
.../confignode/manager/cq/CQScheduleTask.java | 28 ++++++++++++++--------
1 file changed, 18 insertions(+), 10 deletions(-)
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
index c048802c78..fdee430730 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
@@ -225,21 +225,29 @@ public class CQScheduleTask implements Runnable {
.getConsensusManager()
.write(new UpdateCQLastExecTimePlan(cqId, executionTime, md5));
- if (result.isSuccessful()) {
- if (needSubmit()) {
- updateExecutionTime();
- submitSelf();
- } else {
- LOGGER.info(
- "Stop submitting CQ {} because current node is not leader or
current scheduled thread pool is shut down.",
- cqId);
- }
- } else {
+ // When leadership changes, updating the last execution time from a CQTask on the new leader
+ // may still fail, because a stale CQTask on the old leader may have already updated it.
+ if (!result.isSuccessful()) {
LOGGER.warn(
"Failed to update the last execution time {} of CQ {}, because
{}",
executionTime,
cqId,
result.getErrorMessage());
+ // no such cq, we don't need to submit it again
+ if (result.getStatus() != null
+ && result.getStatus().code ==
TSStatusCode.NO_SUCH_CQ.getStatusCode()) {
+ LOGGER.info("Stop submitting CQ {} because {}", cqId,
result.getStatus().message);
+ return;
+ }
+ }
+
+ if (needSubmit()) {
+ updateExecutionTime();
+ submitSelf();
+ } else {
+ LOGGER.info(
+ "Stop submitting CQ {} because current node is not leader or
current scheduled thread pool is shut down.",
+ cqId);
}
} else {