This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/sonarConfigNode
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2841bd8d5c09c5e711e22dbd7e1c3dcbe1651b37
Author: JackieTien97 <[email protected]>
AuthorDate: Wed Jun 21 18:06:05 2023 +0800

    Fix sonar bugs and code smells in confignode module about cq
---
 .../consensus/request/write/cq/ActiveCQPlan.java   |  13 ++-
 .../consensus/request/write/cq/AddCQPlan.java      |  13 ++-
 .../consensus/request/write/cq/DropCQPlan.java     |  13 ++-
 .../consensus/request/write/cq/ShowCQPlan.java     |   5 +-
 .../request/write/cq/UpdateCQLastExecTimePlan.java |  13 ++-
 .../iotdb/confignode/manager/cq/CQManager.java     |   9 +-
 .../confignode/manager/cq/CQScheduleTask.java      |  13 ++-
 .../iotdb/confignode/persistence/cq/CQInfo.java    |  37 ++++--
 .../persistence/executor/ConfigPlanExecutor.java   |   3 +-
 .../procedure/impl/cq/CreateCQProcedure.java       | 129 +++++++++++----------
 .../procedure/state/cq/CreateCQState.java          |   1 +
 11 files changed, 152 insertions(+), 97 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java
index 97dcfd1de4e..5346db2da1f 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.confignode.consensus.request.write.cq;
 
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
@@ -71,9 +72,15 @@ public class ActiveCQPlan extends ConfigPhysicalPlan {
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-    if (!super.equals(o)) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
     ActiveCQPlan that = (ActiveCQPlan) o;
     return cqId.equals(that.cqId) && md5.equals(that.md5);
   }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java
index a2b01661d77..3cde53047c9 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.confignode.consensus.request.write.cq;
 
 import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
