This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch revert
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1cdfb70796fb8ab4de9e672a9c37f7d7103e9ccb
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jun 3 12:22:54 2026 +0800

    Revert "Fix CQ recovery gap and stale callback contamination (#17734) (#17820)"
    
    This reverts commit 24be79aba9c973a4cb11fdb8947cdf32da12ea8e.
---
 iotdb-core/confignode/pom.xml                      |   4 +
 .../consensus/request/read/cq/ShowCQPlan.java      |  13 ---
 .../consensus/request/write/cq/ActiveCQPlan.java   |  20 ++--
 .../consensus/request/write/cq/AddCQPlan.java      |  20 ++--
 .../consensus/request/write/cq/DropCQPlan.java     |  20 ++--
 .../request/write/cq/UpdateCQLastExecTimePlan.java |  23 ++--
 .../iotdb/confignode/manager/cq/CQManager.java     | 102 +-----------------
 .../confignode/manager/cq/CQScheduleTask.java      |  58 ++--------
 .../iotdb/confignode/persistence/cq/CQInfo.java    |  76 ++++++-------
 .../persistence/executor/ConfigPlanExecutor.java   |   3 +-
 .../procedure/impl/cq/CreateCQProcedure.java       |  80 +++-----------
 .../request/ConfigPhysicalPlanSerDeTest.java       |   8 +-
 .../apache/iotdb/confignode/cq/CQManagerTest.java  | 107 -------------------
 .../iotdb/confignode/persistence/CQInfoTest.java   |  64 +----------
 .../procedure/impl/CreateCQProcedureTest.java      |  26 -----
 .../impl/cq/CreateCQProcedureRecoveryTest.java     | 117 ---------------------
 16 files changed, 106 insertions(+), 635 deletions(-)

diff --git a/iotdb-core/confignode/pom.xml b/iotdb-core/confignode/pom.xml
index a7cd4b6298b..fb22fd5b091 100644
--- a/iotdb-core/confignode/pom.xml
+++ b/iotdb-core/confignode/pom.xml
@@ -137,6 +137,10 @@
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
         </dependency>
+        <dependency>
+            <groupId>commons-codec</groupId>
+            <artifactId>commons-codec</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.thrift</groupId>
             <artifactId>libthrift</artifactId>
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/cq/ShowCQPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/cq/ShowCQPlan.java
index c28838d556b..5217849deb4 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/cq/ShowCQPlan.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/cq/ShowCQPlan.java
@@ -21,24 +21,11 @@ package org.apache.iotdb.confignode.consensus.request.read.cq;
 
 import 
org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan;
 
-import java.util.Optional;
-
 import static 
org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType.SHOW_CQ;
 
 public class ShowCQPlan extends ConfigPhysicalReadPlan {
 
-  private final String cqId;
-
   public ShowCQPlan() {
-    this(null);
-  }
-
-  public ShowCQPlan(String cqId) {
     super(SHOW_CQ);
-    this.cqId = cqId;
-  }
-
-  public Optional<String> getCqId() {
-    return Optional.ofNullable(cqId);
   }
 }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java
index 4ca33b054a9..e488ac25669 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java
@@ -35,39 +35,39 @@ public class ActiveCQPlan extends ConfigPhysicalPlan {
 
   private String cqId;
 
-  private String cqToken;
+  private String md5;
 
   public ActiveCQPlan() {
     super(ACTIVE_CQ);
   }
 
-  public ActiveCQPlan(String cqId, String cqToken) {
+  public ActiveCQPlan(String cqId, String md5) {
     super(ACTIVE_CQ);
     Validate.notNull(cqId);
-    Validate.notNull(cqToken);
+    Validate.notNull(md5);
     this.cqId = cqId;
-    this.cqToken = cqToken;
+    this.md5 = md5;
   }
 
   public String getCqId() {
     return cqId;
   }
 
-  public String getCqToken() {
-    return cqToken;
+  public String getMd5() {
+    return md5;
   }
 
   @Override
   protected void serializeImpl(DataOutputStream stream) throws IOException {
     stream.writeShort(getType().getPlanType());
     ReadWriteIOUtils.write(cqId, stream);
-    ReadWriteIOUtils.write(cqToken, stream);
+    ReadWriteIOUtils.write(md5, stream);
   }
 
   @Override
   protected void deserializeImpl(ByteBuffer buffer) throws IOException {
     cqId = ReadWriteIOUtils.readString(buffer);
-    cqToken = ReadWriteIOUtils.readString(buffer);
+    md5 = ReadWriteIOUtils.readString(buffer);
   }
 
   @Override
@@ -82,11 +82,11 @@ public class ActiveCQPlan extends ConfigPhysicalPlan {
       return false;
     }
     ActiveCQPlan that = (ActiveCQPlan) o;
-    return cqId.equals(that.cqId) && cqToken.equals(that.cqToken);
+    return cqId.equals(that.cqId) && md5.equals(that.md5);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), cqId, cqToken);
+    return Objects.hash(super.hashCode(), cqId, md5);
   }
 }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java
