This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch re-re in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 58ac67975d07abe887ad84349fdb6a4ebe4b7a9b Author: Caideyipi <[email protected]> AuthorDate: Wed Jun 3 14:38:39 2026 +0800 Reapply "Fix CQ recovery gap and stale callback contamination (#17734)" (#17827) This reverts commit a4206813a0601c7249d2b50d7ecb246825ff6713. --- .../iotdb/confignode/i18n/ConfigNodeMessages.java | 4 +- .../iotdb/confignode/i18n/ConfigNodeMessages.java | 4 +- .../consensus/request/read/cq/ShowCQPlan.java | 13 +++ .../consensus/request/write/cq/ActiveCQPlan.java | 20 ++-- .../consensus/request/write/cq/AddCQPlan.java | 20 ++-- .../consensus/request/write/cq/DropCQPlan.java | 20 ++-- .../request/write/cq/UpdateCQLastExecTimePlan.java | 23 ++-- .../iotdb/confignode/manager/cq/CQManager.java | 102 +++++++++++++++++- .../confignode/manager/cq/CQScheduleTask.java | 58 ++++++++-- .../iotdb/confignode/persistence/cq/CQInfo.java | 76 +++++++------ .../persistence/executor/ConfigPlanExecutor.java | 3 +- .../procedure/impl/cq/CreateCQProcedure.java | 80 +++++++++++--- .../request/ConfigPhysicalPlanSerDeTest.java | 8 +- .../apache/iotdb/confignode/cq/CQManagerTest.java | 107 +++++++++++++++++++ .../iotdb/confignode/persistence/CQInfoTest.java | 64 ++++++++++- .../procedure/impl/CreateCQProcedureTest.java | 26 +++++ .../impl/cq/CreateCQProcedureRecoveryTest.java | 117 +++++++++++++++++++++ 17 files changed, 639 insertions(+), 106 deletions(-) diff --git a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java index c1c3e877de7..f662fa5871d 100644 --- a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java +++ b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java @@ -130,8 +130,8 @@ public final class ConfigNodeMessages { public static final String DOES_NOT_EXIST = "%s does not exist"; public static final String DROPPING_TAG_OR_TIME_COLUMN_IS_NOT_SUPPORTED = "Dropping tag or time column is not supported."; - public static final String DROP_CQ_FAILED_BECAUSE_ITS_MD5_DOESN_T_MATCH = - "Drop CQ {} failed, because its MD5 doesn't match."; + public static final String DROP_CQ_FAILED_BECAUSE_ITS_TOKEN_DOESN_T_MATCH = + "Drop CQ {} failed, because its token doesn't match."; public static final String DROP_CQ_FAILED_BECAUSE_IT_DOESN_T_EXIST = "Drop CQ {} failed, because it doesn't exist."; public static final String DROP_CQ_SUCCESSFULLY = "Drop CQ {} successfully."; diff --git a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java index 6ca847626c8..8bffa1b0831 100644 --- a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java +++ b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java @@ -126,8 +126,8 @@ public final class ConfigNodeMessages { "Deserialization error for write plan, request: {}, bytebuffer: {}"; public static final String DOES_NOT_EXIST = "%s does not exist"; public static final String DROPPING_TAG_OR_TIME_COLUMN_IS_NOT_SUPPORTED = "不支持删除标签列或时间列。"; - public static final String DROP_CQ_FAILED_BECAUSE_ITS_MD5_DOESN_T_MATCH = - "Drop CQ {} failed, because its MD5 doesn't match."; + public static final String DROP_CQ_FAILED_BECAUSE_ITS_TOKEN_DOESN_T_MATCH = + "Drop CQ {} failed, because its token doesn't match."; public static final String DROP_CQ_FAILED_BECAUSE_IT_DOESN_T_EXIST = "Drop CQ {} failed, because it doesn't exist."; public static final String DROP_CQ_SUCCESSFULLY = "Drop CQ {} successfully."; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/cq/ShowCQPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/cq/ShowCQPlan.java index 5217849deb4..c28838d556b 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/cq/ShowCQPlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/cq/ShowCQPlan.java @@ -21,11 +21,24 @@ package org.apache.iotdb.confignode.consensus.request.read.cq; import org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan; +import java.util.Optional; + import static org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType.SHOW_CQ; public class ShowCQPlan extends ConfigPhysicalReadPlan { + private final String cqId; + public ShowCQPlan() { + this(null); + } + + public ShowCQPlan(String cqId) { super(SHOW_CQ); + this.cqId = cqId; + } + + public Optional<String> getCqId() { + return Optional.ofNullable(cqId); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java index 263aeb9f0d0..3faa1c2d62f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java @@ -35,39 +35,39 @@ public class ActiveCQPlan extends ConfigPhysicalPlan { private String cqId; - private String md5; + private String cqToken; public ActiveCQPlan() { super(ACTIVE_CQ); } - public ActiveCQPlan(String cqId, String md5) { + public ActiveCQPlan(String cqId, String cqToken) { super(ACTIVE_CQ); Validate.notNull(cqId); - Validate.notNull(md5); + Validate.notNull(cqToken); this.cqId = cqId; - this.md5 = md5; + this.cqToken = cqToken; } public String getCqId() { return cqId; } - public String getMd5() { - return md5; + public String getCqToken() { + return cqToken; } @Override protected void serializeImpl(DataOutputStream stream) throws IOException { stream.writeShort(getType().getPlanType()); ReadWriteIOUtils.write(cqId, stream); - ReadWriteIOUtils.write(md5, stream); + ReadWriteIOUtils.write(cqToken, stream); } @Override protected void deserializeImpl(ByteBuffer buffer) throws IOException { cqId = ReadWriteIOUtils.readString(buffer); - md5 = ReadWriteIOUtils.readString(buffer); + cqToken = ReadWriteIOUtils.readString(buffer); } @Override @@ -82,11 +82,11 @@ public class ActiveCQPlan extends ConfigPhysicalPlan { return false; } ActiveCQPlan that = (ActiveCQPlan) o; - return cqId.equals(that.cqId) && md5.equals(that.md5); + return cqId.equals(that.cqId) && cqToken.equals(that.cqToken); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), cqId, md5); + return Objects.hash(super.hashCode(), cqId, cqToken); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java index 62f994688b3..471516c38e5 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java @@ -37,7 +37,7 @@ public class AddCQPlan extends ConfigPhysicalPlan { private TCreateCQReq req; - private String md5; + private String cqToken; private long firstExecutionTime; @@ -45,12 +45,12 @@ public class AddCQPlan extends ConfigPhysicalPlan { super(ADD_CQ); } - public AddCQPlan(TCreateCQReq req, String md5, long firstExecutionTime) { + public AddCQPlan(TCreateCQReq req, String cqToken, long firstExecutionTime) { super(ADD_CQ); Validate.notNull(req); - Validate.notNull(md5); + Validate.notNull(cqToken); this.req = req; - this.md5 = md5; + this.cqToken = cqToken; this.firstExecutionTime = firstExecutionTime; } @@ -58,8 +58,8 @@ public class AddCQPlan extends ConfigPhysicalPlan { return req; } - public String getMd5() { - return md5; + public String getCqToken() { + return cqToken; } public long getFirstExecutionTime() { @@ -70,14 +70,14 @@ public class AddCQPlan extends ConfigPhysicalPlan { protected void serializeImpl(DataOutputStream stream) throws IOException { stream.writeShort(getType().getPlanType()); ThriftCommonsSerDeUtils.serializeTCreateCQReq(req, stream); - ReadWriteIOUtils.write(md5, stream); + ReadWriteIOUtils.write(cqToken, stream); ReadWriteIOUtils.write(firstExecutionTime, stream); } @Override protected void deserializeImpl(ByteBuffer buffer) throws IOException { req = ThriftCommonsSerDeUtils.deserializeTCreateCQReq(buffer); - md5 = ReadWriteIOUtils.readString(buffer); + cqToken = ReadWriteIOUtils.readString(buffer); firstExecutionTime = ReadWriteIOUtils.readLong(buffer); } @@ -95,11 +95,11 @@ public class AddCQPlan extends ConfigPhysicalPlan { AddCQPlan addCQPlan = (AddCQPlan) o; return firstExecutionTime == addCQPlan.firstExecutionTime && Objects.equals(req, addCQPlan.req) - && Objects.equals(md5, addCQPlan.md5); + && Objects.equals(cqToken, addCQPlan.cqToken); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), req, md5, firstExecutionTime); + return Objects.hash(super.hashCode(), req, cqToken, firstExecutionTime); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java index 108241b233d..69c29bff634 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java @@ -37,7 +37,7 @@ public class DropCQPlan extends ConfigPhysicalPlan { private String cqId; // may be null in user call of drop CQ - private String md5; + private String cqToken; public DropCQPlan() { super(DROP_CQ); @@ -49,33 +49,33 @@ public class DropCQPlan extends ConfigPhysicalPlan { this.cqId = cqId; } - public DropCQPlan(String cqId, String md5) { + public DropCQPlan(String cqId, String cqToken) { super(DROP_CQ); Validate.notNull(cqId); - Validate.notNull(md5); + Validate.notNull(cqToken); this.cqId = cqId; - this.md5 = md5; + this.cqToken = cqToken; } public String getCqId() { return cqId; } - public Optional<String> getMd5() { - return Optional.ofNullable(md5); + public Optional<String> getCqToken() { + return Optional.ofNullable(cqToken); } @Override protected void serializeImpl(DataOutputStream stream) throws IOException { stream.writeShort(getType().getPlanType()); ReadWriteIOUtils.write(cqId, stream); - ReadWriteIOUtils.write(md5, stream); + ReadWriteIOUtils.write(cqToken, stream); } @Override protected void deserializeImpl(ByteBuffer buffer) throws IOException { cqId = ReadWriteIOUtils.readString(buffer); - md5 = ReadWriteIOUtils.readString(buffer); + cqToken = ReadWriteIOUtils.readString(buffer); } @Override @@ -90,11 +90,11 @@ public class DropCQPlan extends ConfigPhysicalPlan { return false; } DropCQPlan that = (DropCQPlan) o; - return cqId.equals(that.cqId) && Objects.equals(md5, that.md5); + return cqId.equals(that.cqId) && Objects.equals(cqToken, that.cqToken); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), cqId, md5); + return Objects.hash(super.hashCode(), cqId, cqToken); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java index 861a7d4f51b..a487ae648e3 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java @@ -37,20 +37,19 @@ public class UpdateCQLastExecTimePlan extends ConfigPhysicalPlan { private long executionTime; - // may be null in user call of drop CQ - private String md5; + private String cqToken; public UpdateCQLastExecTimePlan() { super(UPDATE_CQ_LAST_EXEC_TIME); } - public UpdateCQLastExecTimePlan(String cqId, long executionTime, String md5) { + public UpdateCQLastExecTimePlan(String cqId, long executionTime, String cqToken) { super(UPDATE_CQ_LAST_EXEC_TIME); Validate.notNull(cqId); - Validate.notNull(md5); + Validate.notNull(cqToken); this.cqId = cqId; this.executionTime = executionTime; - this.md5 = md5; + this.cqToken = cqToken; } public String getCqId() { @@ -61,8 +60,8 @@ public class UpdateCQLastExecTimePlan extends ConfigPhysicalPlan { return executionTime; } - public String getMd5() { - return md5; + public String getCqToken() { + return cqToken; } @Override @@ -70,14 +69,14 @@ public class UpdateCQLastExecTimePlan extends ConfigPhysicalPlan { stream.writeShort(getType().getPlanType()); ReadWriteIOUtils.write(cqId, stream); ReadWriteIOUtils.write(executionTime, stream); - ReadWriteIOUtils.write(md5, stream); + ReadWriteIOUtils.write(cqToken, stream); } @Override protected void deserializeImpl(ByteBuffer buffer) throws IOException { cqId = ReadWriteIOUtils.readString(buffer); executionTime = ReadWriteIOUtils.readLong(buffer); - md5 = ReadWriteIOUtils.readString(buffer); + cqToken = ReadWriteIOUtils.readString(buffer); } @Override @@ -92,11 +91,13 @@ public class UpdateCQLastExecTimePlan extends ConfigPhysicalPlan { return false; } UpdateCQLastExecTimePlan that = (UpdateCQLastExecTimePlan) o; - return executionTime == that.executionTime && cqId.equals(that.cqId) && md5.equals(that.md5); + return executionTime == that.executionTime + && cqId.equals(that.cqId) + && cqToken.equals(that.cqToken); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), cqId, executionTime, md5); + return Objects.hash(super.hashCode(), cqId, executionTime, cqToken); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java index c4c1e8aede9..29837bc8aa2 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java @@ -43,7 +43,10 @@ import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -57,11 +60,15 @@ public class CQManager { private final ReadWriteLock lock; + // Key: CQ id. Value: the local task and the metadata token it owns. + private final ConcurrentMap<String, LocallyScheduledCQ> locallyScheduledCQs; + private ScheduledExecutorService executor; public CQManager(ConfigManager configManager) { this.configManager = configManager; this.lock = new ReentrantReadWriteLock(); + this.locallyScheduledCQs = new ConcurrentHashMap<>(); this.executor = IoTDBThreadPoolFactory.newScheduledThreadPool( CONF.getCqSubmitThread(), ThreadName.CQ_SCHEDULER.getName()); @@ -78,14 +85,21 @@ public class CQManager { } public TSStatus dropCQ(TDropCQReq req) { + lock.readLock().lock(); try { - return configManager.getConsensusManager().write(new DropCQPlan(req.cqId)); + TSStatus status = configManager.getConsensusManager().write(new DropCQPlan(req.cqId)); + if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + cancelLocallyScheduledCQ(req.cqId); + } + return status; } catch (ConsensusException e) { LOGGER.warn(ManagerMessages.UNEXPECTED_ERROR_HAPPENED_WHILE_DROPPING_CQ, req.cqId, e); // consensus layer related errors TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); res.setMessage(e.getMessage()); return res; + } finally { + lock.readLock().unlock(); } } @@ -118,6 +132,7 @@ public class CQManager { try { // 1. shutdown previous cq schedule thread pool try { + cancelAllLocallyScheduledCQs(); if (executor != null) { executor.shutdown(); } @@ -156,7 +171,15 @@ public class CQManager { for (CQInfo.CQEntry entry : allCQs) { if (entry.getState() == CQState.ACTIVE) { CQScheduleTask cqScheduleTask = new CQScheduleTask(entry, executor, configManager); - cqScheduleTask.submitSelf(); + if (!markCQLocallyScheduled(entry.getCqId(), entry.getCqToken(), cqScheduleTask)) { + continue; + } + try { + cqScheduleTask.submitSelf(); + } catch (RuntimeException e) { + unmarkCQLocallyScheduled(entry.getCqId(), entry.getCqToken()); + throw e; + } } } } @@ -176,6 +199,7 @@ public class CQManager { try { previous = executor; executor = null; + cancelAllLocallyScheduledCQs(); } finally { lock.writeLock().unlock(); } @@ -183,4 +207,78 @@ public class CQManager { previous.shutdown(); } } + + public boolean markCQLocallyScheduled(String cqId, String cqToken, CQScheduleTask task) { + AtomicBoolean shouldSchedule = new AtomicBoolean(false); + LocallyScheduledCQ schedule = new LocallyScheduledCQ(cqToken, task); + lock.readLock().lock(); + try { + locallyScheduledCQs.compute( + cqId, + (ignored, previousSchedule) -> { + if (previousSchedule != null && previousSchedule.hasToken(cqToken)) { + return previousSchedule; + } + if (previousSchedule != null) { + previousSchedule.cancel(); + } + shouldSchedule.set(true); + return schedule; + }); + if (!shouldSchedule.get()) { + task.cancel(); + } + return shouldSchedule.get(); + } finally { + lock.readLock().unlock(); + } + } + + public void unmarkCQLocallyScheduled(String cqId, String cqToken) { + lock.readLock().lock(); + try { + locallyScheduledCQs.computeIfPresent( + cqId, + (ignored, schedule) -> { + if (schedule.hasToken(cqToken)) { + schedule.cancel(); + return null; + } + return schedule; + }); + } finally { + lock.readLock().unlock(); + } + } + + private void cancelLocallyScheduledCQ(String cqId) { + LocallyScheduledCQ schedule = locallyScheduledCQs.remove(cqId); + if (schedule != null) { + schedule.cancel(); + } + } + + private void cancelAllLocallyScheduledCQs() { + locallyScheduledCQs.values().forEach(LocallyScheduledCQ::cancel); + locallyScheduledCQs.clear(); + } + + private static class LocallyScheduledCQ { + + private final String cqToken; + private final CQScheduleTask task; + + private LocallyScheduledCQ(String cqToken, CQScheduleTask task) { + this.cqToken = cqToken; + this.task = task; + } + + private boolean hasToken(String cqToken) { + return this.cqToken.equals(cqToken); + } + + private void cancel() { + task.cancel(); + } + } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java index c58f5ade9bc..6b73ffca95f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java @@ -40,7 +40,10 @@ import org.slf4j.LoggerFactory; import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; public class CQScheduleTask implements Runnable { @@ -70,7 +73,7 @@ public class CQScheduleTask implements Runnable { private final long endTimeOffset; private final TimeoutPolicy timeoutPolicy; private final String queryBody; - private final String md5; + private final String cqToken; private final String zoneId; @@ -82,12 +85,15 @@ public class CQScheduleTask implements Runnable { private final long retryWaitTimeInMS; + private final AtomicBoolean cancelled; + private final AtomicReference<ScheduledFuture<?>> scheduledFuture; + private long executionTime; public CQScheduleTask( TCreateCQReq req, long firstExecutionTime, - String md5, + String cqToken, ScheduledExecutorService executor, ConfigManager configManager) { this( @@ -97,7 +103,7 @@ public class CQScheduleTask implements Runnable { req.endTimeOffset, TimeoutPolicy.deserialize(req.timeoutPolicy), req.queryBody, - md5, + cqToken, req.zoneId, req.username, executor, @@ -114,7 +120,7 @@ public class CQScheduleTask implements Runnable { entry.getEndTimeOffset(), entry.getTimeoutPolicy(), entry.getQueryBody(), - entry.getMd5(), + entry.getCqToken(), entry.getZoneId(), entry.getUsername(), executor, @@ -130,7 +136,7 @@ public class CQScheduleTask implements Runnable { long endTimeOffset, TimeoutPolicy timeoutPolicy, String queryBody, - String md5, + String cqToken, String zoneId, String username, ScheduledExecutorService executor, @@ -142,12 +148,14 @@ public class CQScheduleTask implements Runnable { this.endTimeOffset = endTimeOffset; this.timeoutPolicy = timeoutPolicy; this.queryBody = queryBody; - this.md5 = md5; + this.cqToken = cqToken; this.zoneId = zoneId; this.username = username; this.executor = executor; this.configManager = configManager; this.retryWaitTimeInMS = Math.min(DEFAULT_RETRY_WAIT_TIME_IN_MS, everyInterval / FACTOR); + this.cancelled = new AtomicBoolean(false); + this.scheduledFuture = new AtomicReference<>(); this.executionTime = executionTime; } @@ -166,6 +174,9 @@ public class CQScheduleTask implements Runnable { @Override public void run() { + if (cancelled.get()) { + return; + } long startTime = executionTime - startTimeOffset; long endTime = executionTime - endTimeOffset; @@ -178,6 +189,9 @@ public class CQScheduleTask implements Runnable { submitSelf(retryWaitTimeInMS, TimeUnit.MILLISECONDS); } } else { + if (cancelled.get()) { + return; + } LOGGER.info( ManagerMessages.STARTEXECUTECQ_EXECUTE_CQ_ON_DATANODE_TIME_RANGE_IS_CURRENT_TIME, cqId, @@ -207,12 +221,32 @@ public class CQScheduleTask implements Runnable { } private void submitSelf(long delay, TimeUnit unit) { - executor.schedule(this, delay, unit); + if (cancelled.get()) { + return; + } + ScheduledFuture<?> newFuture = executor.schedule(this, delay, unit); + ScheduledFuture<?> previousFuture = scheduledFuture.getAndSet(newFuture); + if (previousFuture != null) { + previousFuture.cancel(false); + } + if (cancelled.get() && scheduledFuture.compareAndSet(newFuture, null)) { + newFuture.cancel(false); + } + } + + public void cancel() { + cancelled.set(true); + ScheduledFuture<?> currentFuture = scheduledFuture.getAndSet(null); + if (currentFuture != null) { + currentFuture.cancel(false); + } } private boolean needSubmit() { // current node is still leader and thread pool is not shut down. - return configManager.getConsensusManager().isLeader() && !executor.isShutdown(); + return !cancelled.get() + && configManager.getConsensusManager().isLeader() + && !executor.isShutdown(); } private class AsyncExecuteCQCallback implements AsyncMethodCallback<TSStatus> { @@ -239,6 +273,9 @@ public class CQScheduleTask implements Runnable { @Override public void onComplete(TSStatus response) { + if (cancelled.get()) { + return; + } if (response.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { LOGGER.info( @@ -252,7 +289,7 @@ public class CQScheduleTask implements Runnable { result = configManager .getConsensusManager() - .write(new UpdateCQLastExecTimePlan(cqId, executionTime, md5)); + .write(new UpdateCQLastExecTimePlan(cqId, executionTime, cqToken)); } catch (ConsensusException e) { result = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); result.setMessage(e.getMessage()); @@ -291,6 +328,9 @@ public class CQScheduleTask implements Runnable { @Override public void onError(Exception exception) { + if (cancelled.get()) { + return; + } LOGGER.warn(ManagerMessages.EXECUTE_CQ_FAILED, cqId, exception); if (needSubmit()) { submitSelf(retryWaitTimeInMS, TimeUnit.MILLISECONDS); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java index 9c99cfbb0e8..013e2415f94 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java @@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.cq.CQState; import org.apache.iotdb.commons.cq.TimeoutPolicy; import org.apache.iotdb.commons.snapshot.SnapshotProcessor; +import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.ActiveCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan; @@ -45,7 +46,9 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -62,7 +65,7 @@ public class CQInfo implements SnapshotProcessor { private static final String CQ_NOT_EXIST_FORMAT = "CQ %s doesn't exist."; - private static final String MD5_NOT_MATCH_FORMAT = "MD5 of CQ %s doesn't match"; + private static final String CQ_TOKEN_NOT_MATCH_FORMAT = "Token of CQ %s doesn't match"; private final Map<String, CQEntry> cqMap; @@ -92,7 +95,7 @@ public class CQInfo implements SnapshotProcessor { CQEntry cqEntry = new CQEntry( plan.getReq(), - plan.getMd5(), + plan.getCqToken(), plan.getFirstExecutionTime() - plan.getReq().everyInterval); cqMap.put(cqId, cqEntry); res.code = TSStatusCode.SUCCESS_STATUS.getStatusCode(); @@ -106,13 +109,13 @@ public class CQInfo implements SnapshotProcessor { /** * Drop the CQ whose ID is same as <tt>cqId</tt> in plan. * - * @return SUCCESS_STATUS if there is CQ whose ID and md5 is same as <tt>cqId</tt> in plan, + * @return SUCCESS_STATUS if there is CQ whose ID and token is same as <tt>cqId</tt> in plan, * otherwise NO_SUCH_CQ. */ public TSStatus dropCQ(DropCQPlan plan) { TSStatus res = new TSStatus(); String cqId = plan.getCqId(); - Optional<String> md5 = plan.getMd5(); + Optional<String> cqToken = plan.getCqToken(); lock.writeLock().lock(); try { CQEntry cqEntry = cqMap.get(cqId); @@ -120,10 +123,10 @@ public class CQInfo implements SnapshotProcessor { res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode(); res.message = String.format(CQ_NOT_EXIST_FORMAT, cqId); LOGGER.warn(ConfigNodeMessages.DROP_CQ_FAILED_BECAUSE_IT_DOESN_T_EXIST, cqId); - } else if ((md5.isPresent() && !md5.get().equals(cqEntry.md5))) { + } else if ((cqToken.isPresent() && !cqToken.get().equals(cqEntry.cqToken))) { res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode(); - res.message = String.format(MD5_NOT_MATCH_FORMAT, cqId); - LOGGER.warn(ConfigNodeMessages.DROP_CQ_FAILED_BECAUSE_ITS_MD5_DOESN_T_MATCH, cqId); + res.message = String.format(CQ_TOKEN_NOT_MATCH_FORMAT, cqId); + LOGGER.warn(ConfigNodeMessages.DROP_CQ_FAILED_BECAUSE_ITS_TOKEN_DOESN_T_MATCH, cqId); } else { cqMap.remove(cqId); res.code = TSStatusCode.SUCCESS_STATUS.getStatusCode(); @@ -136,11 +139,24 @@ public class CQInfo implements SnapshotProcessor { } public ShowCQResp showCQ() { + return showCQ(new ShowCQPlan()); + } + + public ShowCQResp showCQ(ShowCQPlan plan) { lock.readLock().lock(); try { - return new ShowCQResp( - new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), - cqMap.values().stream().map(CQEntry::new).collect(Collectors.toList())); + Optional<String> cqId = plan.getCqId(); + List<CQEntry> cqList; + if (cqId.isPresent()) { + CQEntry cqEntry = cqMap.get(cqId.get()); + cqList = + cqEntry == null + ? Collections.emptyList() + : Collections.singletonList(new CQEntry(cqEntry)); + } else { + cqList = cqMap.values().stream().map(CQEntry::new).collect(Collectors.toList()); + } + return new ShowCQResp(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), cqList); } finally { lock.readLock().unlock(); } @@ -154,16 +170,16 @@ public class CQInfo implements SnapshotProcessor { public TSStatus activeCQ(ActiveCQPlan plan) { TSStatus res = new TSStatus(); String cqId = plan.getCqId(); - String md5 = plan.getMd5(); + String cqToken = plan.getCqToken(); lock.writeLock().lock(); try { CQEntry cqEntry = cqMap.get(cqId); if (cqEntry == null) { res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode(); res.message = String.format(CQ_NOT_EXIST_FORMAT, cqId); - } else if (!md5.equals(cqEntry.md5)) { + } else if (!cqToken.equals(cqEntry.cqToken)) { res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode(); - res.message = String.format(MD5_NOT_MATCH_FORMAT, cqId); + res.message = String.format(CQ_TOKEN_NOT_MATCH_FORMAT, cqId); } else if (cqEntry.state == CQState.ACTIVE) { res.code = TSStatusCode.CQ_ALREADY_ACTIVE.getStatusCode(); res.message = String.format("CQ %s has already been active", cqId); @@ -181,22 +197,22 @@ public class CQInfo implements SnapshotProcessor { * Update the last execution time of the corresponding CQ. * * @return SUCCESS_STATUS if successfully updated, or NO_SUCH_CQ if 1. the CQ doesn't exist; or 2. - * md5 is different. or CQ_UPDATE_LAST_EXEC_TIME_FAILED 3. original lastExecutionTime >= + * token is different. or CQ_UPDATE_LAST_EXEC_TIME_FAILED 3. original lastExecutionTime >= * current lastExecutionTime; */ public TSStatus updateCQLastExecutionTime(UpdateCQLastExecTimePlan plan) { TSStatus res = new TSStatus(); String cqId = plan.getCqId(); - String md5 = plan.getMd5(); + String cqToken = plan.getCqToken(); lock.writeLock().lock(); try { CQEntry cqEntry = cqMap.get(cqId); if (cqEntry == null) { res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode(); res.message = String.format(CQ_NOT_EXIST_FORMAT, cqId); - } else if (!md5.equals(cqEntry.md5)) { + } else if (!cqToken.equals(cqEntry.cqToken)) { res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode(); - res.message = String.format(MD5_NOT_MATCH_FORMAT, cqId); + res.message = String.format(CQ_TOKEN_NOT_MATCH_FORMAT, cqId); } else if (cqEntry.lastExecutionTime >= plan.getExecutionTime()) { res.code = TSStatusCode.CQ_UPDATE_LAST_EXEC_TIME_ERROR.getStatusCode(); res.message = @@ -300,7 +316,7 @@ public class CQInfo implements SnapshotProcessor { private final TimeoutPolicy timeoutPolicy; private final String queryBody; private final String sql; - private final String md5; + private final String cqToken; private final String zoneId; @@ -309,7 +325,7 @@ public class CQInfo implements SnapshotProcessor { private CQState state; private long lastExecutionTime; - private CQEntry(TCreateCQReq req, String md5, long lastExecutionTime) { + private CQEntry(TCreateCQReq req, String cqToken, long lastExecutionTime) { this( req.cqId, req.everyInterval, @@ -319,7 +335,7 @@ public class CQInfo implements SnapshotProcessor { TimeoutPolicy.deserialize(req.timeoutPolicy), req.queryBody, req.sql, - md5, + cqToken, req.zoneId, req.username, CQState.INACTIVE, @@ -336,7 +352,7 @@ public class CQInfo implements SnapshotProcessor { other.timeoutPolicy, other.queryBody, other.sql, - other.md5, + other.cqToken, other.zoneId, other.username, other.state, @@ -353,7 +369,7 @@ public class CQInfo implements SnapshotProcessor { TimeoutPolicy timeoutPolicy, String queryBody, String sql, - String md5, + String cqToken, String zoneId, String username, CQState state, @@ -366,7 +382,7 @@ public class CQInfo implements SnapshotProcessor { this.timeoutPolicy = timeoutPolicy; this.queryBody = queryBody; this.sql = sql; - this.md5 = md5; + this.cqToken = cqToken; this.zoneId = zoneId; this.username = username; this.state = state; @@ -382,7 +398,7 @@ public class CQInfo implements SnapshotProcessor { ReadWriteIOUtils.write(timeoutPolicy.getType(), stream); ReadWriteIOUtils.write(queryBody, stream); ReadWriteIOUtils.write(sql, stream); - ReadWriteIOUtils.write(md5, stream); + ReadWriteIOUtils.write(cqToken, stream); ReadWriteIOUtils.write(zoneId, stream); ReadWriteIOUtils.write(username, stream); ReadWriteIOUtils.write(state.getType(), stream); @@ -398,7 +414,7 @@ public class CQInfo implements SnapshotProcessor { TimeoutPolicy timeoutPolicy = TimeoutPolicy.deserialize(ReadWriteIOUtils.readByte(stream)); String queryBody = ReadWriteIOUtils.readString(stream); String sql = ReadWriteIOUtils.readString(stream); - String md5 = ReadWriteIOUtils.readString(stream); + String cqToken = ReadWriteIOUtils.readString(stream); String zoneId = ReadWriteIOUtils.readString(stream); String username = ReadWriteIOUtils.readString(stream); CQState state = CQState.deserialize(ReadWriteIOUtils.readByte(stream)); @@ -412,7 +428,7 @@ public class CQInfo implements SnapshotProcessor { timeoutPolicy, queryBody, sql, - md5, + cqToken, zoneId, username, state, @@ -451,8 +467,8 @@ public class CQInfo implements SnapshotProcessor { return sql; } - public String getMd5() { - return md5; + public String getCqToken() { + return cqToken; } public CQState getState() { @@ -489,7 +505,7 @@ public class CQInfo implements SnapshotProcessor { && timeoutPolicy == cqEntry.timeoutPolicy && Objects.equals(queryBody, cqEntry.queryBody) && Objects.equals(sql, cqEntry.sql) - && Objects.equals(md5, cqEntry.md5) + && Objects.equals(cqToken, cqEntry.cqToken) && Objects.equals(zoneId, cqEntry.zoneId) && Objects.equals(username, cqEntry.username) && state == cqEntry.state; @@ -506,7 +522,7 @@ public class CQInfo implements SnapshotProcessor { timeoutPolicy, queryBody, sql, - md5, + cqToken, zoneId, username, state, diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java index eb8d5e5538b..96dcdea4648 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java @@ -30,6 +30,7 @@ import org.apache.iotdb.commons.snapshot.SnapshotProcessor; import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; import org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan; import org.apache.iotdb.confignode.consensus.request.read.ainode.GetAINodeConfigurationPlan; +import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan; import org.apache.iotdb.confignode.consensus.request.read.database.CountDatabasePlan; import org.apache.iotdb.confignode.consensus.request.read.database.GetDatabasePlan; import org.apache.iotdb.confignode.consensus.request.read.datanode.GetDataNodeConfigurationPlan; @@ -361,7 +362,7 @@ public class ConfigPlanExecutor { case GetRegionGroupsByTime: return partitionInfo.getRegionGroupsByTime((GetRegionGroupsByTimePlan) req); case SHOW_CQ: - return cqInfo.showCQ(); + return cqInfo.showCQ((ShowCQPlan) req); case ShowExternalService: return externalServiceInfo.showService(((ShowExternalServicePlan) req).getDataNodeIds()); case GetFunctionTable: diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java index ac964d23ca3..490f723d2e6 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java @@ -22,11 +22,15 @@ package org.apache.iotdb.confignode.procedure.impl.cq; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils; +import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.ActiveCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan; +import org.apache.iotdb.confignode.consensus.response.cq.ShowCQResp; import org.apache.iotdb.confignode.i18n.ProcedureMessages; +import org.apache.iotdb.confignode.manager.cq.CQManager; import org.apache.iotdb.confignode.manager.cq.CQScheduleTask; +import org.apache.iotdb.confignode.persistence.cq.CQInfo; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.exception.ProcedureException; import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure; @@ -36,7 +40,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq; import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.rpc.TSStatusCode; -import org.apache.tsfile.external.commons.codec.digest.DigestUtils; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,6 +48,8 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Objects; +import java.util.Optional; +import java.util.UUID; import java.util.concurrent.ScheduledExecutorService; import static org.apache.iotdb.confignode.procedure.state.cq.CreateCQState.INACTIVE; @@ -60,7 +65,7 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { private TCreateCQReq req; - private String md5; + private String cqToken; private long firstExecutionTime; @@ -75,7 +80,7 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { public CreateCQProcedure(TCreateCQReq req, ScheduledExecutorService executor) { super(); this.req = req; - this.md5 = DigestUtils.md2Hex(req.cqId); + this.cqToken = generateCQToken(); this.executor = executor; this.firstExecutionTime = CQScheduleTask.getFirstExecutionTime(req.boundaryTime, req.everyInterval); @@ -91,12 +96,16 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { addCQ(env); return Flow.HAS_MORE_STATE; case INACTIVE: - CQScheduleTask cqScheduleTask = - new CQScheduleTask(req, firstExecutionTime, md5, executor, env.getConfigManager()); - cqScheduleTask.submitSelf(); + submitScheduleTask( + env, + new CQScheduleTask( + req, firstExecutionTime, cqToken, executor, env.getConfigManager())); setNextState(SCHEDULED); break; case SCHEDULED: + if (isStateDeserialized()) { + recoverScheduledTask(env); + } activeCQ(env); return Flow.NO_MORE_STATE; default: @@ -126,7 +135,7 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { res = env.getConfigManager() .getConsensusManager() - .write(new AddCQPlan(req, md5, firstExecutionTime)); + .write(new AddCQPlan(req, cqToken, firstExecutionTime)); } catch (ConsensusException e) { LOGGER.warn(CONSENSUS_WRITE_ERROR, e); res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); @@ -147,7 +156,7 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { private void activeCQ(ConfigNodeProcedureEnv env) { TSStatus res; try { - res = env.getConfigManager().getConsensusManager().write(new ActiveCQPlan(req.cqId, md5)); + res = env.getConfigManager().getConsensusManager().write(new ActiveCQPlan(req.cqId, cqToken)); } catch (ConsensusException e) { LOGGER.warn(CONSENSUS_WRITE_ERROR, e); res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); @@ -168,6 +177,42 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { } } + void recoverScheduledTask(ConfigNodeProcedureEnv env) throws ConsensusException { + Optional<CQInfo.CQEntry> cqEntry = getCurrentCQEntry(env); + if (!cqEntry.isPresent()) { + LOGGER.info( + "Skip recovering the schedule task of CQ {} because its metadata is unavailable.", + req.cqId); + return; + } + submitScheduleTask(env, new CQScheduleTask(cqEntry.get(), executor, env.getConfigManager())); + } + + Optional<CQInfo.CQEntry> getCurrentCQEntry(ConfigNodeProcedureEnv env) throws ConsensusException { + ShowCQResp response = + (ShowCQResp) env.getConfigManager().getConsensusManager().read(new ShowCQPlan(req.cqId)); + return response.getCqList().stream() + .filter(entry -> cqToken.equals(entry.getCqToken())) + .findFirst(); + } + + private static String generateCQToken() { + return UUID.randomUUID().toString(); + } + + private void submitScheduleTask(ConfigNodeProcedureEnv env, CQScheduleTask cqScheduleTask) { + CQManager cqManager = env.getConfigManager().getCQManager(); + if (!cqManager.markCQLocallyScheduled(req.cqId, cqToken, cqScheduleTask)) { + return; + } + try { + cqScheduleTask.submitSelf(); + } catch (RuntimeException e) { + cqManager.unmarkCQLocallyScheduled(req.cqId, cqToken); + throw e; + } + } + @Override protected void rollbackState(ConfigNodeProcedureEnv env, CreateCQState state) throws IOException, InterruptedException, ProcedureException { @@ -180,7 +225,8 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { LOGGER.info(ProcedureMessages.START_INACTIVE_ROLLBACK_OF_CQ, req.cqId); TSStatus res; try { - res = env.getConfigManager().getConsensusManager().write(new DropCQPlan(req.cqId, md5)); + res = + env.getConfigManager().getConsensusManager().write(new DropCQPlan(req.cqId, cqToken)); } catch (ConsensusException e) { LOGGER.warn(CONSENSUS_WRITE_ERROR, e); res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); @@ -231,7 +277,7 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { stream.writeShort(ProcedureType.CREATE_CQ_PROCEDURE.getTypeCode()); super.serialize(stream); ThriftCommonsSerDeUtils.serializeTCreateCQReq(req, stream); - ReadWriteIOUtils.write(md5, stream); + ReadWriteIOUtils.write(cqToken, stream); ReadWriteIOUtils.write(firstExecutionTime, stream); } @@ -239,7 +285,7 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { public void deserialize(ByteBuffer byteBuffer) { super.deserialize(byteBuffer); this.req = ThriftCommonsSerDeUtils.deserializeTCreateCQReq(byteBuffer); - this.md5 = ReadWriteIOUtils.readString(byteBuffer); + this.cqToken = ReadWriteIOUtils.readString(byteBuffer); this.firstExecutionTime = ReadWriteIOUtils.readLong(byteBuffer); } @@ -258,7 +304,7 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { && isGeneratedByPipe == that.isGeneratedByPipe && firstExecutionTime == that.firstExecutionTime && Objects.equals(req, that.req) - && Objects.equals(md5, that.md5); + && Objects.equals(cqToken, that.cqToken); } @Override @@ -269,7 +315,15 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { getCycles(), isGeneratedByPipe, req, - md5, + cqToken, firstExecutionTime); } + + public String getCqId() { + return req == null ? null : req.getCqId(); + } + + public String getCqToken() { + return cqToken; + } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java index d6a3d7e3fd7..ea35be6c5d7 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java @@ -1674,7 +1674,7 @@ public class ConfigPhysicalPlanSerDeTest { @Test public void ActiveCQPlanTest() throws IOException { - ActiveCQPlan activeCQPlan0 = new ActiveCQPlan("testCq", "testCq_md5"); + ActiveCQPlan activeCQPlan0 = new ActiveCQPlan("testCq", "testCqToken"); ActiveCQPlan activeCQPlan1 = (ActiveCQPlan) ConfigPhysicalPlan.Factory.create(activeCQPlan0.serializeToByteBuffer()); @@ -1697,7 +1697,7 @@ public class ConfigPhysicalPlanSerDeTest { "create cq testCq1 BEGIN select s1 into root.backup.d1.s1 from root.sg.d1 END", "Asia", "root"), - "testCq1_md5", + "testCq1Token", executionTime); AddCQPlan addCQPlan1 = (AddCQPlan) ConfigPhysicalPlan.Factory.create(addCQPlan0.serializeToByteBuffer()); @@ -1712,7 +1712,7 @@ public class ConfigPhysicalPlanSerDeTest { (DropCQPlan) ConfigPhysicalPlan.Factory.create(dropCQPlan0.serializeToByteBuffer()); Assert.assertEquals(dropCQPlan0, dropCQPlan1); - dropCQPlan0 = new DropCQPlan("testCq1", "testCq1_md5"); + dropCQPlan0 = new DropCQPlan("testCq1", "testCq1Token"); dropCQPlan1 = (DropCQPlan) ConfigPhysicalPlan.Factory.create(dropCQPlan0.serializeToByteBuffer()); Assert.assertEquals(dropCQPlan0, dropCQPlan1); @@ -1721,7 +1721,7 @@ public class ConfigPhysicalPlanSerDeTest { @Test public void UpdateCQLastExecTimePlanTest() throws IOException { UpdateCQLastExecTimePlan updateCQLastExecTimePlan0 = - new UpdateCQLastExecTimePlan("testCq", System.currentTimeMillis(), "testCq_md5"); + new UpdateCQLastExecTimePlan("testCq", System.currentTimeMillis(), "testCqToken"); UpdateCQLastExecTimePlan updateCQLastExecTimePlan1 = (UpdateCQLastExecTimePlan) ConfigPhysicalPlan.Factory.create(updateCQLastExecTimePlan0.serializeToByteBuffer()); diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/cq/CQManagerTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/cq/CQManagerTest.java new file mode 100644 index 00000000000..a0bc5a523ba --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/cq/CQManagerTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.confignode.cq; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.cq.TimeoutPolicy; +import org.apache.iotdb.confignode.manager.ConfigManager; +import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; +import org.apache.iotdb.confignode.manager.cq.CQManager; +import org.apache.iotdb.confignode.manager.cq.CQScheduleTask; +import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertTrue; + +public class CQManagerTest { + + @SuppressWarnings("unchecked") + @Test + public void dropCQShouldCancelLocallyScheduledTask() throws Exception { + ConfigManager configManager = Mockito.mock(ConfigManager.class); + ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class); + Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager); + Mockito.when(consensusManager.write(Mockito.any())) + .thenReturn(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); + CQManager cqManager = new CQManager(configManager); + ScheduledFuture<?> future = Mockito.mock(ScheduledFuture.class); + CQScheduleTask task = newScheduledTask(configManager, future, "token"); + + try { + assertTrue(cqManager.markCQLocallyScheduled("testCq", "token", task)); + task.submitSelf(); + cqManager.dropCQ(new TDropCQReq("testCq")); + + Mockito.verify(future).cancel(false); + } finally { + cqManager.stopCQScheduler(); + } + } + + @SuppressWarnings("unchecked") + @Test + public void newTokenShouldCancelPreviousLocallyScheduledTask() { + ConfigManager configManager = Mockito.mock(ConfigManager.class); + CQManager cqManager = new CQManager(configManager); + ScheduledFuture<?> previousFuture = Mockito.mock(ScheduledFuture.class); + CQScheduleTask previousTask = newScheduledTask(configManager, previousFuture, "previousToken"); + ScheduledFuture<?> currentFuture = Mockito.mock(ScheduledFuture.class); + CQScheduleTask currentTask = newScheduledTask(configManager, currentFuture, "currentToken"); + + try { + assertTrue(cqManager.markCQLocallyScheduled("testCq", "previousToken", previousTask)); + previousTask.submitSelf(); + assertTrue(cqManager.markCQLocallyScheduled("testCq", "currentToken", currentTask)); + + Mockito.verify(previousFuture).cancel(false); + } finally { + cqManager.stopCQScheduler(); + } + } + + @SuppressWarnings("unchecked") + private CQScheduleTask newScheduledTask( + ConfigManager configManager, ScheduledFuture<?> scheduledFuture, String cqToken) { + ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class); + Mockito.when( + executor.schedule( + Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.any(TimeUnit.class))) + .thenReturn((ScheduledFuture) scheduledFuture); + return new CQScheduleTask( + "testCq", + 1000, + 0, + 1000, + TimeoutPolicy.BLOCKED, + "select s1 into root.backup.d1.s1 from root.sg.d1", + cqToken, + "Asia", + "root", + executor, + configManager, + System.currentTimeMillis() + 10_000); + } +} diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java index 4b409d6cf0c..64bbd69c5b6 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java @@ -18,9 +18,14 @@ */ package org.apache.iotdb.confignode.persistence; +import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan; +import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan; +import org.apache.iotdb.confignode.consensus.request.write.cq.UpdateCQLastExecTimePlan; +import org.apache.iotdb.confignode.consensus.response.cq.ShowCQResp; import org.apache.iotdb.confignode.persistence.cq.CQInfo; import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.thrift.TException; import org.apache.tsfile.external.commons.io.FileUtils; @@ -70,7 +75,7 @@ public class CQInfoTest { "create cq testCq1 BEGIN select s1 into root.backup.d1.s1 from root.sg.d1 END", "Asia", "root"), - "testCq1_md5", + "testCq1Token", executionTime); cqInfo.addCQ(addCQPlan); @@ -89,7 +94,7 @@ public class CQInfoTest { "create cq testCq2 BEGIN select s1 into root.backup.d2.s1 from root.sg.d2 END", "Asia", "root"), - "testCq2_md5", + "testCq2Token", executionTime); cqInfo.addCQ(addCQPlan); @@ -99,4 +104,59 @@ public class CQInfoTest { Assert.assertEquals(cqInfo, actualCQInfo); } + + @Test + public void testOldCallbackCannotTouchRecreatedCQ() throws Exception { + long executionTime = System.currentTimeMillis(); + TCreateCQReq req = + new TCreateCQReq( + "testCq3", + 1000, + 0, + 1000, + 0, + (byte) 0, + "select s1 into root.backup.d3.s1 from root.sg.d3", + "create cq testCq3 BEGIN select s1 into root.backup.d3.s1 from root.sg.d3 END", + "Asia", + "root"); + + cqInfo.addCQ(new AddCQPlan(req, "oldToken", executionTime)); + cqInfo.dropCQ(new DropCQPlan("testCq3")); + cqInfo.addCQ(new AddCQPlan(req, "newToken", executionTime)); + + Assert.assertEquals( + TSStatusCode.NO_SUCH_CQ.getStatusCode(), + cqInfo.updateCQLastExecutionTime( + new UpdateCQLastExecTimePlan("testCq3", executionTime + 1000, "oldToken")) + .code); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + cqInfo.updateCQLastExecutionTime( + new UpdateCQLastExecTimePlan("testCq3", executionTime + 1000, "newToken")) + .code); + } + + @Test + public void testShowCQCanFilterByCQId() throws Exception { + long executionTime = System.currentTimeMillis(); + TCreateCQReq req = + new TCreateCQReq( + "testCq4", + 1000, + 0, + 1000, + 0, + (byte) 0, + "select s1 into root.backup.d4.s1 from root.sg.d4", + "create cq testCq4 BEGIN select s1 into root.backup.d4.s1 from root.sg.d4 END", + "Asia", + "root"); + cqInfo.addCQ(new AddCQPlan(req, "testCq4Token", executionTime)); + + ShowCQResp showCQResp = cqInfo.showCQ(new ShowCQPlan("testCq4")); + + Assert.assertEquals(1, showCQResp.getCqList().size()); + Assert.assertEquals("testCq4", showCQResp.getCqList().get(0).getCqId()); + } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java index d0e92b32816..3e7fd2052ad 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java @@ -36,10 +36,36 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.fail; public class CreateCQProcedureTest { + @Test + public void tokenShouldBeUniqueForSameCQId() { + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + try { + TCreateCQReq req = + new TCreateCQReq( + "testCq1", + 1000, + 0, + 1000, + 0, + (byte) 0, + "select s1 into root.backup.d1(s1) from root.sg.d1", + "create cq testCq1 BEGIN select s1 into root.backup.d1(s1) from root.sg.d1 END", + "Asia", + "root"); + CreateCQProcedure createCQProcedure1 = new CreateCQProcedure(req, executor); + CreateCQProcedure createCQProcedure2 = new CreateCQProcedure(req, executor); + + assertNotEquals(createCQProcedure1.getCqToken(), createCQProcedure2.getCqToken()); + } finally { + executor.shutdown(); + } + } + @Test public void serializeDeserializeTest() { diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java new file mode 100644 index 00000000000..a90e282494f --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.procedure.impl.cq; + +import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan; +import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan; +import org.apache.iotdb.confignode.manager.ConfigManager; +import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; +import org.apache.iotdb.confignode.manager.cq.CQManager; +import org.apache.iotdb.confignode.manager.cq.CQScheduleTask; +import org.apache.iotdb.confignode.persistence.cq.CQInfo; +import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; +import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq; + +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +public class CreateCQProcedureRecoveryTest { + + private TCreateCQReq newCreateCQReq() { + return new TCreateCQReq( + "testCq1", + 1000, + 0, + 1000, + 0, + (byte) 0, + "select s1 into root.backup.d1.s1 from root.sg.d1", + "create cq testCq1 BEGIN select s1 into root.backup.d1.s1 from root.sg.d1 END", + "Asia", + "root"); + } + + @SuppressWarnings("unchecked") + @Test + public void recoverScheduledTaskShouldResubmitFromLatestMetadata() throws Exception { + ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class); + Mockito.when( + executor.schedule( + Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.any(TimeUnit.class))) + .thenReturn(Mockito.mock(ScheduledFuture.class)); + + ConfigManager configManager = Mockito.mock(ConfigManager.class); + ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class); + CQManager cqManager = Mockito.mock(CQManager.class); + ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class); + Mockito.when(env.getConfigManager()).thenReturn(configManager); + Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager); + Mockito.when(configManager.getCQManager()).thenReturn(cqManager); + Mockito.when( + cqManager.markCQLocallyScheduled( + Mockito.anyString(), Mockito.anyString(), Mockito.any(CQScheduleTask.class))) + .thenReturn(true); + + TCreateCQReq req = newCreateCQReq(); + CreateCQProcedure procedure = new CreateCQProcedure(req, executor); + + CQInfo cqInfo = new CQInfo(); + cqInfo.addCQ(new AddCQPlan(req, procedure.getCqToken(), System.currentTimeMillis() + 10_000)); + Mockito.when(consensusManager.read(Mockito.any(ShowCQPlan.class))).thenReturn(cqInfo.showCQ()); + + procedure.recoverScheduledTask(env); + + Mockito.verify(executor) + .schedule(Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.any(TimeUnit.class)); + } + + @SuppressWarnings("unchecked") + @Test + public void recoverScheduledTaskShouldSkipDuplicatedLocalSchedule() throws Exception { + ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class); + ConfigManager configManager = Mockito.mock(ConfigManager.class); + ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class); + CQManager cqManager = Mockito.mock(CQManager.class); + ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class); + Mockito.when(env.getConfigManager()).thenReturn(configManager); + Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager); + Mockito.when(configManager.getCQManager()).thenReturn(cqManager); + Mockito.when( + cqManager.markCQLocallyScheduled( + Mockito.anyString(), Mockito.anyString(), Mockito.any(CQScheduleTask.class))) + .thenReturn(false); + + TCreateCQReq req = newCreateCQReq(); + CreateCQProcedure procedure = new CreateCQProcedure(req, executor); + + CQInfo cqInfo = new CQInfo(); + cqInfo.addCQ(new AddCQPlan(req, procedure.getCqToken(), System.currentTimeMillis() + 10_000)); + Mockito.when(consensusManager.read(Mockito.any(ShowCQPlan.class))).thenReturn(cqInfo.showCQ()); + + procedure.recoverScheduledTask(env); + + Mockito.verify(executor, Mockito.never()) + .schedule(Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.any(TimeUnit.class)); + } +}
