This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/sonarConfigNode in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2841bd8d5c09c5e711e22dbd7e1c3dcbe1651b37 Author: JackieTien97 <[email protected]> AuthorDate: Wed Jun 21 18:06:05 2023 +0800 Fix sonar bugs and code smells in confignode module about cq --- .../consensus/request/write/cq/ActiveCQPlan.java | 13 ++- .../consensus/request/write/cq/AddCQPlan.java | 13 ++- .../consensus/request/write/cq/DropCQPlan.java | 13 ++- .../consensus/request/write/cq/ShowCQPlan.java | 5 +- .../request/write/cq/UpdateCQLastExecTimePlan.java | 13 ++- .../iotdb/confignode/manager/cq/CQManager.java | 9 +- .../confignode/manager/cq/CQScheduleTask.java | 13 ++- .../iotdb/confignode/persistence/cq/CQInfo.java | 37 ++++-- .../persistence/executor/ConfigPlanExecutor.java | 3 +- .../procedure/impl/cq/CreateCQProcedure.java | 129 +++++++++++---------- .../procedure/state/cq/CreateCQState.java | 1 + 11 files changed, 152 insertions(+), 97 deletions(-) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java index 97dcfd1de4e..5346db2da1f 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.confignode.consensus.request.write.cq; import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; @@ -71,9 +72,15 @@ public class ActiveCQPlan extends ConfigPhysicalPlan { @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - if (!super.equals(o)) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } ActiveCQPlan that = (ActiveCQPlan) o; return cqId.equals(that.cqId) && md5.equals(that.md5); } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java index a2b01661d77..3cde53047c9 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.confignode.consensus.request.write.cq; import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils; @@ -82,9 +83,15 @@ public class AddCQPlan extends ConfigPhysicalPlan { @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - if (!super.equals(o)) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } AddCQPlan addCQPlan = (AddCQPlan) o; return firstExecutionTime == addCQPlan.firstExecutionTime && Objects.equals(req, addCQPlan.req) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java index 1e7a5a791c7..966b179bb1c 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.confignode.consensus.request.write.cq; import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; @@ -79,9 +80,15 @@ public class DropCQPlan extends ConfigPhysicalPlan { @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - if (!super.equals(o)) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } DropCQPlan that = (DropCQPlan) o; return cqId.equals(that.cqId) && Objects.equals(md5, that.md5); } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ShowCQPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ShowCQPlan.java index 558ae0e7efb..7c146238a5e 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ShowCQPlan.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ShowCQPlan.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.confignode.consensus.request.write.cq; import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; @@ -38,5 +39,7 @@ public class ShowCQPlan extends ConfigPhysicalPlan { } @Override - protected void deserializeImpl(ByteBuffer buffer) throws IOException {} + protected void deserializeImpl(ByteBuffer buffer) throws IOException { + // no customized field to deserialize from + } } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java index 81630aaddc6..98dd9b35168 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.confignode.consensus.request.write.cq; import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; @@ -81,9 +82,15 @@ public class UpdateCQLastExecTimePlan extends ConfigPhysicalPlan { @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - if (!super.equals(o)) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } UpdateCQLastExecTimePlan that = (UpdateCQLastExecTimePlan) o; return executionTime == that.executionTime && cqId.equals(that.cqId) && md5.equals(that.md5); } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java index 2559b239ea0..e3c514ddd35 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.confignode.manager.cq; import org.apache.iotdb.common.rpc.thrift.TSStatus; @@ -123,7 +124,7 @@ public class CQManager { if (executor != null) { executor.shutdown(); } - } catch (Throwable t) { + } catch (Exception t) { // just print the error log because we should make sure we can start a new cq schedule pool // successfully in the next steps LOGGER.error("Error happened while shutting down previous cq schedule thread pool.", t); @@ -146,7 +147,7 @@ public class CQManager { } } // keep fetching until we get all CQEntries if this node is still leader - while (allCQs == null && configManager.getConsensusManager().isLeader()) { + while (needFetch(allCQs)) { ConsensusReadResponse response = configManager.getConsensusManager().read(new ShowCQPlan()); if (response.getDataset() != null) { allCQs = ((ShowCQResp) response.getDataset()).getCqList(); @@ -172,6 +173,10 @@ public class CQManager { } } + private boolean needFetch(List<CQInfo.CQEntry> allCQs) { + return allCQs == null && configManager.getConsensusManager().isLeader(); + } + public void stopCQScheduler() { ScheduledExecutorService previous; lock.writeLock().lock(); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java index b1f94f21a13..73de8bc54a6 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.confignode.manager.cq; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; @@ -103,6 +104,7 @@ public class CQScheduleTask implements Runnable { entry.getLastExecutionTime() + entry.getEveryInterval()); } + @SuppressWarnings("squid:S107") public CQScheduleTask( String cqId, long everyInterval, @@ -137,7 +139,6 @@ public class CQScheduleTask implements Runnable { } public static long getFirstExecutionTime(long boundaryTime, long everyInterval, long now) { - // TODO may need to consider nano precision if (now <= boundaryTime) { return boundaryTime; } else { @@ -172,7 +173,7 @@ public class CQScheduleTask implements Runnable { AsyncDataNodeInternalServiceClient client = AsyncDataNodeClientPool.getInstance().getAsyncClient(targetDataNode.get()); client.executeCQ(executeCQReq, new AsyncExecuteCQCallback(startTime, endTime)); - } catch (Throwable t) { + } catch (Exception t) { LOGGER.warn("Execute CQ {} failed", cqId, t); if (needSubmit()) { submitSelf(retryWaitTimeInMS, TimeUnit.MILLISECONDS); @@ -185,15 +186,15 @@ public class CQScheduleTask implements Runnable { submitSelf(Math.max(0, executionTime - System.currentTimeMillis()), TimeUnit.MILLISECONDS); } + private void submitSelf(long delay, TimeUnit unit) { + executor.schedule(this, delay, unit); + } + private boolean needSubmit() { // current node is still leader and thread pool is not shut down. return configManager.getConsensusManager().isLeader() && !executor.isShutdown(); } - private void submitSelf(long delay, TimeUnit unit) { - executor.schedule(this, delay, unit); - } - private class AsyncExecuteCQCallback implements AsyncMethodCallback<TSStatus> { private final long startTime; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java index 328eef6d25a..e4d22cfe880 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.confignode.persistence.cq; import org.apache.iotdb.common.rpc.thrift.TSStatus; @@ -25,7 +26,6 @@ import org.apache.iotdb.commons.snapshot.SnapshotProcessor; import org.apache.iotdb.confignode.consensus.request.write.cq.ActiveCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan; -import org.apache.iotdb.confignode.consensus.request.write.cq.ShowCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.UpdateCQLastExecTimePlan; import org.apache.iotdb.confignode.consensus.response.cq.ShowCQResp; import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq; @@ -59,6 +59,10 @@ public class CQInfo implements SnapshotProcessor { private static final String SNAPSHOT_FILENAME = "cq_info.snapshot"; + private static final String CQ_NOT_EXIST_FORMAT = "CQ %s doesn't exist."; + + private static final String MD5_NOT_MATCH_FORMAT = "MD5 of CQ %s doesn't match"; + private final Map<String, CQEntry> cqMap; private final ReadWriteLock lock; @@ -113,11 +117,11 @@ public class CQInfo implements SnapshotProcessor { CQEntry cqEntry = cqMap.get(cqId); if (cqEntry == null) { res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode(); - res.message = String.format("CQ %s doesn't exist.", cqId); + res.message = String.format(CQ_NOT_EXIST_FORMAT, cqId); LOGGER.warn("Drop CQ {} failed, because it doesn't exist.", cqId); } else if ((md5.isPresent() && !md5.get().equals(cqEntry.md5))) { res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode(); - res.message = String.format("MD5 of CQ %s doesn't match", cqId); + res.message = String.format(MD5_NOT_MATCH_FORMAT, cqId); LOGGER.warn("Drop CQ {} failed, because its MD5 doesn't match.", cqId); } else { cqMap.remove(cqId); @@ -130,7 +134,7 @@ public class CQInfo implements SnapshotProcessor { } } - public ShowCQResp showCQ(ShowCQPlan plan) { + public ShowCQResp showCQ() { lock.readLock().lock(); try { return new ShowCQResp( @@ -155,10 +159,10 @@ public class CQInfo implements SnapshotProcessor { CQEntry cqEntry = cqMap.get(cqId); if (cqEntry == null) { res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode(); - res.message = String.format("CQ %s doesn't exist.", cqId); + res.message = String.format(CQ_NOT_EXIST_FORMAT, cqId); } else if (!md5.equals(cqEntry.md5)) { res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode(); - res.message = String.format("MD5 of CQ %s doesn't match", cqId); + res.message = String.format(MD5_NOT_MATCH_FORMAT, cqId); } else if (cqEntry.state == CQState.ACTIVE) { res.code = TSStatusCode.CQ_ALREADY_ACTIVE.getStatusCode(); res.message = String.format("CQ %s has already been active", cqId); @@ -188,10 +192,10 @@ public class CQInfo implements SnapshotProcessor { CQEntry cqEntry = cqMap.get(cqId); if (cqEntry == null) { res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode(); - res.message = String.format("CQ %s doesn't exist.", cqId); + res.message = String.format(CQ_NOT_EXIST_FORMAT, cqId); } else if (!md5.equals(cqEntry.md5)) { res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode(); - res.message = String.format("MD5 of CQ %s doesn't match", cqId); + res.message = String.format(MD5_NOT_MATCH_FORMAT, cqId); } else if (cqEntry.lastExecutionTime >= plan.getExecutionTime()) { res.code = TSStatusCode.CQ_UPDATE_LAST_EXEC_TIME_ERROR.getStatusCode(); res.message = @@ -270,8 +274,12 @@ public class CQInfo implements SnapshotProcessor { @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } CQInfo cqInfo = (CQInfo) o; return Objects.equals(cqMap, cqInfo.cqMap); } @@ -333,6 +341,7 @@ public class CQInfo implements SnapshotProcessor { other.lastExecutionTime); } + @SuppressWarnings("squid:S107") private CQEntry( String cqId, long everyInterval, @@ -462,8 +471,12 @@ public class CQInfo implements SnapshotProcessor { @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } CQEntry cqEntry = (CQEntry) o; return everyInterval == cqEntry.everyInterval && boundaryTime == cqEntry.boundaryTime diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java index a6e7710db84..58eb4ebc0bf 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java @@ -53,7 +53,6 @@ import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConf import org.apache.iotdb.confignode.consensus.request.write.cq.ActiveCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan; -import org.apache.iotdb.confignode.consensus.request.write.cq.ShowCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.UpdateCQLastExecTimePlan; import org.apache.iotdb.confignode.consensus.request.write.database.AdjustMaxRegionGroupNumPlan; import org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan; @@ -271,7 +270,7 @@ public class ConfigPlanExecutor { case GetSeriesSlotList: return partitionInfo.getSeriesSlotList((GetSeriesSlotListPlan) req); case SHOW_CQ: - return cqInfo.showCQ((ShowCQPlan) req); + return cqInfo.showCQ(); case GetFunctionTable: return udfInfo.getUDFTable(); case GetFunctionJar: diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java index f310d00c208..875db62e02d 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.confignode.procedure.impl.cq; import org.apache.iotdb.common.rpc.thrift.TSStatus; @@ -81,38 +82,11 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { @Override protected Flow executeFromState(ConfigNodeProcedureEnv env, CreateCQState state) throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { - ConsensusWriteResponse response; - TSStatus res; + try { switch (state) { case INIT: - response = - env.getConfigManager() - .getConsensusManager() - .write(new AddCQPlan(req, md5, firstExecutionTime)); - res = response.getStatus(); - if (res != null) { - if (res.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - LOGGER.debug("Finish init CQ {} successfully", req.cqId); - setNextState(INACTIVE); - } else if (res.code == TSStatusCode.CQ_AlREADY_EXIST.getStatusCode()) { - LOGGER.info("Failed to init CQ {} because such cq already exists", req.cqId); - setFailure(new ProcedureException(new IoTDBException(res.message, res.code))); - return Flow.HAS_MORE_STATE; - } else { - LOGGER.warn("Failed to init CQ {} because of unknown reasons {}", req.cqId, res); - setFailure(new ProcedureException(new IoTDBException(res.message, res.code))); - return Flow.HAS_MORE_STATE; - } - } else { - LOGGER.warn( - "Failed to init CQ {} because of unexpected exception: ", - req.cqId, - response.getException()); - setFailure(new ProcedureException(response.getException())); - return Flow.HAS_MORE_STATE; - } - break; + return addCQ(env); case INACTIVE: CQScheduleTask cqScheduleTask = new CQScheduleTask(req, firstExecutionTime, md5, executor, env.getConfigManager()); @@ -120,38 +94,12 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { setNextState(SCHEDULED); break; case SCHEDULED: - response = - env.getConfigManager().getConsensusManager().write(new ActiveCQPlan(req.cqId, md5)); - res = response.getStatus(); - if (res != null) { - if (res.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - LOGGER.debug("Finish Scheduling CQ {} successfully", req.cqId); - } else if (res.code == TSStatusCode.NO_SUCH_CQ.getStatusCode()) { - LOGGER.warn( - "Failed to active CQ {} because of no such cq, detailed error message is {}", - req.cqId, - res.message); - } else if (res.code == TSStatusCode.CQ_ALREADY_ACTIVE.getStatusCode()) { - LOGGER.warn( - "Failed to active CQ {} because this cq has already been active", req.cqId); - } else { - LOGGER.warn( - "Failed to active CQ {} successfully because of unknown reasons {}", - req.cqId, - res); - } - } else { - LOGGER.warn( - "Failed to active CQ {} successfully because of unexpected exception: ", - req.cqId, - response.getException()); - } - + activeCQ(env); return Flow.NO_MORE_STATE; default: throw new IllegalArgumentException("Unknown CreateCQState: " + state); } - } catch (Throwable t) { + } catch (Exception t) { if (isRollbackSupported(state)) { LOGGER.error("Fail in CreateCQProcedure", t); setFailure(new ProcedureException(t)); @@ -169,6 +117,59 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { return Flow.HAS_MORE_STATE; } + private Flow addCQ(ConfigNodeProcedureEnv env) { + ConsensusWriteResponse response = + env.getConfigManager() + .getConsensusManager() + .write(new AddCQPlan(req, md5, firstExecutionTime)); + TSStatus res = response.getStatus(); + if (res != null) { + if (res.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOGGER.debug("Finish init CQ {} successfully", req.cqId); + setNextState(INACTIVE); + } else if (res.code == TSStatusCode.CQ_AlREADY_EXIST.getStatusCode()) { + LOGGER.info("Failed to init CQ {} because such cq already exists", req.cqId); + setFailure(new ProcedureException(new IoTDBException(res.message, res.code))); + return Flow.HAS_MORE_STATE; + } else { + LOGGER.warn("Failed to init CQ {} because of unknown reasons {}", req.cqId, res); + setFailure(new ProcedureException(new IoTDBException(res.message, res.code))); + return Flow.HAS_MORE_STATE; + } + } else { + LOGGER.warn( + "Failed to init CQ {} because of unexpected exception: ", + req.cqId, + response.getException()); + setFailure(new ProcedureException(response.getException())); + return Flow.HAS_MORE_STATE; + } + return Flow.NO_MORE_STATE; + } + + private void activeCQ(ConfigNodeProcedureEnv env) { + ConsensusWriteResponse response = + env.getConfigManager().getConsensusManager().write(new ActiveCQPlan(req.cqId, md5)); + TSStatus res = response.getStatus(); + if (res != null) { + if (res.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOGGER.debug("Finish Scheduling CQ {} successfully", req.cqId); + } else if (res.code == TSStatusCode.NO_SUCH_CQ.getStatusCode()) { + LOGGER.warn("Failed to active CQ {} because of no such cq: {}", req.cqId, res.message); + } else if (res.code == TSStatusCode.CQ_ALREADY_ACTIVE.getStatusCode()) { + LOGGER.warn("Failed to active CQ {} because this cq has already been active", req.cqId); + } else { + LOGGER.warn( + "Failed to active CQ {} successfully because of unknown reasons {}", req.cqId, res); + } + } else { + LOGGER.warn( + "Failed to active CQ {} successfully because of unexpected exception: ", + req.cqId, + response.getException()); + } + } + @Override protected void rollbackState(ConfigNodeProcedureEnv env, CreateCQState state) throws IOException, InterruptedException, ProcedureException { @@ -187,18 +188,18 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { LOGGER.info("Finish [INACTIVE] rollback of CQ {} successfully", req.cqId); } else if (res.code == TSStatusCode.NO_SUCH_CQ.getStatusCode()) { LOGGER.warn( - "Failed to do [INACTIVE] rollback of CQ {} because of no such cq, detailed error message is {}", + "Failed to do [INACTIVE] rollback of CQ {} because of no such cq: {}", req.cqId, res.message); } else { LOGGER.warn( - "Failed to do [INACTIVE] rollback of CQ {} successfully because of unknown reasons {}", + "Failed to do [INACTIVE] rollback of CQ {} because of unknown reasons {}", req.cqId, res); } } else { LOGGER.warn( - "Failed to do [INACTIVE] rollback of CQ {} successfully because of unexpected exception: ", + "Failed to do [INACTIVE] rollback of CQ {} because of unexpected exception: ", req.cqId, response.getException()); } @@ -248,8 +249,12 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } CreateCQProcedure that = (CreateCQProcedure) o; return firstExecutionTime == that.firstExecutionTime && Objects.equals(req, that.req) diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/cq/CreateCQState.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/cq/CreateCQState.java index f4446fe4950..62f7e6ef9e9 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/cq/CreateCQState.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/cq/CreateCQState.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.confignode.procedure.state.cq; public enum CreateCQState {