@@ -82,9 +83,15 @@ public class AddCQPlan extends ConfigPhysicalPlan {
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-    if (!super.equals(o)) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
     AddCQPlan addCQPlan = (AddCQPlan) o;
     return firstExecutionTime == addCQPlan.firstExecutionTime
         && Objects.equals(req, addCQPlan.req)
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java
index 1e7a5a791c7..966b179bb1c 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.confignode.consensus.request.write.cq;
 
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
@@ -79,9 +80,15 @@ public class DropCQPlan extends ConfigPhysicalPlan {
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-    if (!super.equals(o)) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
     DropCQPlan that = (DropCQPlan) o;
     return cqId.equals(that.cqId) && Objects.equals(md5, that.md5);
   }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ShowCQPlan.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ShowCQPlan.java
index 558ae0e7efb..7c146238a5e 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ShowCQPlan.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ShowCQPlan.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.confignode.consensus.request.write.cq;
 
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
@@ -38,5 +39,7 @@ public class ShowCQPlan extends ConfigPhysicalPlan {
   }
 
   @Override
-  protected void deserializeImpl(ByteBuffer buffer) throws IOException {}
+  protected void deserializeImpl(ByteBuffer buffer) throws IOException {
+    // no customized field to deserialize from
+  }
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java
index 81630aaddc6..98dd9b35168 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.confignode.consensus.request.write.cq;
 
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
@@ -81,9 +82,15 @@ public class UpdateCQLastExecTimePlan extends 
ConfigPhysicalPlan {
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
-    if (!super.equals(o)) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
     UpdateCQLastExecTimePlan that = (UpdateCQLastExecTimePlan) o;
     return executionTime == that.executionTime && cqId.equals(that.cqId) && 
md5.equals(that.md5);
   }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
index 2559b239ea0..e3c514ddd35 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.confignode.manager.cq;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
@@ -123,7 +124,7 @@ public class CQManager {
         if (executor != null) {
           executor.shutdown();
         }
-      } catch (Throwable t) {
+      } catch (Exception t) {
         // just print the error log because we should make sure we can start a 
new cq schedule pool
         // successfully in the next steps
         LOGGER.error("Error happened while shutting down previous cq schedule 
thread pool.", t);
@@ -146,7 +147,7 @@ public class CQManager {
         }
       }
       // keep fetching until we get all CQEntries if this node is still leader
-      while (allCQs == null && configManager.getConsensusManager().isLeader()) 
{
+      while (needFetch(allCQs)) {
         ConsensusReadResponse response = 
configManager.getConsensusManager().read(new ShowCQPlan());
         if (response.getDataset() != null) {
           allCQs = ((ShowCQResp) response.getDataset()).getCqList();
@@ -172,6 +173,10 @@ public class CQManager {
     }
   }
 
+  private boolean needFetch(List<CQInfo.CQEntry> allCQs) {
+    return allCQs == null && configManager.getConsensusManager().isLeader();
+  }
+
   public void stopCQScheduler() {
     ScheduledExecutorService previous;
     lock.writeLock().lock();
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
index b1f94f21a13..73de8bc54a6 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.confignode.manager.cq;
 
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
@@ -103,6 +104,7 @@ public class CQScheduleTask implements Runnable {
         entry.getLastExecutionTime() + entry.getEveryInterval());
   }
 
+  @SuppressWarnings("squid:S107")
   public CQScheduleTask(
       String cqId,
       long everyInterval,
@@ -137,7 +139,6 @@ public class CQScheduleTask implements Runnable {
   }
 
   public static long getFirstExecutionTime(long boundaryTime, long 
everyInterval, long now) {
-    // TODO may need to consider nano precision
     if (now <= boundaryTime) {
       return boundaryTime;
     } else {
@@ -172,7 +173,7 @@ public class CQScheduleTask implements Runnable {
         AsyncDataNodeInternalServiceClient client =
             
AsyncDataNodeClientPool.getInstance().getAsyncClient(targetDataNode.get());
         client.executeCQ(executeCQReq, new AsyncExecuteCQCallback(startTime, 
endTime));
-      } catch (Throwable t) {
+      } catch (Exception t) {
         LOGGER.warn("Execute CQ {} failed", cqId, t);
         if (needSubmit()) {
           submitSelf(retryWaitTimeInMS, TimeUnit.MILLISECONDS);
@@ -185,15 +186,15 @@ public class CQScheduleTask implements Runnable {
     submitSelf(Math.max(0, executionTime - System.currentTimeMillis()), 
TimeUnit.MILLISECONDS);
   }
 
+  private void submitSelf(long delay, TimeUnit unit) {
+    executor.schedule(this, delay, unit);
+  }
+
   private boolean needSubmit() {
     // current node is still leader and thread pool is not shut down.
     return configManager.getConsensusManager().isLeader() && 
!executor.isShutdown();
   }
 
-  private void submitSelf(long delay, TimeUnit unit) {
-    executor.schedule(this, delay, unit);
-  }
-
   private class AsyncExecuteCQCallback implements 
AsyncMethodCallback<TSStatus> {
 
     private final long startTime;
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java
index 328eef6d25a..e4d22cfe880 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.confignode.persistence.cq;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
@@ -25,7 +26,6 @@ import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
 import org.apache.iotdb.confignode.consensus.request.write.cq.ActiveCQPlan;
 import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan;
 import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan;
-import org.apache.iotdb.confignode.consensus.request.write.cq.ShowCQPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.cq.UpdateCQLastExecTimePlan;
 import org.apache.iotdb.confignode.consensus.response.cq.ShowCQResp;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
@@ -59,6 +59,10 @@ public class CQInfo implements SnapshotProcessor {
 
   private static final String SNAPSHOT_FILENAME = "cq_info.snapshot";
 
+  private static final String CQ_NOT_EXIST_FORMAT = "CQ %s doesn't exist.";
+
+  private static final String MD5_NOT_MATCH_FORMAT = "MD5 of CQ %s doesn't 
match";
+
   private final Map<String, CQEntry> cqMap;
 
   private final ReadWriteLock lock;
@@ -113,11 +117,11 @@ public class CQInfo implements SnapshotProcessor {
       CQEntry cqEntry = cqMap.get(cqId);
       if (cqEntry == null) {
         res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
-        res.message = String.format("CQ %s doesn't exist.", cqId);
+        res.message = String.format(CQ_NOT_EXIST_FORMAT, cqId);
         LOGGER.warn("Drop CQ {} failed, because it doesn't exist.", cqId);
       } else if ((md5.isPresent() && !md5.get().equals(cqEntry.md5))) {
         res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
-        res.message = String.format("MD5 of CQ %s doesn't match", cqId);
+        res.message = String.format(MD5_NOT_MATCH_FORMAT, cqId);
         LOGGER.warn("Drop CQ {} failed, because its MD5 doesn't match.", cqId);
       } else {
         cqMap.remove(cqId);
@@ -130,7 +134,7 @@ public class CQInfo implements SnapshotProcessor {
     }
   }
 
-  public ShowCQResp showCQ(ShowCQPlan plan) {
+  public ShowCQResp showCQ() {
     lock.readLock().lock();
     try {
       return new ShowCQResp(
@@ -155,10 +159,10 @@ public class CQInfo implements SnapshotProcessor {
       CQEntry cqEntry = cqMap.get(cqId);
       if (cqEntry == null) {
         res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
-        res.message = String.format("CQ %s doesn't exist.", cqId);
+        res.message = String.format(CQ_NOT_EXIST_FORMAT, cqId);
       } else if (!md5.equals(cqEntry.md5)) {
         res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
-        res.message = String.format("MD5 of CQ %s doesn't match", cqId);
+        res.message = String.format(MD5_NOT_MATCH_FORMAT, cqId);
       } else if (cqEntry.state == CQState.ACTIVE) {
         res.code = TSStatusCode.CQ_ALREADY_ACTIVE.getStatusCode();
         res.message = String.format("CQ %s has already been active", cqId);
@@ -188,10 +192,10 @@ public class CQInfo implements SnapshotProcessor {
       CQEntry cqEntry = cqMap.get(cqId);
       if (cqEntry == null) {
         res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
-        res.message = String.format("CQ %s doesn't exist.", cqId);
+        res.message = String.format(CQ_NOT_EXIST_FORMAT, cqId);
       } else if (!md5.equals(cqEntry.md5)) {
         res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
-        res.message = String.format("MD5 of CQ %s doesn't match", cqId);
+        res.message = String.format(MD5_NOT_MATCH_FORMAT, cqId);
       } else if (cqEntry.lastExecutionTime >= plan.getExecutionTime()) {
         res.code = TSStatusCode.CQ_UPDATE_LAST_EXEC_TIME_ERROR.getStatusCode();
         res.message =
@@ -270,8 +274,12 @@ public class CQInfo implements SnapshotProcessor {
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
     CQInfo cqInfo = (CQInfo) o;
     return Objects.equals(cqMap, cqInfo.cqMap);
   }
@@ -333,6 +341,7 @@ public class CQInfo implements SnapshotProcessor {
           other.lastExecutionTime);
     }
 
+    @SuppressWarnings("squid:S107")
     private CQEntry(
         String cqId,
         long everyInterval,
@@ -462,8 +471,12 @@ public class CQInfo implements SnapshotProcessor {
 
     @Override
     public boolean equals(Object o) {
-      if (this == o) return true;
-      if (o == null || getClass() != o.getClass()) return false;
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
       CQEntry cqEntry = (CQEntry) o;
       return everyInterval == cqEntry.everyInterval
           && boundaryTime == cqEntry.boundaryTime
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index a6e7710db84..58eb4ebc0bf 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -53,7 +53,6 @@ import 
org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConf
 import org.apache.iotdb.confignode.consensus.request.write.cq.ActiveCQPlan;
 import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan;
 import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan;
-import org.apache.iotdb.confignode.consensus.request.write.cq.ShowCQPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.cq.UpdateCQLastExecTimePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.database.AdjustMaxRegionGroupNumPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
@@ -271,7 +270,7 @@ public class ConfigPlanExecutor {
       case GetSeriesSlotList:
         return partitionInfo.getSeriesSlotList((GetSeriesSlotListPlan) req);
       case SHOW_CQ:
-        return cqInfo.showCQ((ShowCQPlan) req);
+        return cqInfo.showCQ();
       case GetFunctionTable:
         return udfInfo.getUDFTable();
       case GetFunctionJar:
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
index f310d00c208..875db62e02d 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.confignode.procedure.impl.cq;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
@@ -81,38 +82,11 @@ public class CreateCQProcedure extends 
AbstractNodeProcedure<CreateCQState> {
   @Override
   protected Flow executeFromState(ConfigNodeProcedureEnv env, CreateCQState 
state)
       throws ProcedureSuspendedException, ProcedureYieldException, 
InterruptedException {
-    ConsensusWriteResponse response;
-    TSStatus res;
+
     try {
       switch (state) {
         case INIT:
-          response =
-              env.getConfigManager()
-                  .getConsensusManager()
-                  .write(new AddCQPlan(req, md5, firstExecutionTime));
-          res = response.getStatus();
-          if (res != null) {
-            if (res.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-              LOGGER.debug("Finish init CQ {} successfully", req.cqId);
-              setNextState(INACTIVE);
-            } else if (res.code == 
TSStatusCode.CQ_AlREADY_EXIST.getStatusCode()) {
-              LOGGER.info("Failed to init CQ {} because such cq already 
exists", req.cqId);
-              setFailure(new ProcedureException(new 
IoTDBException(res.message, res.code)));
-              return Flow.HAS_MORE_STATE;
-            } else {
-              LOGGER.warn("Failed to init CQ {} because of unknown reasons 
{}", req.cqId, res);
-              setFailure(new ProcedureException(new 
IoTDBException(res.message, res.code)));
-              return Flow.HAS_MORE_STATE;
-            }
-          } else {
-            LOGGER.warn(
-                "Failed to init CQ {} because of unexpected exception: ",
-                req.cqId,
-                response.getException());
-            setFailure(new ProcedureException(response.getException()));
-            return Flow.HAS_MORE_STATE;
-          }
-          break;
+          return addCQ(env);
         case INACTIVE:
           CQScheduleTask cqScheduleTask =
               new CQScheduleTask(req, firstExecutionTime, md5, executor, 
env.getConfigManager());
@@ -120,38 +94,12 @@ public class CreateCQProcedure extends 
AbstractNodeProcedure<CreateCQState> {
           setNextState(SCHEDULED);
           break;
         case SCHEDULED:
-          response =
-              env.getConfigManager().getConsensusManager().write(new 
ActiveCQPlan(req.cqId, md5));
-          res = response.getStatus();
-          if (res != null) {
-            if (res.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-              LOGGER.debug("Finish Scheduling CQ {} successfully", req.cqId);
-            } else if (res.code == TSStatusCode.NO_SUCH_CQ.getStatusCode()) {
-              LOGGER.warn(
-                  "Failed to active CQ {} because of no such cq, detailed 
error message is {}",
-                  req.cqId,
-                  res.message);
-            } else if (res.code == 
TSStatusCode.CQ_ALREADY_ACTIVE.getStatusCode()) {
-              LOGGER.warn(
-                  "Failed to active CQ {} because this cq has already been 
active", req.cqId);
-            } else {
-              LOGGER.warn(
-                  "Failed to active CQ {} successfully because of unknown 
reasons {}",
-                  req.cqId,
-                  res);
-            }
-          } else {
-            LOGGER.warn(
-                "Failed to active CQ {} successfully because of unexpected 
exception: ",
-                req.cqId,
-                response.getException());
-          }
-
+          activeCQ(env);
           return Flow.NO_MORE_STATE;
         default:
           throw new IllegalArgumentException("Unknown CreateCQState: " + 
state);
       }
-    } catch (Throwable t) {
+    } catch (Exception t) {
       if (isRollbackSupported(state)) {
         LOGGER.error("Fail in CreateCQProcedure", t);
         setFailure(new ProcedureException(t));
@@ -169,6 +117,59 @@ public class CreateCQProcedure extends 
AbstractNodeProcedure<CreateCQState> {
     return Flow.HAS_MORE_STATE;
   }
 
+  /**
+   * Submits the AddCQPlan through consensus for the INIT state. On success the next state is
+   * INACTIVE; otherwise the failure is recorded via setFailure. Always returns HAS_MORE_STATE, as
+   * the INIT case did before extraction; NO_MORE_STATE is returned only by the SCHEDULED case.
+   */
+  private Flow addCQ(ConfigNodeProcedureEnv env) {
+    ConsensusWriteResponse response =
+        env.getConfigManager()
+            .getConsensusManager()
+            .write(new AddCQPlan(req, md5, firstExecutionTime));
+    TSStatus res = response.getStatus();
+    if (res == null) {
+      LOGGER.warn(
+          "Failed to init CQ {} because of unexpected exception: ",
+          req.cqId,
+          response.getException());
+      setFailure(new ProcedureException(response.getException()));
+    } else if (res.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      LOGGER.debug("Finish init CQ {} successfully", req.cqId);
+      setNextState(INACTIVE);
+    } else if (res.code == TSStatusCode.CQ_AlREADY_EXIST.getStatusCode()) {
+      LOGGER.info("Failed to init CQ {} because such cq already exists", req.cqId);
+      setFailure(new ProcedureException(new IoTDBException(res.message, res.code)));
+    } else {
+      LOGGER.warn("Failed to init CQ {} because of unknown reasons {}", req.cqId, res);
+      setFailure(new ProcedureException(new IoTDBException(res.message, res.code)));
+    }
+    return Flow.HAS_MORE_STATE;
+  }
+
+  // Activates the CQ through consensus; a non-success status or response exception is only logged.
+  private void activeCQ(ConfigNodeProcedureEnv env) {
+    ConsensusWriteResponse response =
+        env.getConfigManager().getConsensusManager().write(new ActiveCQPlan(req.cqId, md5));
+    TSStatus res = response.getStatus();
+    if (res != null) {
+      if (res.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        LOGGER.debug("Finish Scheduling CQ {} successfully", req.cqId);
+      } else if (res.code == TSStatusCode.NO_SUCH_CQ.getStatusCode()) {
+        LOGGER.warn("Failed to active CQ {} because of no such cq: {}", req.cqId, res.message);
+      } else if (res.code == TSStatusCode.CQ_ALREADY_ACTIVE.getStatusCode()) {
+        LOGGER.warn("Failed to active CQ {} because this cq has already been active", req.cqId);
+      } else {
+        LOGGER.warn("Failed to active CQ {} because of unknown reasons {}", req.cqId, res);
+      }
+    } else {
+      LOGGER.warn(
+          "Failed to active CQ {} because of unexpected exception: ",
+          req.cqId,
+          response.getException());
+    }
+  }
+
   @Override
   protected void rollbackState(ConfigNodeProcedureEnv env, CreateCQState state)
       throws IOException, InterruptedException, ProcedureException {
@@ -187,18 +188,18 @@ public class CreateCQProcedure extends 
AbstractNodeProcedure<CreateCQState> {
             LOGGER.info("Finish [INACTIVE] rollback of CQ {} successfully", 
req.cqId);
           } else if (res.code == TSStatusCode.NO_SUCH_CQ.getStatusCode()) {
             LOGGER.warn(
-                "Failed to do [INACTIVE] rollback of CQ {} because of no such 
cq, detailed error message is {}",
+                "Failed to do [INACTIVE] rollback of CQ {} because of no such 
cq: {}",
                 req.cqId,
                 res.message);
           } else {
             LOGGER.warn(
-                "Failed to do [INACTIVE] rollback of CQ {} successfully 
because of unknown reasons {}",
+                "Failed to do [INACTIVE] rollback of CQ {} because of unknown 
reasons {}",
                 req.cqId,
                 res);
           }
         } else {
           LOGGER.warn(
-              "Failed to do [INACTIVE] rollback of CQ {} successfully because 
of unexpected exception: ",
+              "Failed to do [INACTIVE] rollback of CQ {} because of unexpected 
exception: ",
               req.cqId,
               response.getException());
         }
@@ -248,8 +249,12 @@ public class CreateCQProcedure extends 
AbstractNodeProcedure<CreateCQState> {
 
   @Override
   public boolean equals(Object o) {
-    if (this == o) return true;
-    if (o == null || getClass() != o.getClass()) return false;
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
     CreateCQProcedure that = (CreateCQProcedure) o;
     return firstExecutionTime == that.firstExecutionTime
         && Objects.equals(req, that.req)
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/cq/CreateCQState.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/cq/CreateCQState.java
index f4446fe4950..62f7e6ef9e9 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/cq/CreateCQState.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/cq/CreateCQState.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.confignode.procedure.state.cq;
 
 public enum CreateCQState {

Reply via email to