index 721c83d3d2f..0aae9e2e974 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java
@@ -37,7 +37,7 @@ public class AddCQPlan extends ConfigPhysicalPlan {
 
   private TCreateCQReq req;
 
-  private String cqToken;
+  private String md5;
 
   private long firstExecutionTime;
 
@@ -45,12 +45,12 @@ public class AddCQPlan extends ConfigPhysicalPlan {
     super(ADD_CQ);
   }
 
-  public AddCQPlan(TCreateCQReq req, String cqToken, long firstExecutionTime) {
+  public AddCQPlan(TCreateCQReq req, String md5, long firstExecutionTime) {
     super(ADD_CQ);
     Validate.notNull(req);
-    Validate.notNull(cqToken);
+    Validate.notNull(md5);
     this.req = req;
-    this.cqToken = cqToken;
+    this.md5 = md5;
     this.firstExecutionTime = firstExecutionTime;
   }
 
@@ -58,8 +58,8 @@ public class AddCQPlan extends ConfigPhysicalPlan {
     return req;
   }
 
-  public String getCqToken() {
-    return cqToken;
+  public String getMd5() {
+    return md5;
   }
 
   public long getFirstExecutionTime() {
@@ -70,14 +70,14 @@ public class AddCQPlan extends ConfigPhysicalPlan {
   protected void serializeImpl(DataOutputStream stream) throws IOException {
     stream.writeShort(getType().getPlanType());
     ThriftCommonsSerDeUtils.serializeTCreateCQReq(req, stream);
-    ReadWriteIOUtils.write(cqToken, stream);
+    ReadWriteIOUtils.write(md5, stream);
     ReadWriteIOUtils.write(firstExecutionTime, stream);
   }
 
   @Override
   protected void deserializeImpl(ByteBuffer buffer) throws IOException {
     req = ThriftCommonsSerDeUtils.deserializeTCreateCQReq(buffer);
-    cqToken = ReadWriteIOUtils.readString(buffer);
+    md5 = ReadWriteIOUtils.readString(buffer);
     firstExecutionTime = ReadWriteIOUtils.readLong(buffer);
   }
 
@@ -95,11 +95,11 @@ public class AddCQPlan extends ConfigPhysicalPlan {
     AddCQPlan addCQPlan = (AddCQPlan) o;
     return firstExecutionTime == addCQPlan.firstExecutionTime
         && Objects.equals(req, addCQPlan.req)
-        && Objects.equals(cqToken, addCQPlan.cqToken);
+        && Objects.equals(md5, addCQPlan.md5);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), req, cqToken, firstExecutionTime);
+    return Objects.hash(super.hashCode(), req, md5, firstExecutionTime);
   }
 }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java
index f22561c0b80..5c901362997 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java
@@ -37,7 +37,7 @@ public class DropCQPlan extends ConfigPhysicalPlan {
   private String cqId;
 
   // may be null in user call of drop CQ
-  private String cqToken;
+  private String md5;
 
   public DropCQPlan() {
     super(DROP_CQ);
@@ -49,33 +49,33 @@ public class DropCQPlan extends ConfigPhysicalPlan {
     this.cqId = cqId;
   }
 
-  public DropCQPlan(String cqId, String cqToken) {
+  public DropCQPlan(String cqId, String md5) {
     super(DROP_CQ);
     Validate.notNull(cqId);
-    Validate.notNull(cqToken);
+    Validate.notNull(md5);
     this.cqId = cqId;
-    this.cqToken = cqToken;
+    this.md5 = md5;
   }
 
   public String getCqId() {
     return cqId;
   }
 
-  public Optional<String> getCqToken() {
-    return Optional.ofNullable(cqToken);
+  public Optional<String> getMd5() {
+    return Optional.ofNullable(md5);
   }
 
   @Override
   protected void serializeImpl(DataOutputStream stream) throws IOException {
     stream.writeShort(getType().getPlanType());
     ReadWriteIOUtils.write(cqId, stream);
-    ReadWriteIOUtils.write(cqToken, stream);
+    ReadWriteIOUtils.write(md5, stream);
   }
 
   @Override
   protected void deserializeImpl(ByteBuffer buffer) throws IOException {
     cqId = ReadWriteIOUtils.readString(buffer);
-    cqToken = ReadWriteIOUtils.readString(buffer);
+    md5 = ReadWriteIOUtils.readString(buffer);
   }
 
   @Override
@@ -90,11 +90,11 @@ public class DropCQPlan extends ConfigPhysicalPlan {
       return false;
     }
     DropCQPlan that = (DropCQPlan) o;
-    return cqId.equals(that.cqId) && Objects.equals(cqToken, that.cqToken);
+    return cqId.equals(that.cqId) && Objects.equals(md5, that.md5);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), cqId, cqToken);
+    return Objects.hash(super.hashCode(), cqId, md5);
   }
 }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java
index 392f6006ba5..51a8358a04c 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java
@@ -37,19 +37,20 @@ public class UpdateCQLastExecTimePlan extends ConfigPhysicalPlan {
 
   private long executionTime;
 
-  private String cqToken;
+  // may be null in user call of drop CQ
+  private String md5;
 
   public UpdateCQLastExecTimePlan() {
     super(UPDATE_CQ_LAST_EXEC_TIME);
   }
 
-  public UpdateCQLastExecTimePlan(String cqId, long executionTime, String 
cqToken) {
+  public UpdateCQLastExecTimePlan(String cqId, long executionTime, String md5) 
{
     super(UPDATE_CQ_LAST_EXEC_TIME);
     Validate.notNull(cqId);
-    Validate.notNull(cqToken);
+    Validate.notNull(md5);
     this.cqId = cqId;
     this.executionTime = executionTime;
-    this.cqToken = cqToken;
+    this.md5 = md5;
   }
 
   public String getCqId() {
@@ -60,8 +61,8 @@ public class UpdateCQLastExecTimePlan extends 
ConfigPhysicalPlan {
     return executionTime;
   }
 
-  public String getCqToken() {
-    return cqToken;
+  public String getMd5() {
+    return md5;
   }
 
   @Override
@@ -69,14 +70,14 @@ public class UpdateCQLastExecTimePlan extends 
ConfigPhysicalPlan {
     stream.writeShort(getType().getPlanType());
     ReadWriteIOUtils.write(cqId, stream);
     ReadWriteIOUtils.write(executionTime, stream);
-    ReadWriteIOUtils.write(cqToken, stream);
+    ReadWriteIOUtils.write(md5, stream);
   }
 
   @Override
   protected void deserializeImpl(ByteBuffer buffer) throws IOException {
     cqId = ReadWriteIOUtils.readString(buffer);
     executionTime = ReadWriteIOUtils.readLong(buffer);
-    cqToken = ReadWriteIOUtils.readString(buffer);
+    md5 = ReadWriteIOUtils.readString(buffer);
   }
 
   @Override
@@ -91,13 +92,11 @@ public class UpdateCQLastExecTimePlan extends 
ConfigPhysicalPlan {
       return false;
     }
     UpdateCQLastExecTimePlan that = (UpdateCQLastExecTimePlan) o;
-    return executionTime == that.executionTime
-        && cqId.equals(that.cqId)
-        && cqToken.equals(that.cqToken);
+    return executionTime == that.executionTime && cqId.equals(that.cqId) && 
md5.equals(that.md5);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), cqId, executionTime, cqToken);
+    return Objects.hash(super.hashCode(), cqId, executionTime, md5);
   }
 }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
index a90a21f285f..5726b3ce826 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
@@ -42,10 +42,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -59,15 +56,11 @@ public class CQManager {
 
   private final ReadWriteLock lock;
 
-  // Key: CQ id. Value: the local task and the metadata token it owns.
-  private final ConcurrentMap<String, LocallyScheduledCQ> locallyScheduledCQs;
-
   private ScheduledExecutorService executor;
 
   public CQManager(ConfigManager configManager) {
     this.configManager = configManager;
     this.lock = new ReentrantReadWriteLock();
-    this.locallyScheduledCQs = new ConcurrentHashMap<>();
     this.executor =
         IoTDBThreadPoolFactory.newScheduledThreadPool(
             CONF.getCqSubmitThread(), ThreadName.CQ_SCHEDULER.getName());
@@ -84,21 +77,14 @@ public class CQManager {
   }
 
   public TSStatus dropCQ(TDropCQReq req) {
-    lock.readLock().lock();
     try {
-      TSStatus status = configManager.getConsensusManager().write(new 
DropCQPlan(req.cqId));
-      if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        cancelLocallyScheduledCQ(req.cqId);
-      }
-      return status;
+      return configManager.getConsensusManager().write(new 
DropCQPlan(req.cqId));
     } catch (ConsensusException e) {
       LOGGER.warn("Unexpected error happened while dropping cq {}: ", 
req.cqId, e);
       // consensus layer related errors
       TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
       res.setMessage(e.getMessage());
       return res;
-    } finally {
-      lock.readLock().unlock();
     }
   }
 
@@ -131,7 +117,6 @@ public class CQManager {
     try {
       // 1. shutdown previous cq schedule thread pool
       try {
-        cancelAllLocallyScheduledCQs();
         if (executor != null) {
           executor.shutdown();
         }
@@ -169,15 +154,7 @@ public class CQManager {
         for (CQInfo.CQEntry entry : allCQs) {
           if (entry.getState() == CQState.ACTIVE) {
             CQScheduleTask cqScheduleTask = new CQScheduleTask(entry, 
executor, configManager);
-            if (!markCQLocallyScheduled(entry.getCqId(), entry.getCqToken(), 
cqScheduleTask)) {
-              continue;
-            }
-            try {
-              cqScheduleTask.submitSelf();
-            } catch (RuntimeException e) {
-              unmarkCQLocallyScheduled(entry.getCqId(), entry.getCqToken());
-              throw e;
-            }
+            cqScheduleTask.submitSelf();
           }
         }
       }
@@ -197,7 +174,6 @@ public class CQManager {
     try {
       previous = executor;
       executor = null;
-      cancelAllLocallyScheduledCQs();
     } finally {
       lock.writeLock().unlock();
     }
@@ -205,78 +181,4 @@ public class CQManager {
       previous.shutdown();
     }
   }
-
-  public boolean markCQLocallyScheduled(String cqId, String cqToken, 
CQScheduleTask task) {
-    AtomicBoolean shouldSchedule = new AtomicBoolean(false);
-    LocallyScheduledCQ schedule = new LocallyScheduledCQ(cqToken, task);
-    lock.readLock().lock();
-    try {
-      locallyScheduledCQs.compute(
-          cqId,
-          (ignored, previousSchedule) -> {
-            if (previousSchedule != null && 
previousSchedule.hasToken(cqToken)) {
-              return previousSchedule;
-            }
-            if (previousSchedule != null) {
-              previousSchedule.cancel();
-            }
-            shouldSchedule.set(true);
-            return schedule;
-          });
-      if (!shouldSchedule.get()) {
-        task.cancel();
-      }
-      return shouldSchedule.get();
-    } finally {
-      lock.readLock().unlock();
-    }
-  }
-
-  public void unmarkCQLocallyScheduled(String cqId, String cqToken) {
-    lock.readLock().lock();
-    try {
-      locallyScheduledCQs.computeIfPresent(
-          cqId,
-          (ignored, schedule) -> {
-            if (schedule.hasToken(cqToken)) {
-              schedule.cancel();
-              return null;
-            }
-            return schedule;
-          });
-    } finally {
-      lock.readLock().unlock();
-    }
-  }
-
-  private void cancelLocallyScheduledCQ(String cqId) {
-    LocallyScheduledCQ schedule = locallyScheduledCQs.remove(cqId);
-    if (schedule != null) {
-      schedule.cancel();
-    }
-  }
-
-  private void cancelAllLocallyScheduledCQs() {
-    locallyScheduledCQs.values().forEach(LocallyScheduledCQ::cancel);
-    locallyScheduledCQs.clear();
-  }
-
-  private static class LocallyScheduledCQ {
-
-    private final String cqToken;
-    private final CQScheduleTask task;
-
-    private LocallyScheduledCQ(String cqToken, CQScheduleTask task) {
-      this.cqToken = cqToken;
-      this.task = task;
-    }
-
-    private boolean hasToken(String cqToken) {
-      return this.cqToken.equals(cqToken);
-    }
-
-    private void cancel() {
-      task.cancel();
-    }
-  }
 }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
index d85d0f12a30..6125edc1ef7 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
@@ -39,10 +39,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Optional;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
 public class CQScheduleTask implements Runnable {
 
@@ -72,7 +69,7 @@ public class CQScheduleTask implements Runnable {
   private final long endTimeOffset;
   private final TimeoutPolicy timeoutPolicy;
   private final String queryBody;
-  private final String cqToken;
+  private final String md5;
 
   private final String zoneId;
 
@@ -84,15 +81,12 @@ public class CQScheduleTask implements Runnable {
 
   private final long retryWaitTimeInMS;
 
-  private final AtomicBoolean cancelled;
-  private final AtomicReference<ScheduledFuture<?>> scheduledFuture;
-
   private long executionTime;
 
   public CQScheduleTask(
       TCreateCQReq req,
       long firstExecutionTime,
-      String cqToken,
+      String md5,
       ScheduledExecutorService executor,
       ConfigManager configManager) {
     this(
@@ -102,7 +96,7 @@ public class CQScheduleTask implements Runnable {
         req.endTimeOffset,
         TimeoutPolicy.deserialize(req.timeoutPolicy),
         req.queryBody,
-        cqToken,
+        md5,
         req.zoneId,
         req.username,
         executor,
@@ -119,7 +113,7 @@ public class CQScheduleTask implements Runnable {
         entry.getEndTimeOffset(),
         entry.getTimeoutPolicy(),
         entry.getQueryBody(),
-        entry.getCqToken(),
+        entry.getMd5(),
         entry.getZoneId(),
         entry.getUsername(),
         executor,
@@ -135,7 +129,7 @@ public class CQScheduleTask implements Runnable {
       long endTimeOffset,
       TimeoutPolicy timeoutPolicy,
       String queryBody,
-      String cqToken,
+      String md5,
       String zoneId,
       String username,
       ScheduledExecutorService executor,
@@ -147,14 +141,12 @@ public class CQScheduleTask implements Runnable {
     this.endTimeOffset = endTimeOffset;
     this.timeoutPolicy = timeoutPolicy;
     this.queryBody = queryBody;
-    this.cqToken = cqToken;
+    this.md5 = md5;
     this.zoneId = zoneId;
     this.username = username;
     this.executor = executor;
     this.configManager = configManager;
     this.retryWaitTimeInMS = Math.min(DEFAULT_RETRY_WAIT_TIME_IN_MS, 
everyInterval / FACTOR);
-    this.cancelled = new AtomicBoolean(false);
-    this.scheduledFuture = new AtomicReference<>();
     this.executionTime = executionTime;
   }
 
@@ -173,9 +165,6 @@ public class CQScheduleTask implements Runnable {
 
   @Override
   public void run() {
-    if (cancelled.get()) {
-      return;
-    }
     long startTime = executionTime - startTimeOffset;
     long endTime = executionTime - endTimeOffset;
 
@@ -188,9 +177,6 @@ public class CQScheduleTask implements Runnable {
         submitSelf(retryWaitTimeInMS, TimeUnit.MILLISECONDS);
       }
     } else {
-      if (cancelled.get()) {
-        return;
-      }
       LOGGER.info(
           "[StartExecuteCQ] execute CQ {} on DataNode[{}], time range is [{}, 
{}), current time is {}",
           cqId,
@@ -220,32 +206,12 @@ public class CQScheduleTask implements Runnable {
   }
 
   private void submitSelf(long delay, TimeUnit unit) {
-    if (cancelled.get()) {
-      return;
-    }
-    ScheduledFuture<?> newFuture = executor.schedule(this, delay, unit);
-    ScheduledFuture<?> previousFuture = scheduledFuture.getAndSet(newFuture);
-    if (previousFuture != null) {
-      previousFuture.cancel(false);
-    }
-    if (cancelled.get() && scheduledFuture.compareAndSet(newFuture, null)) {
-      newFuture.cancel(false);
-    }
-  }
-
-  public void cancel() {
-    cancelled.set(true);
-    ScheduledFuture<?> currentFuture = scheduledFuture.getAndSet(null);
-    if (currentFuture != null) {
-      currentFuture.cancel(false);
-    }
+    executor.schedule(this, delay, unit);
   }
 
   private boolean needSubmit() {
     // current node is still leader and thread pool is not shut down.
-    return !cancelled.get()
-        && configManager.getConsensusManager().isLeader()
-        && !executor.isShutdown();
+    return configManager.getConsensusManager().isLeader() && 
!executor.isShutdown();
   }
 
   private class AsyncExecuteCQCallback implements 
AsyncMethodCallback<TSStatus> {
@@ -272,9 +238,6 @@ public class CQScheduleTask implements Runnable {
 
     @Override
     public void onComplete(TSStatus response) {
-      if (cancelled.get()) {
-        return;
-      }
       if (response.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
 
         LOGGER.info(
@@ -288,7 +251,7 @@ public class CQScheduleTask implements Runnable {
           result =
               configManager
                   .getConsensusManager()
-                  .write(new UpdateCQLastExecTimePlan(cqId, executionTime, 
cqToken));
+                  .write(new UpdateCQLastExecTimePlan(cqId, executionTime, 
md5));
         } catch (ConsensusException e) {
           result = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
           result.setMessage(e.getMessage());
@@ -328,9 +291,6 @@ public class CQScheduleTask implements Runnable {
 
     @Override
     public void onError(Exception exception) {
-      if (cancelled.get()) {
-        return;
-      }
       LOGGER.warn("Execute CQ {} failed", cqId, exception);
       if (needSubmit()) {
         submitSelf(retryWaitTimeInMS, TimeUnit.MILLISECONDS);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java
index b24f1ca9aa5..4c1225ef48a 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.cq.CQState;
 import org.apache.iotdb.commons.cq.TimeoutPolicy;
 import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
-import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan;
 import org.apache.iotdb.confignode.consensus.request.write.cq.ActiveCQPlan;
 import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan;
 import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan;
@@ -45,9 +44,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
@@ -64,7 +61,7 @@ public class CQInfo implements SnapshotProcessor {
 
   private static final String CQ_NOT_EXIST_FORMAT = "CQ %s doesn't exist.";
 
-  private static final String CQ_TOKEN_NOT_MATCH_FORMAT = "Token of CQ %s 
doesn't match";
+  private static final String MD5_NOT_MATCH_FORMAT = "MD5 of CQ %s doesn't 
match";
 
   private final Map<String, CQEntry> cqMap;
 
@@ -94,7 +91,7 @@ public class CQInfo implements SnapshotProcessor {
         CQEntry cqEntry =
             new CQEntry(
                 plan.getReq(),
-                plan.getCqToken(),
+                plan.getMd5(),
                 plan.getFirstExecutionTime() - plan.getReq().everyInterval);
         cqMap.put(cqId, cqEntry);
         res.code = TSStatusCode.SUCCESS_STATUS.getStatusCode();
@@ -108,13 +105,13 @@ public class CQInfo implements SnapshotProcessor {
   /**
    * Drop the CQ whose ID is same as <tt>cqId</tt> in plan.
    *
-   * @return SUCCESS_STATUS if there is CQ whose ID and token is same as 
<tt>cqId</tt> in plan,
+   * @return SUCCESS_STATUS if there is CQ whose ID and md5 is same as 
<tt>cqId</tt> in plan,
    *     otherwise NO_SUCH_CQ.
    */
   public TSStatus dropCQ(DropCQPlan plan) {
     TSStatus res = new TSStatus();
     String cqId = plan.getCqId();
-    Optional<String> cqToken = plan.getCqToken();
+    Optional<String> md5 = plan.getMd5();
     lock.writeLock().lock();
     try {
       CQEntry cqEntry = cqMap.get(cqId);
@@ -122,10 +119,10 @@ public class CQInfo implements SnapshotProcessor {
         res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
         res.message = String.format(CQ_NOT_EXIST_FORMAT, cqId);
         LOGGER.warn("Drop CQ {} failed, because it doesn't exist.", cqId);
-      } else if ((cqToken.isPresent() && 
!cqToken.get().equals(cqEntry.cqToken))) {
+      } else if ((md5.isPresent() && !md5.get().equals(cqEntry.md5))) {
         res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
-        res.message = String.format(CQ_TOKEN_NOT_MATCH_FORMAT, cqId);
-        LOGGER.warn("Drop CQ {} failed, because its token doesn't match.", 
cqId);
+        res.message = String.format(MD5_NOT_MATCH_FORMAT, cqId);
+        LOGGER.warn("Drop CQ {} failed, because its MD5 doesn't match.", cqId);
       } else {
         cqMap.remove(cqId);
         res.code = TSStatusCode.SUCCESS_STATUS.getStatusCode();
@@ -138,24 +135,11 @@ public class CQInfo implements SnapshotProcessor {
   }
 
   public ShowCQResp showCQ() {
-    return showCQ(new ShowCQPlan());
-  }
-
-  public ShowCQResp showCQ(ShowCQPlan plan) {
     lock.readLock().lock();
     try {
-      Optional<String> cqId = plan.getCqId();
-      List<CQEntry> cqList;
-      if (cqId.isPresent()) {
-        CQEntry cqEntry = cqMap.get(cqId.get());
-        cqList =
-            cqEntry == null
-                ? Collections.emptyList()
-                : Collections.singletonList(new CQEntry(cqEntry));
-      } else {
-        cqList = 
cqMap.values().stream().map(CQEntry::new).collect(Collectors.toList());
-      }
-      return new ShowCQResp(new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), cqList);
+      return new ShowCQResp(
+          new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
+          
cqMap.values().stream().map(CQEntry::new).collect(Collectors.toList()));
     } finally {
       lock.readLock().unlock();
     }
@@ -169,16 +153,16 @@ public class CQInfo implements SnapshotProcessor {
   public TSStatus activeCQ(ActiveCQPlan plan) {
     TSStatus res = new TSStatus();
     String cqId = plan.getCqId();
-    String cqToken = plan.getCqToken();
+    String md5 = plan.getMd5();
     lock.writeLock().lock();
     try {
       CQEntry cqEntry = cqMap.get(cqId);
       if (cqEntry == null) {
         res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
         res.message = String.format(CQ_NOT_EXIST_FORMAT, cqId);
-      } else if (!cqToken.equals(cqEntry.cqToken)) {
+      } else if (!md5.equals(cqEntry.md5)) {
         res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
-        res.message = String.format(CQ_TOKEN_NOT_MATCH_FORMAT, cqId);
+        res.message = String.format(MD5_NOT_MATCH_FORMAT, cqId);
       } else if (cqEntry.state == CQState.ACTIVE) {
         res.code = TSStatusCode.CQ_ALREADY_ACTIVE.getStatusCode();
         res.message = String.format("CQ %s has already been active", cqId);
@@ -196,22 +180,22 @@ public class CQInfo implements SnapshotProcessor {
    * Update the last execution time of the corresponding CQ.
    *
    * @return SUCCESS_STATUS if successfully updated, or NO_SUCH_CQ if 1. the 
CQ doesn't exist; or 2.
-   *     token is different. or CQ_UPDATE_LAST_EXEC_TIME_FAILED 3. original 
lastExecutionTime >=
+   *     md5 is different. or CQ_UPDATE_LAST_EXEC_TIME_FAILED 3. original 
lastExecutionTime >=
    *     current lastExecutionTime;
    */
   public TSStatus updateCQLastExecutionTime(UpdateCQLastExecTimePlan plan) {
     TSStatus res = new TSStatus();
     String cqId = plan.getCqId();
-    String cqToken = plan.getCqToken();
+    String md5 = plan.getMd5();
     lock.writeLock().lock();
     try {
       CQEntry cqEntry = cqMap.get(cqId);
       if (cqEntry == null) {
         res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
         res.message = String.format(CQ_NOT_EXIST_FORMAT, cqId);
-      } else if (!cqToken.equals(cqEntry.cqToken)) {
+      } else if (!md5.equals(cqEntry.md5)) {
         res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
-        res.message = String.format(CQ_TOKEN_NOT_MATCH_FORMAT, cqId);
+        res.message = String.format(MD5_NOT_MATCH_FORMAT, cqId);
       } else if (cqEntry.lastExecutionTime >= plan.getExecutionTime()) {
         res.code = TSStatusCode.CQ_UPDATE_LAST_EXEC_TIME_ERROR.getStatusCode();
         res.message =
@@ -315,7 +299,7 @@ public class CQInfo implements SnapshotProcessor {
     private final TimeoutPolicy timeoutPolicy;
     private final String queryBody;
     private final String sql;
-    private final String cqToken;
+    private final String md5;
 
     private final String zoneId;
 
@@ -324,7 +308,7 @@ public class CQInfo implements SnapshotProcessor {
     private CQState state;
     private long lastExecutionTime;
 
-    private CQEntry(TCreateCQReq req, String cqToken, long lastExecutionTime) {
+    private CQEntry(TCreateCQReq req, String md5, long lastExecutionTime) {
       this(
           req.cqId,
           req.everyInterval,
@@ -334,7 +318,7 @@ public class CQInfo implements SnapshotProcessor {
           TimeoutPolicy.deserialize(req.timeoutPolicy),
           req.queryBody,
           req.sql,
-          cqToken,
+          md5,
           req.zoneId,
           req.username,
           CQState.INACTIVE,
@@ -351,7 +335,7 @@ public class CQInfo implements SnapshotProcessor {
           other.timeoutPolicy,
           other.queryBody,
           other.sql,
-          other.cqToken,
+          other.md5,
           other.zoneId,
           other.username,
           other.state,
@@ -368,7 +352,7 @@ public class CQInfo implements SnapshotProcessor {
         TimeoutPolicy timeoutPolicy,
         String queryBody,
         String sql,
-        String cqToken,
+        String md5,
         String zoneId,
         String username,
         CQState state,
@@ -381,7 +365,7 @@ public class CQInfo implements SnapshotProcessor {
       this.timeoutPolicy = timeoutPolicy;
       this.queryBody = queryBody;
       this.sql = sql;
-      this.cqToken = cqToken;
+      this.md5 = md5;
       this.zoneId = zoneId;
       this.username = username;
       this.state = state;
@@ -397,7 +381,7 @@ public class CQInfo implements SnapshotProcessor {
       ReadWriteIOUtils.write(timeoutPolicy.getType(), stream);
       ReadWriteIOUtils.write(queryBody, stream);
       ReadWriteIOUtils.write(sql, stream);
-      ReadWriteIOUtils.write(cqToken, stream);
+      ReadWriteIOUtils.write(md5, stream);
       ReadWriteIOUtils.write(zoneId, stream);
       ReadWriteIOUtils.write(username, stream);
       ReadWriteIOUtils.write(state.getType(), stream);
@@ -413,7 +397,7 @@ public class CQInfo implements SnapshotProcessor {
       TimeoutPolicy timeoutPolicy = 
TimeoutPolicy.deserialize(ReadWriteIOUtils.readByte(stream));
       String queryBody = ReadWriteIOUtils.readString(stream);
       String sql = ReadWriteIOUtils.readString(stream);
-      String cqToken = ReadWriteIOUtils.readString(stream);
+      String md5 = ReadWriteIOUtils.readString(stream);
       String zoneId = ReadWriteIOUtils.readString(stream);
       String username = ReadWriteIOUtils.readString(stream);
       CQState state = CQState.deserialize(ReadWriteIOUtils.readByte(stream));
@@ -427,7 +411,7 @@ public class CQInfo implements SnapshotProcessor {
           timeoutPolicy,
           queryBody,
           sql,
-          cqToken,
+          md5,
           zoneId,
           username,
           state,
@@ -466,8 +450,8 @@ public class CQInfo implements SnapshotProcessor {
       return sql;
     }
 
-    public String getCqToken() {
-      return cqToken;
+    public String getMd5() {
+      return md5;
     }
 
     public CQState getState() {
@@ -504,7 +488,7 @@ public class CQInfo implements SnapshotProcessor {
           && timeoutPolicy == cqEntry.timeoutPolicy
           && Objects.equals(queryBody, cqEntry.queryBody)
           && Objects.equals(sql, cqEntry.sql)
-          && Objects.equals(cqToken, cqEntry.cqToken)
+          && Objects.equals(md5, cqEntry.md5)
           && Objects.equals(zoneId, cqEntry.zoneId)
           && Objects.equals(username, cqEntry.username)
           && state == cqEntry.state;
@@ -521,7 +505,7 @@ public class CQInfo implements SnapshotProcessor {
           timeoutPolicy,
           queryBody,
           sql,
-          cqToken,
+          md5,
           zoneId,
           username,
           state,
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index 60ec748fb54..09017a841c7 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -30,7 +30,6 @@ import 
org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
 import 
org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan;
 import 
org.apache.iotdb.confignode.consensus.request.read.ainode.GetAINodeConfigurationPlan;
 import org.apache.iotdb.confignode.consensus.request.read.auth.AuthorReadPlan;
-import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan;
 import 
org.apache.iotdb.confignode.consensus.request.read.database.CountDatabasePlan;
 import 
org.apache.iotdb.confignode.consensus.request.read.database.GetDatabasePlan;
 import 
org.apache.iotdb.confignode.consensus.request.read.datanode.GetDataNodeConfigurationPlan;
@@ -323,7 +322,7 @@ public class ConfigPlanExecutor {
       case GetSeriesSlotList:
         return partitionInfo.getSeriesSlotList((GetSeriesSlotListPlan) req);
       case SHOW_CQ:
-        return cqInfo.showCQ((ShowCQPlan) req);
+        return cqInfo.showCQ();
       case GetFunctionTable:
         return udfInfo.getUDFTable();
       case GetFunctionJar:
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
index af7f968e8a5..3da60c5ad84 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
@@ -22,14 +22,10 @@ package org.apache.iotdb.confignode.procedure.impl.cq;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
-import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan;
 import org.apache.iotdb.confignode.consensus.request.write.cq.ActiveCQPlan;
 import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan;
 import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan;
-import org.apache.iotdb.confignode.consensus.response.cq.ShowCQResp;
-import org.apache.iotdb.confignode.manager.cq.CQManager;
 import org.apache.iotdb.confignode.manager.cq.CQScheduleTask;
-import org.apache.iotdb.confignode.persistence.cq.CQInfo;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure;
@@ -39,6 +35,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
 import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,8 +44,6 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Objects;
-import java.util.Optional;
-import java.util.UUID;
 import java.util.concurrent.ScheduledExecutorService;
 
 import static 
org.apache.iotdb.confignode.procedure.state.cq.CreateCQState.INACTIVE;
@@ -64,7 +59,7 @@ public class CreateCQProcedure extends 
AbstractNodeProcedure<CreateCQState> {
 
   private TCreateCQReq req;
 
-  private String cqToken;
+  private String md5;
 
   private long firstExecutionTime;
 
@@ -79,7 +74,7 @@ public class CreateCQProcedure extends 
AbstractNodeProcedure<CreateCQState> {
   public CreateCQProcedure(TCreateCQReq req, ScheduledExecutorService 
executor) {
     super();
     this.req = req;
-    this.cqToken = generateCQToken();
+    this.md5 = DigestUtils.md2Hex(req.cqId);
     this.executor = executor;
     this.firstExecutionTime =
         CQScheduleTask.getFirstExecutionTime(req.boundaryTime, 
req.everyInterval);
@@ -95,16 +90,12 @@ public class CreateCQProcedure extends 
AbstractNodeProcedure<CreateCQState> {
           addCQ(env);
           return Flow.HAS_MORE_STATE;
         case INACTIVE:
-          submitScheduleTask(
-              env,
-              new CQScheduleTask(
-                  req, firstExecutionTime, cqToken, executor, 
env.getConfigManager()));
+          CQScheduleTask cqScheduleTask =
+              new CQScheduleTask(req, firstExecutionTime, md5, executor, 
env.getConfigManager());
+          cqScheduleTask.submitSelf();
           setNextState(SCHEDULED);
           break;
         case SCHEDULED:
-          if (isStateDeserialized()) {
-            recoverScheduledTask(env);
-          }
           activeCQ(env);
           return Flow.NO_MORE_STATE;
         default:
@@ -134,7 +125,7 @@ public class CreateCQProcedure extends 
AbstractNodeProcedure<CreateCQState> {
       res =
           env.getConfigManager()
               .getConsensusManager()
-              .write(new AddCQPlan(req, cqToken, firstExecutionTime));
+              .write(new AddCQPlan(req, md5, firstExecutionTime));
     } catch (ConsensusException e) {
       LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
       res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
@@ -155,7 +146,7 @@ public class CreateCQProcedure extends 
AbstractNodeProcedure<CreateCQState> {
   private void activeCQ(ConfigNodeProcedureEnv env) {
     TSStatus res;
     try {
-      res = env.getConfigManager().getConsensusManager().write(new 
ActiveCQPlan(req.cqId, cqToken));
+      res = env.getConfigManager().getConsensusManager().write(new 
ActiveCQPlan(req.cqId, md5));
     } catch (ConsensusException e) {
       LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
       res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
@@ -173,42 +164,6 @@ public class CreateCQProcedure extends 
AbstractNodeProcedure<CreateCQState> {
     }
   }
 
-  void recoverScheduledTask(ConfigNodeProcedureEnv env) throws 
ConsensusException {
-    Optional<CQInfo.CQEntry> cqEntry = getCurrentCQEntry(env);
-    if (!cqEntry.isPresent()) {
-      LOGGER.info(
-          "Skip recovering the schedule task of CQ {} because its metadata is 
unavailable.",
-          req.cqId);
-      return;
-    }
-    submitScheduleTask(env, new CQScheduleTask(cqEntry.get(), executor, 
env.getConfigManager()));
-  }
-
-  Optional<CQInfo.CQEntry> getCurrentCQEntry(ConfigNodeProcedureEnv env) 
throws ConsensusException {
-    ShowCQResp response =
-        (ShowCQResp) env.getConfigManager().getConsensusManager().read(new 
ShowCQPlan(req.cqId));
-    return response.getCqList().stream()
-        .filter(entry -> cqToken.equals(entry.getCqToken()))
-        .findFirst();
-  }
-
-  private static String generateCQToken() {
-    return UUID.randomUUID().toString();
-  }
-
-  private void submitScheduleTask(ConfigNodeProcedureEnv env, CQScheduleTask 
cqScheduleTask) {
-    CQManager cqManager = env.getConfigManager().getCQManager();
-    if (!cqManager.markCQLocallyScheduled(req.cqId, cqToken, cqScheduleTask)) {
-      return;
-    }
-    try {
-      cqScheduleTask.submitSelf();
-    } catch (RuntimeException e) {
-      cqManager.unmarkCQLocallyScheduled(req.cqId, cqToken);
-      throw e;
-    }
-  }
-
   @Override
   protected void rollbackState(ConfigNodeProcedureEnv env, CreateCQState state)
       throws IOException, InterruptedException, ProcedureException {
@@ -221,8 +176,7 @@ public class CreateCQProcedure extends 
AbstractNodeProcedure<CreateCQState> {
         LOGGER.info("Start [INACTIVE] rollback of CQ {}", req.cqId);
         TSStatus res;
         try {
-          res =
-              env.getConfigManager().getConsensusManager().write(new 
DropCQPlan(req.cqId, cqToken));
+          res = env.getConfigManager().getConsensusManager().write(new 
DropCQPlan(req.cqId, md5));
         } catch (ConsensusException e) {
           LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
           res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
@@ -273,7 +227,7 @@ public class CreateCQProcedure extends 
AbstractNodeProcedure<CreateCQState> {
     stream.writeShort(ProcedureType.CREATE_CQ_PROCEDURE.getTypeCode());
     super.serialize(stream);
     ThriftCommonsSerDeUtils.serializeTCreateCQReq(req, stream);
-    ReadWriteIOUtils.write(cqToken, stream);
+    ReadWriteIOUtils.write(md5, stream);
     ReadWriteIOUtils.write(firstExecutionTime, stream);
   }
 
@@ -281,7 +235,7 @@ public class CreateCQProcedure extends 
AbstractNodeProcedure<CreateCQState> {
   public void deserialize(ByteBuffer byteBuffer) {
     super.deserialize(byteBuffer);
     this.req = ThriftCommonsSerDeUtils.deserializeTCreateCQReq(byteBuffer);
-    this.cqToken = ReadWriteIOUtils.readString(byteBuffer);
+    this.md5 = ReadWriteIOUtils.readString(byteBuffer);
     this.firstExecutionTime = ReadWriteIOUtils.readLong(byteBuffer);
   }
 
@@ -300,7 +254,7 @@ public class CreateCQProcedure extends 
AbstractNodeProcedure<CreateCQState> {
         && isGeneratedByPipe == that.isGeneratedByPipe
         && firstExecutionTime == that.firstExecutionTime
         && Objects.equals(req, that.req)
-        && Objects.equals(cqToken, that.cqToken);
+        && Objects.equals(md5, that.md5);
   }
 
   @Override
@@ -311,15 +265,7 @@ public class CreateCQProcedure extends 
AbstractNodeProcedure<CreateCQState> {
         getCycles(),
         isGeneratedByPipe,
         req,
-        cqToken,
+        md5,
         firstExecutionTime);
   }
-
-  public String getCqId() {
-    return req == null ? null : req.getCqId();
-  }
-
-  public String getCqToken() {
-    return cqToken;
-  }
 }
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index 1c73fe323ce..3b0554baa11 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -1213,7 +1213,7 @@ public class ConfigPhysicalPlanSerDeTest {
 
   @Test
   public void ActiveCQPlanTest() throws IOException {
-    ActiveCQPlan activeCQPlan0 = new ActiveCQPlan("testCq", "testCqToken");
+    ActiveCQPlan activeCQPlan0 = new ActiveCQPlan("testCq", "testCq_md5");
     ActiveCQPlan activeCQPlan1 =
         (ActiveCQPlan) 
ConfigPhysicalPlan.Factory.create(activeCQPlan0.serializeToByteBuffer());
 
@@ -1236,7 +1236,7 @@ public class ConfigPhysicalPlanSerDeTest {
                 "create cq testCq1 BEGIN select s1 into root.backup.d1.s1 from 
root.sg.d1 END",
                 "Asia",
                 "root"),
-            "testCq1Token",
+            "testCq1_md5",
             executionTime);
     AddCQPlan addCQPlan1 =
         (AddCQPlan) 
ConfigPhysicalPlan.Factory.create(addCQPlan0.serializeToByteBuffer());
@@ -1251,7 +1251,7 @@ public class ConfigPhysicalPlanSerDeTest {
         (DropCQPlan) 
ConfigPhysicalPlan.Factory.create(dropCQPlan0.serializeToByteBuffer());
     Assert.assertEquals(dropCQPlan0, dropCQPlan1);
 
-    dropCQPlan0 = new DropCQPlan("testCq1", "testCq1Token");
+    dropCQPlan0 = new DropCQPlan("testCq1", "testCq1_md5");
     dropCQPlan1 =
         (DropCQPlan) 
ConfigPhysicalPlan.Factory.create(dropCQPlan0.serializeToByteBuffer());
     Assert.assertEquals(dropCQPlan0, dropCQPlan1);
@@ -1260,7 +1260,7 @@ public class ConfigPhysicalPlanSerDeTest {
   @Test
   public void UpdateCQLastExecTimePlanTest() throws IOException {
     UpdateCQLastExecTimePlan updateCQLastExecTimePlan0 =
-        new UpdateCQLastExecTimePlan("testCq", System.currentTimeMillis(), 
"testCqToken");
+        new UpdateCQLastExecTimePlan("testCq", System.currentTimeMillis(), 
"testCq_md5");
     UpdateCQLastExecTimePlan updateCQLastExecTimePlan1 =
         (UpdateCQLastExecTimePlan)
             
ConfigPhysicalPlan.Factory.create(updateCQLastExecTimePlan0.serializeToByteBuffer());
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/cq/CQManagerTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/cq/CQManagerTest.java
deleted file mode 100644
index a0bc5a523ba..00000000000
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/cq/CQManagerTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.cq;
-
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.cq.TimeoutPolicy;
-import org.apache.iotdb.confignode.manager.ConfigManager;
-import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
-import org.apache.iotdb.confignode.manager.cq.CQManager;
-import org.apache.iotdb.confignode.manager.cq.CQScheduleTask;
-import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertTrue;
-
-public class CQManagerTest {
-
-  @SuppressWarnings("unchecked")
-  @Test
-  public void dropCQShouldCancelLocallyScheduledTask() throws Exception {
-    ConfigManager configManager = Mockito.mock(ConfigManager.class);
-    ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class);
-    
Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager);
-    Mockito.when(consensusManager.write(Mockito.any()))
-        .thenReturn(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
-    CQManager cqManager = new CQManager(configManager);
-    ScheduledFuture<?> future = Mockito.mock(ScheduledFuture.class);
-    CQScheduleTask task = newScheduledTask(configManager, future, "token");
-
-    try {
-      assertTrue(cqManager.markCQLocallyScheduled("testCq", "token", task));
-      task.submitSelf();
-      cqManager.dropCQ(new TDropCQReq("testCq"));
-
-      Mockito.verify(future).cancel(false);
-    } finally {
-      cqManager.stopCQScheduler();
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  @Test
-  public void newTokenShouldCancelPreviousLocallyScheduledTask() {
-    ConfigManager configManager = Mockito.mock(ConfigManager.class);
-    CQManager cqManager = new CQManager(configManager);
-    ScheduledFuture<?> previousFuture = Mockito.mock(ScheduledFuture.class);
-    CQScheduleTask previousTask = newScheduledTask(configManager, 
previousFuture, "previousToken");
-    ScheduledFuture<?> currentFuture = Mockito.mock(ScheduledFuture.class);
-    CQScheduleTask currentTask = newScheduledTask(configManager, 
currentFuture, "currentToken");
-
-    try {
-      assertTrue(cqManager.markCQLocallyScheduled("testCq", "previousToken", 
previousTask));
-      previousTask.submitSelf();
-      assertTrue(cqManager.markCQLocallyScheduled("testCq", "currentToken", 
currentTask));
-
-      Mockito.verify(previousFuture).cancel(false);
-    } finally {
-      cqManager.stopCQScheduler();
-    }
-  }
-
-  @SuppressWarnings("unchecked")
-  private CQScheduleTask newScheduledTask(
-      ConfigManager configManager, ScheduledFuture<?> scheduledFuture, String 
cqToken) {
-    ScheduledExecutorService executor = 
Mockito.mock(ScheduledExecutorService.class);
-    Mockito.when(
-            executor.schedule(
-                Mockito.any(Runnable.class), Mockito.anyLong(), 
Mockito.any(TimeUnit.class)))
-        .thenReturn((ScheduledFuture) scheduledFuture);
-    return new CQScheduleTask(
-        "testCq",
-        1000,
-        0,
-        1000,
-        TimeoutPolicy.BLOCKED,
-        "select s1 into root.backup.d1.s1 from root.sg.d1",
-        cqToken,
-        "Asia",
-        "root",
-        executor,
-        configManager,
-        System.currentTimeMillis() + 10_000);
-  }
-}
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java
index 8bf2d12cd59..bfadd3d05e8 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java
@@ -18,14 +18,9 @@
  */
 package org.apache.iotdb.confignode.persistence;
 
-import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan;
 import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan;
-import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan;
-import 
org.apache.iotdb.confignode.consensus.request.write.cq.UpdateCQLastExecTimePlan;
-import org.apache.iotdb.confignode.consensus.response.cq.ShowCQResp;
 import org.apache.iotdb.confignode.persistence.cq.CQInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
-import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.thrift.TException;
@@ -75,7 +70,7 @@ public class CQInfoTest {
                 "create cq testCq1 BEGIN select s1 into root.backup.d1.s1 from 
root.sg.d1 END",
                 "Asia",
                 "root"),
-            "testCq1Token",
+            "testCq1_md5",
             executionTime);
 
     cqInfo.addCQ(addCQPlan);
@@ -94,7 +89,7 @@ public class CQInfoTest {
                 "create cq testCq2 BEGIN select s1 into root.backup.d2.s1 from 
root.sg.d2 END",
                 "Asia",
                 "root"),
-            "testCq2Token",
+            "testCq2_md5",
             executionTime);
     cqInfo.addCQ(addCQPlan);
 
@@ -104,59 +99,4 @@ public class CQInfoTest {
 
     Assert.assertEquals(cqInfo, actualCQInfo);
   }
-
-  @Test
-  public void testOldCallbackCannotTouchRecreatedCQ() throws Exception {
-    long executionTime = System.currentTimeMillis();
-    TCreateCQReq req =
-        new TCreateCQReq(
-            "testCq3",
-            1000,
-            0,
-            1000,
-            0,
-            (byte) 0,
-            "select s1 into root.backup.d3.s1 from root.sg.d3",
-            "create cq testCq3 BEGIN select s1 into root.backup.d3.s1 from 
root.sg.d3 END",
-            "Asia",
-            "root");
-
-    cqInfo.addCQ(new AddCQPlan(req, "oldToken", executionTime));
-    cqInfo.dropCQ(new DropCQPlan("testCq3"));
-    cqInfo.addCQ(new AddCQPlan(req, "newToken", executionTime));
-
-    Assert.assertEquals(
-        TSStatusCode.NO_SUCH_CQ.getStatusCode(),
-        cqInfo.updateCQLastExecutionTime(
-                new UpdateCQLastExecTimePlan("testCq3", executionTime + 1000, 
"oldToken"))
-            .code);
-    Assert.assertEquals(
-        TSStatusCode.SUCCESS_STATUS.getStatusCode(),
-        cqInfo.updateCQLastExecutionTime(
-                new UpdateCQLastExecTimePlan("testCq3", executionTime + 1000, 
"newToken"))
-            .code);
-  }
-
-  @Test
-  public void testShowCQCanFilterByCQId() throws Exception {
-    long executionTime = System.currentTimeMillis();
-    TCreateCQReq req =
-        new TCreateCQReq(
-            "testCq4",
-            1000,
-            0,
-            1000,
-            0,
-            (byte) 0,
-            "select s1 into root.backup.d4.s1 from root.sg.d4",
-            "create cq testCq4 BEGIN select s1 into root.backup.d4.s1 from 
root.sg.d4 END",
-            "Asia",
-            "root");
-    cqInfo.addCQ(new AddCQPlan(req, "testCq4Token", executionTime));
-
-    ShowCQResp showCQResp = cqInfo.showCQ(new ShowCQPlan("testCq4"));
-
-    Assert.assertEquals(1, showCQResp.getCqList().size());
-    Assert.assertEquals("testCq4", showCQResp.getCqList().get(0).getCqId());
-  }
 }
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java
index 3e7fd2052ad..d0e92b32816 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java
@@ -36,36 +36,10 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.fail;
 
 public class CreateCQProcedureTest {
 
-  @Test
-  public void tokenShouldBeUniqueForSameCQId() {
-    ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
-    try {
-      TCreateCQReq req =
-          new TCreateCQReq(
-              "testCq1",
-              1000,
-              0,
-              1000,
-              0,
-              (byte) 0,
-              "select s1 into root.backup.d1(s1) from root.sg.d1",
-              "create cq testCq1 BEGIN select s1 into root.backup.d1(s1) from 
root.sg.d1 END",
-              "Asia",
-              "root");
-      CreateCQProcedure createCQProcedure1 = new CreateCQProcedure(req, 
executor);
-      CreateCQProcedure createCQProcedure2 = new CreateCQProcedure(req, 
executor);
-
-      assertNotEquals(createCQProcedure1.getCqToken(), 
createCQProcedure2.getCqToken());
-    } finally {
-      executor.shutdown();
-    }
-  }
-
   @Test
   public void serializeDeserializeTest() {
 
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java
deleted file mode 100644
index a90e282494f..00000000000
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.procedure.impl.cq;
-
-import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan;
-import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan;
-import org.apache.iotdb.confignode.manager.ConfigManager;
-import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
-import org.apache.iotdb.confignode.manager.cq.CQManager;
-import org.apache.iotdb.confignode.manager.cq.CQScheduleTask;
-import org.apache.iotdb.confignode.persistence.cq.CQInfo;
-import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
-import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
-
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-public class CreateCQProcedureRecoveryTest {
-
-  private TCreateCQReq newCreateCQReq() {
-    return new TCreateCQReq(
-        "testCq1",
-        1000,
-        0,
-        1000,
-        0,
-        (byte) 0,
-        "select s1 into root.backup.d1.s1 from root.sg.d1",
-        "create cq testCq1 BEGIN select s1 into root.backup.d1.s1 from root.sg.d1 END",
-        "Asia",
-        "root");
-  }
-
-  @SuppressWarnings("unchecked")
-  @Test
-  public void recoverScheduledTaskShouldResubmitFromLatestMetadata() throws Exception {
-    ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class);
-    Mockito.when(
-            executor.schedule(
-                Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.any(TimeUnit.class)))
-        .thenReturn(Mockito.mock(ScheduledFuture.class));
-
-    ConfigManager configManager = Mockito.mock(ConfigManager.class);
-    ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class);
-    CQManager cqManager = Mockito.mock(CQManager.class);
-    ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class);
-    Mockito.when(env.getConfigManager()).thenReturn(configManager);
-    Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager);
-    Mockito.when(configManager.getCQManager()).thenReturn(cqManager);
-    Mockito.when(
-            cqManager.markCQLocallyScheduled(
-                Mockito.anyString(), Mockito.anyString(), Mockito.any(CQScheduleTask.class)))
-        .thenReturn(true);
-
-    TCreateCQReq req = newCreateCQReq();
-    CreateCQProcedure procedure = new CreateCQProcedure(req, executor);
-
-    CQInfo cqInfo = new CQInfo();
-    cqInfo.addCQ(new AddCQPlan(req, procedure.getCqToken(), System.currentTimeMillis() + 10_000));
-    Mockito.when(consensusManager.read(Mockito.any(ShowCQPlan.class))).thenReturn(cqInfo.showCQ());
-
-    procedure.recoverScheduledTask(env);
-
-    Mockito.verify(executor)
-        .schedule(Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.any(TimeUnit.class));
-  }
-
-  @SuppressWarnings("unchecked")
-  @Test
-  public void recoverScheduledTaskShouldSkipDuplicatedLocalSchedule() throws Exception {
-    ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class);
-    ConfigManager configManager = Mockito.mock(ConfigManager.class);
-    ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class);
-    CQManager cqManager = Mockito.mock(CQManager.class);
-    ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class);
-    Mockito.when(env.getConfigManager()).thenReturn(configManager);
-    Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager);
-    Mockito.when(configManager.getCQManager()).thenReturn(cqManager);
-    Mockito.when(
-            cqManager.markCQLocallyScheduled(
-                Mockito.anyString(), Mockito.anyString(), Mockito.any(CQScheduleTask.class)))
-        .thenReturn(false);
-
-    TCreateCQReq req = newCreateCQReq();
-    CreateCQProcedure procedure = new CreateCQProcedure(req, executor);
-
-    CQInfo cqInfo = new CQInfo();
-    cqInfo.addCQ(new AddCQPlan(req, procedure.getCqToken(), System.currentTimeMillis() + 10_000));
-    Mockito.when(consensusManager.read(Mockito.any(ShowCQPlan.class))).thenReturn(cqInfo.showCQ());
-
-    procedure.recoverScheduledTask(env);
-
-    Mockito.verify(executor, Mockito.never())
-        .schedule(Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.any(TimeUnit.class));
-  }
-}

Reply via email to