This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch IOTDB-4619
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/IOTDB-4619 by this push:
     new bb70d20e68 Fix some bugs while leadership change
bb70d20e68 is described below

commit bb70d20e6840f9e87b01d2a7731441560addead1
Author: JackieTien97 <[email protected]>
AuthorDate: Sun Oct 23 12:04:11 2022 +0800

    Fix some bugs while leadership change
---
 .../confignode/manager/cq/CQScheduleTask.java      | 28 ++++++++++++++--------
 1 file changed, 18 insertions(+), 10 deletions(-)

diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
index c048802c78..fdee430730 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
@@ -225,21 +225,29 @@ public class CQScheduleTask implements Runnable {
                 .getConsensusManager()
                 .write(new UpdateCQLastExecTimePlan(cqId, executionTime, md5));
 
-        if (result.isSuccessful()) {
-          if (needSubmit()) {
-            updateExecutionTime();
-            submitSelf();
-          } else {
-            LOGGER.info(
-                "Stop submitting CQ {} because current node is not leader or 
current scheduled thread pool is shut down.",
-                cqId);
-          }
-        } else {
+        // During a leadership change, updating the last execution time for a CQTask on the new
+        // leader may still fail, because a stale CQTask on the old leader may have updated it first.
+        if (!result.isSuccessful()) {
           LOGGER.warn(
               "Failed to update the last execution time {} of CQ {}, because 
{}",
               executionTime,
               cqId,
               result.getErrorMessage());
+          // no such cq, we don't need to submit it again
+          if (result.getStatus() != null
+              && result.getStatus().code == 
TSStatusCode.NO_SUCH_CQ.getStatusCode()) {
+            LOGGER.info("Stop submitting CQ {} because {}", cqId, 
result.getStatus().message);
+            return;
+          }
+        }
+
+        if (needSubmit()) {
+          updateExecutionTime();
+          submitSelf();
+        } else {
+          LOGGER.info(
+              "Stop submitting CQ {} because current node is not leader or 
current scheduled thread pool is shut down.",
+              cqId);
         }
 
       } else {

Reply via email to