This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch cq-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 09c0a97ae920935d7c2e1456c4e2113673b22ac1
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 21 11:17:04 2026 +0800

    fix
---
 .../iotdb/confignode/manager/cq/CQManager.java     |  42 +++++++-
 .../procedure/impl/cq/CreateCQProcedure.java       |  62 +++++++++++-
 .../iotdb/confignode/persistence/CQInfoTest.java   |  37 +++++++
 .../procedure/impl/CreateCQProcedureTest.java      |  26 +++++
 .../impl/cq/CreateCQProcedureRecoveryTest.java     | 112 +++++++++++++++++++++
 5 files changed, 273 insertions(+), 6 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
index c4c1e8aede9..faad8623990 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
@@ -43,7 +43,10 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -57,11 +60,14 @@ public class CQManager {
 
   private final ReadWriteLock lock;
 
+  private final ConcurrentMap<String, String> locallyScheduledCQs;
+
   private ScheduledExecutorService executor;
 
   public CQManager(ConfigManager configManager) {
     this.configManager = configManager;
     this.lock = new ReentrantReadWriteLock();
+    this.locallyScheduledCQs = new ConcurrentHashMap<>();
     this.executor =
         IoTDBThreadPoolFactory.newScheduledThreadPool(
             CONF.getCqSubmitThread(), ThreadName.CQ_SCHEDULER.getName());
@@ -79,7 +85,11 @@ public class CQManager {
 
   public TSStatus dropCQ(TDropCQReq req) {
     try {
-      return configManager.getConsensusManager().write(new DropCQPlan(req.cqId));
+      TSStatus status = configManager.getConsensusManager().write(new DropCQPlan(req.cqId));
+      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        locallyScheduledCQs.remove(req.cqId);
+      }
+      return status;
     } catch (ConsensusException e) {
       LOGGER.warn(ManagerMessages.UNEXPECTED_ERROR_HAPPENED_WHILE_DROPPING_CQ, req.cqId, e);
       // consensus layer related errors
@@ -132,6 +142,7 @@ public class CQManager {
       executor =
           IoTDBThreadPoolFactory.newScheduledThreadPool(
               CONF.getCqSubmitThread(), ThreadName.CQ_SCHEDULER.getName());
+      locallyScheduledCQs.clear();
 
       // 3. get all CQs
       List<CQInfo.CQEntry> allCQs = null;
@@ -155,8 +166,16 @@ public class CQManager {
       if (allCQs != null) {
         for (CQInfo.CQEntry entry : allCQs) {
           if (entry.getState() == CQState.ACTIVE) {
+            if (!markCQLocallyScheduled(entry.getCqId(), entry.getMd5())) {
+              continue;
+            }
             CQScheduleTask cqScheduleTask = new CQScheduleTask(entry, executor, configManager);
-            cqScheduleTask.submitSelf();
+            try {
+              cqScheduleTask.submitSelf();
+            } catch (RuntimeException e) {
+              unmarkCQLocallyScheduled(entry.getCqId(), entry.getMd5());
+              throw e;
+            }
           }
         }
       }
@@ -176,6 +195,7 @@ public class CQManager {
     try {
       previous = executor;
       executor = null;
+      locallyScheduledCQs.clear();
     } finally {
       lock.writeLock().unlock();
     }
@@ -183,4 +203,22 @@ public class CQManager {
       previous.shutdown();
     }
   }
+
+  public boolean markCQLocallyScheduled(String cqId, String md5) {
+    AtomicBoolean shouldSchedule = new AtomicBoolean(false);
+    locallyScheduledCQs.compute(
+        cqId,
+        (ignored, previousMd5) -> {
+          if (!md5.equals(previousMd5)) {
+            shouldSchedule.set(true);
+            return md5;
+          }
+          return previousMd5;
+        });
+    return shouldSchedule.get();
+  }
+
+  public void unmarkCQLocallyScheduled(String cqId, String md5) {
+    locallyScheduledCQs.remove(cqId, md5);
+  }
 }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
index ac964d23ca3..18b9d99ee18 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
@@ -22,11 +22,15 @@ package org.apache.iotdb.confignode.procedure.impl.cq;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
+import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan;
 import org.apache.iotdb.confignode.consensus.request.write.cq.ActiveCQPlan;
 import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan;
 import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan;
+import org.apache.iotdb.confignode.consensus.response.cq.ShowCQResp;
 import org.apache.iotdb.confignode.i18n.ProcedureMessages;
+import org.apache.iotdb.confignode.manager.cq.CQManager;
 import org.apache.iotdb.confignode.manager.cq.CQScheduleTask;
+import org.apache.iotdb.confignode.persistence.cq.CQInfo;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure;
@@ -45,6 +49,8 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
 import java.util.concurrent.ScheduledExecutorService;
 
 import static org.apache.iotdb.confignode.procedure.state.cq.CreateCQState.INACTIVE;
@@ -75,7 +81,7 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> {
   public CreateCQProcedure(TCreateCQReq req, ScheduledExecutorService executor) {
     super();
     this.req = req;
-    this.md5 = DigestUtils.md2Hex(req.cqId);
+    this.md5 = generateCQToken(req.cqId);
     this.executor = executor;
     this.firstExecutionTime =
         CQScheduleTask.getFirstExecutionTime(req.boundaryTime, req.everyInterval);
@@ -91,12 +97,15 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> {
           addCQ(env);
           return Flow.HAS_MORE_STATE;
         case INACTIVE:
-          CQScheduleTask cqScheduleTask =
-              new CQScheduleTask(req, firstExecutionTime, md5, executor, env.getConfigManager());
-          cqScheduleTask.submitSelf();
+          submitScheduleTask(
+              env,
+              new CQScheduleTask(req, firstExecutionTime, md5, executor, env.getConfigManager()));
           setNextState(SCHEDULED);
           break;
         case SCHEDULED:
+          if (isStateDeserialized()) {
+            recoverScheduledTask(env);
+          }
           activeCQ(env);
           return Flow.NO_MORE_STATE;
         default:
@@ -168,6 +177,43 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> {
     }
   }
 
+  void recoverScheduledTask(ConfigNodeProcedureEnv env) throws ConsensusException {
+    Optional<CQInfo.CQEntry> cqEntry = getCurrentCQEntry(env);
+    if (!cqEntry.isPresent()) {
+      LOGGER.info(
+          "Skip recovering the schedule task of CQ {} because its metadata is unavailable.",
+          req.cqId);
+      return;
+    }
+    submitScheduleTask(env, new CQScheduleTask(cqEntry.get(), executor, env.getConfigManager()));
+  }
+
+  Optional<CQInfo.CQEntry> getCurrentCQEntry(ConfigNodeProcedureEnv env)
+      throws ConsensusException {
+    ShowCQResp response =
+        (ShowCQResp) env.getConfigManager().getConsensusManager().read(new ShowCQPlan());
+    return response.getCqList().stream()
+        .filter(entry -> req.cqId.equals(entry.getCqId()) && md5.equals(entry.getMd5()))
+        .findFirst();
+  }
+
+  private static String generateCQToken(String cqId) {
+    return DigestUtils.md2Hex(cqId + "-" + UUID.randomUUID());
+  }
+
+  private void submitScheduleTask(ConfigNodeProcedureEnv env, CQScheduleTask cqScheduleTask) {
+    CQManager cqManager = env.getConfigManager().getCQManager();
+    if (!cqManager.markCQLocallyScheduled(req.cqId, md5)) {
+      return;
+    }
+    try {
+      cqScheduleTask.submitSelf();
+    } catch (RuntimeException e) {
+      cqManager.unmarkCQLocallyScheduled(req.cqId, md5);
+      throw e;
+    }
+  }
+
   @Override
   protected void rollbackState(ConfigNodeProcedureEnv env, CreateCQState state)
       throws IOException, InterruptedException, ProcedureException {
@@ -272,4 +318,12 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> {
         md5,
         firstExecutionTime);
   }
+
+  public String getCqId() {
+    return req == null ? null : req.getCqId();
+  }
+
+  public String getMd5() {
+    return md5;
+  }
 }
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java
index 4b409d6cf0c..0978d0bba9b 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java
@@ -19,8 +19,11 @@
 package org.apache.iotdb.confignode.persistence;
 
 import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan;
+import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan;
+import org.apache.iotdb.confignode.consensus.request.write.cq.UpdateCQLastExecTimePlan;
 import org.apache.iotdb.confignode.persistence.cq.CQInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.thrift.TException;
 import org.apache.tsfile.external.commons.io.FileUtils;
@@ -99,4 +102,38 @@ public class CQInfoTest {
 
     Assert.assertEquals(cqInfo, actualCQInfo);
   }
+
+  @Test
+  public void testOldCallbackCannotTouchRecreatedCQ() throws Exception {
+    long executionTime = System.currentTimeMillis();
+    TCreateCQReq req =
+        new TCreateCQReq(
+            "testCq3",
+            1000,
+            0,
+            1000,
+            0,
+            (byte) 0,
+            "select s1 into root.backup.d3.s1 from root.sg.d3",
+            "create cq testCq3 BEGIN select s1 into root.backup.d3.s1 from root.sg.d3 END",
+            "Asia",
+            "root");
+
+    cqInfo.addCQ(new AddCQPlan(req, "oldMd5", executionTime));
+    cqInfo.dropCQ(new DropCQPlan("testCq3"));
+    cqInfo.addCQ(new AddCQPlan(req, "newMd5", executionTime));
+
+    Assert.assertEquals(
+        TSStatusCode.NO_SUCH_CQ.getStatusCode(),
+        cqInfo
+            .updateCQLastExecutionTime(
+                new UpdateCQLastExecTimePlan("testCq3", executionTime + 1000, "oldMd5"))
+            .code);
+    Assert.assertEquals(
+        TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+        cqInfo
+            .updateCQLastExecutionTime(
+                new UpdateCQLastExecTimePlan("testCq3", executionTime + 1000, "newMd5"))
+            .code);
+  }
 }
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java
index d0e92b32816..4e088f578ad 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java
@@ -36,10 +36,36 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.fail;
 
 public class CreateCQProcedureTest {
 
+  @Test
+  public void tokenShouldBeUniqueForSameCQId() {
+    ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+    try {
+      TCreateCQReq req =
+          new TCreateCQReq(
+              "testCq1",
+              1000,
+              0,
+              1000,
+              0,
+              (byte) 0,
+              "select s1 into root.backup.d1(s1) from root.sg.d1",
+              "create cq testCq1 BEGIN select s1 into root.backup.d1(s1) from root.sg.d1 END",
+              "Asia",
+              "root");
+      CreateCQProcedure createCQProcedure1 = new CreateCQProcedure(req, executor);
+      CreateCQProcedure createCQProcedure2 = new CreateCQProcedure(req, executor);
+
+      assertNotEquals(createCQProcedure1.getMd5(), createCQProcedure2.getMd5());
+    } finally {
+      executor.shutdown();
+    }
+  }
+
   @Test
   public void serializeDeserializeTest() {
 
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java
new file mode 100644
index 00000000000..ca68f6f3a89
--- /dev/null
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.cq;
+
+import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan;
+import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.cq.CQManager;
+import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
+import org.apache.iotdb.confignode.persistence.cq.CQInfo;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+public class CreateCQProcedureRecoveryTest {
+
+  private TCreateCQReq newCreateCQReq() {
+    return new TCreateCQReq(
+        "testCq1",
+        1000,
+        0,
+        1000,
+        0,
+        (byte) 0,
+        "select s1 into root.backup.d1.s1 from root.sg.d1",
+        "create cq testCq1 BEGIN select s1 into root.backup.d1.s1 from root.sg.d1 END",
+        "Asia",
+        "root");
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void recoverScheduledTaskShouldResubmitFromLatestMetadata() throws Exception {
+    ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class);
+    Mockito.when(
+            executor.schedule(
+                Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.any(TimeUnit.class)))
+        .thenReturn(Mockito.mock(ScheduledFuture.class));
+
+    ConfigManager configManager = Mockito.mock(ConfigManager.class);
+    ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class);
+    CQManager cqManager = Mockito.mock(CQManager.class);
+    ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class);
+    Mockito.when(env.getConfigManager()).thenReturn(configManager);
+    Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager);
+    Mockito.when(configManager.getCQManager()).thenReturn(cqManager);
+    Mockito.when(cqManager.markCQLocallyScheduled(Mockito.anyString(), Mockito.anyString()))
+        .thenReturn(true);
+
+    TCreateCQReq req = newCreateCQReq();
+    CreateCQProcedure procedure = new CreateCQProcedure(req, executor);
+
+    CQInfo cqInfo = new CQInfo();
+    cqInfo.addCQ(new AddCQPlan(req, procedure.getMd5(), System.currentTimeMillis() + 10_000));
+    Mockito.when(consensusManager.read(Mockito.any(ShowCQPlan.class))).thenReturn(cqInfo.showCQ());
+
+    procedure.recoverScheduledTask(env);
+
+    Mockito.verify(executor)
+        .schedule(Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.any(TimeUnit.class));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void recoverScheduledTaskShouldSkipDuplicatedLocalSchedule() throws Exception {
+    ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class);
+    ConfigManager configManager = Mockito.mock(ConfigManager.class);
+    ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class);
+    CQManager cqManager = Mockito.mock(CQManager.class);
+    ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class);
+    Mockito.when(env.getConfigManager()).thenReturn(configManager);
+    Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager);
+    Mockito.when(configManager.getCQManager()).thenReturn(cqManager);
+    Mockito.when(cqManager.markCQLocallyScheduled(Mockito.anyString(), Mockito.anyString()))
+        .thenReturn(false);
+
+    TCreateCQReq req = newCreateCQReq();
+    CreateCQProcedure procedure = new CreateCQProcedure(req, executor);
+
+    CQInfo cqInfo = new CQInfo();
+    cqInfo.addCQ(new AddCQPlan(req, procedure.getMd5(), System.currentTimeMillis() + 10_000));
+    Mockito.when(consensusManager.read(Mockito.any(ShowCQPlan.class))).thenReturn(cqInfo.showCQ());
+
+    procedure.recoverScheduledTask(env);
+
+    Mockito.verify(executor, Mockito.never())
+        .schedule(Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.any(TimeUnit.class));
+  }
+}

Reply via email to