This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch revert in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1cdfb70796fb8ab4de9e672a9c37f7d7103e9ccb Author: Caideyipi <[email protected]> AuthorDate: Wed Jun 3 12:22:54 2026 +0800 Revert "Fix CQ recovery gap and stale callback contamination (#17734) (#17820)" This reverts commit 24be79aba9c973a4cb11fdb8947cdf32da12ea8e. --- iotdb-core/confignode/pom.xml | 4 + .../consensus/request/read/cq/ShowCQPlan.java | 13 --- .../consensus/request/write/cq/ActiveCQPlan.java | 20 ++-- .../consensus/request/write/cq/AddCQPlan.java | 20 ++-- .../consensus/request/write/cq/DropCQPlan.java | 20 ++-- .../request/write/cq/UpdateCQLastExecTimePlan.java | 23 ++-- .../iotdb/confignode/manager/cq/CQManager.java | 102 +----------------- .../confignode/manager/cq/CQScheduleTask.java | 58 ++-------- .../iotdb/confignode/persistence/cq/CQInfo.java | 76 ++++++------- .../persistence/executor/ConfigPlanExecutor.java | 3 +- .../procedure/impl/cq/CreateCQProcedure.java | 80 +++----------- .../request/ConfigPhysicalPlanSerDeTest.java | 8 +- .../apache/iotdb/confignode/cq/CQManagerTest.java | 107 ------------------- .../iotdb/confignode/persistence/CQInfoTest.java | 64 +---------- .../procedure/impl/CreateCQProcedureTest.java | 26 ----- .../impl/cq/CreateCQProcedureRecoveryTest.java | 117 --------------------- 16 files changed, 106 insertions(+), 635 deletions(-) diff --git a/iotdb-core/confignode/pom.xml b/iotdb-core/confignode/pom.xml index a7cd4b6298b..fb22fd5b091 100644 --- a/iotdb-core/confignode/pom.xml +++ b/iotdb-core/confignode/pom.xml @@ -137,6 +137,10 @@ <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> + <dependency> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + </dependency> <dependency> <groupId>org.apache.thrift</groupId> <artifactId>libthrift</artifactId> diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/cq/ShowCQPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/cq/ShowCQPlan.java index c28838d556b..5217849deb4 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/cq/ShowCQPlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/cq/ShowCQPlan.java @@ -21,24 +21,11 @@ package org.apache.iotdb.confignode.consensus.request.read.cq; import org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan; -import java.util.Optional; - import static org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType.SHOW_CQ; public class ShowCQPlan extends ConfigPhysicalReadPlan { - private final String cqId; - public ShowCQPlan() { - this(null); - } - - public ShowCQPlan(String cqId) { super(SHOW_CQ); - this.cqId = cqId; - } - - public Optional<String> getCqId() { - return Optional.ofNullable(cqId); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java index 4ca33b054a9..e488ac25669 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java @@ -35,39 +35,39 @@ public class ActiveCQPlan extends ConfigPhysicalPlan { private String cqId; - private String cqToken; + private String md5; public ActiveCQPlan() { super(ACTIVE_CQ); } - public ActiveCQPlan(String cqId, String cqToken) { + public ActiveCQPlan(String cqId, String md5) { super(ACTIVE_CQ); Validate.notNull(cqId); - Validate.notNull(cqToken); + Validate.notNull(md5); this.cqId = cqId; - this.cqToken = cqToken; + this.md5 = md5; } public String getCqId() { return cqId; } - public String getCqToken() { - return cqToken; + public String getMd5() { + return md5; } @Override protected void serializeImpl(DataOutputStream stream) throws IOException { stream.writeShort(getType().getPlanType()); ReadWriteIOUtils.write(cqId, stream); - ReadWriteIOUtils.write(cqToken, stream); + ReadWriteIOUtils.write(md5, stream); } @Override protected void deserializeImpl(ByteBuffer buffer) throws IOException { cqId = ReadWriteIOUtils.readString(buffer); - cqToken = ReadWriteIOUtils.readString(buffer); + md5 = ReadWriteIOUtils.readString(buffer); } @Override @@ -82,11 +82,11 @@ public class ActiveCQPlan extends ConfigPhysicalPlan { return false; } ActiveCQPlan that = (ActiveCQPlan) o; - return cqId.equals(that.cqId) && cqToken.equals(that.cqToken); + return cqId.equals(that.cqId) && md5.equals(that.md5); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), cqId, cqToken); + return Objects.hash(super.hashCode(), cqId, md5); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java index 721c83d3d2f..0aae9e2e974 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java @@ -37,7 +37,7 @@ public class AddCQPlan extends ConfigPhysicalPlan { private TCreateCQReq req; - private String cqToken; + private String md5; private long firstExecutionTime; @@ -45,12 +45,12 @@ public class AddCQPlan extends ConfigPhysicalPlan { super(ADD_CQ); } - public AddCQPlan(TCreateCQReq req, String cqToken, long firstExecutionTime) { + public AddCQPlan(TCreateCQReq req, String md5, long firstExecutionTime) { super(ADD_CQ); Validate.notNull(req); - Validate.notNull(cqToken); + Validate.notNull(md5); this.req = req; - this.cqToken = cqToken; + this.md5 = md5; this.firstExecutionTime = firstExecutionTime; } @@ -58,8 +58,8 @@ public class AddCQPlan extends ConfigPhysicalPlan { return req; } - public String getCqToken() { - return cqToken; + public String getMd5() { + return md5; } public long getFirstExecutionTime() { @@ -70,14 +70,14 @@ public class AddCQPlan extends ConfigPhysicalPlan { protected void serializeImpl(DataOutputStream stream) throws IOException { stream.writeShort(getType().getPlanType()); ThriftCommonsSerDeUtils.serializeTCreateCQReq(req, stream); - ReadWriteIOUtils.write(cqToken, stream); + ReadWriteIOUtils.write(md5, stream); ReadWriteIOUtils.write(firstExecutionTime, stream); } @Override protected void deserializeImpl(ByteBuffer buffer) throws IOException { req = ThriftCommonsSerDeUtils.deserializeTCreateCQReq(buffer); - cqToken = ReadWriteIOUtils.readString(buffer); + md5 = ReadWriteIOUtils.readString(buffer); firstExecutionTime = ReadWriteIOUtils.readLong(buffer); } @@ -95,11 +95,11 @@ public class AddCQPlan extends ConfigPhysicalPlan { AddCQPlan addCQPlan = (AddCQPlan) o; return firstExecutionTime == addCQPlan.firstExecutionTime && Objects.equals(req, addCQPlan.req) - && Objects.equals(cqToken, addCQPlan.cqToken); + && Objects.equals(md5, addCQPlan.md5); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), req, cqToken, firstExecutionTime); + return Objects.hash(super.hashCode(), req, md5, firstExecutionTime); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java index f22561c0b80..5c901362997 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java @@ -37,7 +37,7 @@ public class DropCQPlan extends ConfigPhysicalPlan { private String cqId; // may be null in user call of drop CQ - private String cqToken; + private String md5; public DropCQPlan() { super(DROP_CQ); @@ -49,33 +49,33 @@ public class DropCQPlan extends ConfigPhysicalPlan { this.cqId = cqId; } - public DropCQPlan(String cqId, String cqToken) { + public DropCQPlan(String cqId, String md5) { super(DROP_CQ); Validate.notNull(cqId); - Validate.notNull(cqToken); + Validate.notNull(md5); this.cqId = cqId; - this.cqToken = cqToken; + this.md5 = md5; } public String getCqId() { return cqId; } - public Optional<String> getCqToken() { - return Optional.ofNullable(cqToken); + public Optional<String> getMd5() { + return Optional.ofNullable(md5); } @Override protected void serializeImpl(DataOutputStream stream) throws IOException { stream.writeShort(getType().getPlanType()); ReadWriteIOUtils.write(cqId, stream); - ReadWriteIOUtils.write(cqToken, stream); + ReadWriteIOUtils.write(md5, stream); } @Override protected void deserializeImpl(ByteBuffer buffer) throws IOException { cqId = ReadWriteIOUtils.readString(buffer); - cqToken = ReadWriteIOUtils.readString(buffer); + md5 = ReadWriteIOUtils.readString(buffer); } @Override @@ -90,11 +90,11 @@ public class DropCQPlan extends ConfigPhysicalPlan { return false; } DropCQPlan that = (DropCQPlan) o; - return cqId.equals(that.cqId) && Objects.equals(cqToken, that.cqToken); + return cqId.equals(that.cqId) && Objects.equals(md5, that.md5); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), cqId, cqToken); + return Objects.hash(super.hashCode(), cqId, md5); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java index 392f6006ba5..51a8358a04c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java @@ -37,19 +37,20 @@ public class UpdateCQLastExecTimePlan extends ConfigPhysicalPlan { private long executionTime; - private String cqToken; + // may be null in user call of drop CQ + private String md5; public UpdateCQLastExecTimePlan() { super(UPDATE_CQ_LAST_EXEC_TIME); } - public UpdateCQLastExecTimePlan(String cqId, long executionTime, String cqToken) { + public UpdateCQLastExecTimePlan(String cqId, long executionTime, String md5) { super(UPDATE_CQ_LAST_EXEC_TIME); Validate.notNull(cqId); - Validate.notNull(cqToken); + Validate.notNull(md5); this.cqId = cqId; this.executionTime = executionTime; - this.cqToken = cqToken; + this.md5 = md5; } public String getCqId() { @@ -60,8 +61,8 @@ public class UpdateCQLastExecTimePlan extends ConfigPhysicalPlan { return executionTime; } - public String getCqToken() { - return cqToken; + public String getMd5() { + return md5; } @Override @@ -69,14 +70,14 @@ public class UpdateCQLastExecTimePlan extends ConfigPhysicalPlan { stream.writeShort(getType().getPlanType()); ReadWriteIOUtils.write(cqId, stream); ReadWriteIOUtils.write(executionTime, stream); - ReadWriteIOUtils.write(cqToken, stream); + ReadWriteIOUtils.write(md5, stream); } @Override protected void deserializeImpl(ByteBuffer buffer) throws IOException { cqId = ReadWriteIOUtils.readString(buffer); executionTime = ReadWriteIOUtils.readLong(buffer); - cqToken = ReadWriteIOUtils.readString(buffer); + md5 = ReadWriteIOUtils.readString(buffer); } @Override @@ -91,13 +92,11 @@ public class UpdateCQLastExecTimePlan extends ConfigPhysicalPlan { return false; } UpdateCQLastExecTimePlan that = (UpdateCQLastExecTimePlan) o; - return executionTime == that.executionTime - && cqId.equals(that.cqId) - && cqToken.equals(that.cqToken); + return executionTime == that.executionTime && cqId.equals(that.cqId) && md5.equals(that.md5); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), cqId, executionTime, cqToken); + return Objects.hash(super.hashCode(), cqId, executionTime, md5); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java index a90a21f285f..5726b3ce826 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java @@ -42,10 +42,7 @@ import org.slf4j.LoggerFactory; import java.util.Collections; import java.util.List; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -59,15 +56,11 @@ public class CQManager { private final ReadWriteLock lock; - // Key: CQ id. Value: the local task and the metadata token it owns. - private final ConcurrentMap<String, LocallyScheduledCQ> locallyScheduledCQs; - private ScheduledExecutorService executor; public CQManager(ConfigManager configManager) { this.configManager = configManager; this.lock = new ReentrantReadWriteLock(); - this.locallyScheduledCQs = new ConcurrentHashMap<>(); this.executor = IoTDBThreadPoolFactory.newScheduledThreadPool( CONF.getCqSubmitThread(), ThreadName.CQ_SCHEDULER.getName()); @@ -84,21 +77,14 @@ public class CQManager { } public TSStatus dropCQ(TDropCQReq req) { - lock.readLock().lock(); try { - TSStatus status = configManager.getConsensusManager().write(new DropCQPlan(req.cqId)); - if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - cancelLocallyScheduledCQ(req.cqId); - } - return status; + return configManager.getConsensusManager().write(new DropCQPlan(req.cqId)); } catch (ConsensusException e) { LOGGER.warn("Unexpected error happened while dropping cq {}: ", req.cqId, e); // consensus layer related errors TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); res.setMessage(e.getMessage()); return res; - } finally { - lock.readLock().unlock(); } } @@ -131,7 +117,6 @@ public class CQManager { try { // 1. shutdown previous cq schedule thread pool try { - cancelAllLocallyScheduledCQs(); if (executor != null) { executor.shutdown(); } @@ -169,15 +154,7 @@ public class CQManager { for (CQInfo.CQEntry entry : allCQs) { if (entry.getState() == CQState.ACTIVE) { CQScheduleTask cqScheduleTask = new CQScheduleTask(entry, executor, configManager); - if (!markCQLocallyScheduled(entry.getCqId(), entry.getCqToken(), cqScheduleTask)) { - continue; - } - try { - cqScheduleTask.submitSelf(); - } catch (RuntimeException e) { - unmarkCQLocallyScheduled(entry.getCqId(), entry.getCqToken()); - throw e; - } + cqScheduleTask.submitSelf(); } } } @@ -197,7 +174,6 @@ public class CQManager { try { previous = executor; executor = null; - cancelAllLocallyScheduledCQs(); } finally { lock.writeLock().unlock(); } @@ -205,78 +181,4 @@ public class CQManager { previous.shutdown(); } } - - public boolean markCQLocallyScheduled(String cqId, String cqToken, CQScheduleTask task) { - AtomicBoolean shouldSchedule = new AtomicBoolean(false); - LocallyScheduledCQ schedule = new LocallyScheduledCQ(cqToken, task); - lock.readLock().lock(); - try { - locallyScheduledCQs.compute( - cqId, - (ignored, previousSchedule) -> { - if (previousSchedule != null && previousSchedule.hasToken(cqToken)) { - return previousSchedule; - } - if (previousSchedule != null) { - previousSchedule.cancel(); - } - shouldSchedule.set(true); - return schedule; - }); - if (!shouldSchedule.get()) { - task.cancel(); - } - return shouldSchedule.get(); - } finally { - lock.readLock().unlock(); - } - } - - public void unmarkCQLocallyScheduled(String cqId, String cqToken) { - lock.readLock().lock(); - try { - locallyScheduledCQs.computeIfPresent( - cqId, - (ignored, schedule) -> { - if (schedule.hasToken(cqToken)) { - schedule.cancel(); - return null; - } - return schedule; - }); - } finally { - lock.readLock().unlock(); - } - } - - private void cancelLocallyScheduledCQ(String cqId) { - LocallyScheduledCQ schedule = locallyScheduledCQs.remove(cqId); - if (schedule != null) { - schedule.cancel(); - } - } - - private void cancelAllLocallyScheduledCQs() { - locallyScheduledCQs.values().forEach(LocallyScheduledCQ::cancel); - locallyScheduledCQs.clear(); - } - - private static class LocallyScheduledCQ { - - private final String cqToken; - private final CQScheduleTask task; - - private LocallyScheduledCQ(String cqToken, CQScheduleTask task) { - this.cqToken = cqToken; - this.task = task; - } - - private boolean hasToken(String cqToken) { - return this.cqToken.equals(cqToken); - } - - private void cancel() { - task.cancel(); - } - } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java index d85d0f12a30..6125edc1ef7 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java @@ -39,10 +39,7 @@ import org.slf4j.LoggerFactory; import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; public class CQScheduleTask implements Runnable { @@ -72,7 +69,7 @@ public class CQScheduleTask implements Runnable { private final long endTimeOffset; private final TimeoutPolicy timeoutPolicy; private final String queryBody; - private final String cqToken; + private final String md5; private final String zoneId; @@ -84,15 +81,12 @@ public class CQScheduleTask implements Runnable { private final long retryWaitTimeInMS; - private final AtomicBoolean cancelled; - private final AtomicReference<ScheduledFuture<?>> scheduledFuture; - private long executionTime; public CQScheduleTask( TCreateCQReq req, long firstExecutionTime, - String cqToken, + String md5, ScheduledExecutorService executor, ConfigManager configManager) { this( @@ -102,7 +96,7 @@ public class CQScheduleTask implements Runnable { req.endTimeOffset, TimeoutPolicy.deserialize(req.timeoutPolicy), req.queryBody, - cqToken, + md5, req.zoneId, req.username, executor, @@ -119,7 +113,7 @@ public class CQScheduleTask implements Runnable { entry.getEndTimeOffset(), entry.getTimeoutPolicy(), entry.getQueryBody(), - entry.getCqToken(), + entry.getMd5(), entry.getZoneId(), entry.getUsername(), executor, @@ -135,7 +129,7 @@ public class CQScheduleTask implements Runnable { long endTimeOffset, TimeoutPolicy timeoutPolicy, String queryBody, - String cqToken, + String md5, String zoneId, String username, ScheduledExecutorService executor, @@ -147,14 +141,12 @@ public class CQScheduleTask implements Runnable { this.endTimeOffset = endTimeOffset; this.timeoutPolicy = timeoutPolicy; this.queryBody = queryBody; - this.cqToken = cqToken; + this.md5 = md5; this.zoneId = zoneId; this.username = username; this.executor = executor; this.configManager = configManager; this.retryWaitTimeInMS = Math.min(DEFAULT_RETRY_WAIT_TIME_IN_MS, everyInterval / FACTOR); - this.cancelled = new AtomicBoolean(false); - this.scheduledFuture = new AtomicReference<>(); this.executionTime = executionTime; } @@ -173,9 +165,6 @@ public class CQScheduleTask implements Runnable { @Override public void run() { - if (cancelled.get()) { - return; - } long startTime = executionTime - startTimeOffset; long endTime = executionTime - endTimeOffset; @@ -188,9 +177,6 @@ public class CQScheduleTask implements Runnable { submitSelf(retryWaitTimeInMS, TimeUnit.MILLISECONDS); } } else { - if (cancelled.get()) { - return; - } LOGGER.info( "[StartExecuteCQ] execute CQ {} on DataNode[{}], time range is [{}, {}), current time is {}", cqId, @@ -220,32 +206,12 @@ public class CQScheduleTask implements Runnable { } private void submitSelf(long delay, TimeUnit unit) { - if (cancelled.get()) { - return; - } - ScheduledFuture<?> newFuture = executor.schedule(this, delay, unit); - ScheduledFuture<?> previousFuture = scheduledFuture.getAndSet(newFuture); - if (previousFuture != null) { - previousFuture.cancel(false); - } - if (cancelled.get() && scheduledFuture.compareAndSet(newFuture, null)) { - newFuture.cancel(false); - } - } - - public void cancel() { - cancelled.set(true); - ScheduledFuture<?> currentFuture = scheduledFuture.getAndSet(null); - if (currentFuture != null) { - currentFuture.cancel(false); - } + executor.schedule(this, delay, unit); } private boolean needSubmit() { // current node is still leader and thread pool is not shut down. - return !cancelled.get() - && configManager.getConsensusManager().isLeader() - && !executor.isShutdown(); + return configManager.getConsensusManager().isLeader() && !executor.isShutdown(); } private class AsyncExecuteCQCallback implements AsyncMethodCallback<TSStatus> { @@ -272,9 +238,6 @@ public class CQScheduleTask implements Runnable { @Override public void onComplete(TSStatus response) { - if (cancelled.get()) { - return; - } if (response.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { LOGGER.info( @@ -288,7 +251,7 @@ public class CQScheduleTask implements Runnable { result = configManager .getConsensusManager() - .write(new UpdateCQLastExecTimePlan(cqId, executionTime, cqToken)); + .write(new UpdateCQLastExecTimePlan(cqId, executionTime, md5)); } catch (ConsensusException e) { result = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); result.setMessage(e.getMessage()); @@ -328,9 +291,6 @@ public class CQScheduleTask implements Runnable { @Override public void onError(Exception exception) { - if (cancelled.get()) { - return; - } LOGGER.warn("Execute CQ {} failed", cqId, exception); if (needSubmit()) { submitSelf(retryWaitTimeInMS, TimeUnit.MILLISECONDS); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java index b24f1ca9aa5..4c1225ef48a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java @@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.cq.CQState; import org.apache.iotdb.commons.cq.TimeoutPolicy; import org.apache.iotdb.commons.snapshot.SnapshotProcessor; -import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.ActiveCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan; @@ -45,9 +44,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -64,7 +61,7 @@ public class CQInfo implements SnapshotProcessor { private static final String CQ_NOT_EXIST_FORMAT = "CQ %s doesn't exist."; - private static final String CQ_TOKEN_NOT_MATCH_FORMAT = "Token of CQ %s doesn't match"; + private static final String MD5_NOT_MATCH_FORMAT = "MD5 of CQ %s doesn't match"; private final Map<String, CQEntry> cqMap; @@ -94,7 +91,7 @@ public class CQInfo implements SnapshotProcessor { CQEntry cqEntry = new CQEntry( plan.getReq(), - plan.getCqToken(), + plan.getMd5(), plan.getFirstExecutionTime() - plan.getReq().everyInterval); cqMap.put(cqId, cqEntry); res.code = TSStatusCode.SUCCESS_STATUS.getStatusCode(); @@ -108,13 +105,13 @@ public class CQInfo implements SnapshotProcessor { /** * Drop the CQ whose ID is same as <tt>cqId</tt> in plan. * - * @return SUCCESS_STATUS if there is CQ whose ID and token is same as <tt>cqId</tt> in plan, + * @return SUCCESS_STATUS if there is CQ whose ID and md5 is same as <tt>cqId</tt> in plan, * otherwise NO_SUCH_CQ. */ public TSStatus dropCQ(DropCQPlan plan) { TSStatus res = new TSStatus(); String cqId = plan.getCqId(); - Optional<String> cqToken = plan.getCqToken(); + Optional<String> md5 = plan.getMd5(); lock.writeLock().lock(); try { CQEntry cqEntry = cqMap.get(cqId); @@ -122,10 +119,10 @@ public class CQInfo implements SnapshotProcessor { res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode(); res.message = String.format(CQ_NOT_EXIST_FORMAT, cqId); LOGGER.warn("Drop CQ {} failed, because it doesn't exist.", cqId); - } else if ((cqToken.isPresent() && !cqToken.get().equals(cqEntry.cqToken))) { + } else if ((md5.isPresent() && !md5.get().equals(cqEntry.md5))) { res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode(); - res.message = String.format(CQ_TOKEN_NOT_MATCH_FORMAT, cqId); - LOGGER.warn("Drop CQ {} failed, because its token doesn't match.", cqId); + res.message = String.format(MD5_NOT_MATCH_FORMAT, cqId); + LOGGER.warn("Drop CQ {} failed, because its MD5 doesn't match.", cqId); } else { cqMap.remove(cqId); res.code = TSStatusCode.SUCCESS_STATUS.getStatusCode(); @@ -138,24 +135,11 @@ public class CQInfo implements SnapshotProcessor { } public ShowCQResp showCQ() { - return showCQ(new ShowCQPlan()); - } - - public ShowCQResp showCQ(ShowCQPlan plan) { lock.readLock().lock(); try { - Optional<String> cqId = plan.getCqId(); - List<CQEntry> cqList; - if (cqId.isPresent()) { - CQEntry cqEntry = cqMap.get(cqId.get()); - cqList = - cqEntry == null - ? Collections.emptyList() - : Collections.singletonList(new CQEntry(cqEntry)); - } else { - cqList = cqMap.values().stream().map(CQEntry::new).collect(Collectors.toList()); - } - return new ShowCQResp(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), cqList); + return new ShowCQResp( + new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), + cqMap.values().stream().map(CQEntry::new).collect(Collectors.toList())); } finally { lock.readLock().unlock(); } @@ -169,16 +153,16 @@ public class CQInfo implements SnapshotProcessor { public TSStatus activeCQ(ActiveCQPlan plan) { TSStatus res = new TSStatus(); String cqId = plan.getCqId(); - String cqToken = plan.getCqToken(); + String md5 = plan.getMd5(); lock.writeLock().lock(); try { CQEntry cqEntry = cqMap.get(cqId); if (cqEntry == null) { res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode(); res.message = String.format(CQ_NOT_EXIST_FORMAT, cqId); - } else if (!cqToken.equals(cqEntry.cqToken)) { + } else if (!md5.equals(cqEntry.md5)) { res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode(); - res.message = String.format(CQ_TOKEN_NOT_MATCH_FORMAT, cqId); + res.message = String.format(MD5_NOT_MATCH_FORMAT, cqId); } else if (cqEntry.state == CQState.ACTIVE) { res.code = TSStatusCode.CQ_ALREADY_ACTIVE.getStatusCode(); res.message = String.format("CQ %s has already been active", cqId); @@ -196,22 +180,22 @@ public class CQInfo implements SnapshotProcessor { * Update the last execution time of the corresponding CQ. * * @return SUCCESS_STATUS if successfully updated, or NO_SUCH_CQ if 1. the CQ doesn't exist; or 2. - * token is different. or CQ_UPDATE_LAST_EXEC_TIME_FAILED 3. original lastExecutionTime >= + * md5 is different. or CQ_UPDATE_LAST_EXEC_TIME_FAILED 3. original lastExecutionTime >= * current lastExecutionTime; */ public TSStatus updateCQLastExecutionTime(UpdateCQLastExecTimePlan plan) { TSStatus res = new TSStatus(); String cqId = plan.getCqId(); - String cqToken = plan.getCqToken(); + String md5 = plan.getMd5(); lock.writeLock().lock(); try { CQEntry cqEntry = cqMap.get(cqId); if (cqEntry == null) { res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode(); res.message = String.format(CQ_NOT_EXIST_FORMAT, cqId); - } else if (!cqToken.equals(cqEntry.cqToken)) { + } else if (!md5.equals(cqEntry.md5)) { res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode(); - res.message = String.format(CQ_TOKEN_NOT_MATCH_FORMAT, cqId); + res.message = String.format(MD5_NOT_MATCH_FORMAT, cqId); } else if (cqEntry.lastExecutionTime >= plan.getExecutionTime()) { res.code = TSStatusCode.CQ_UPDATE_LAST_EXEC_TIME_ERROR.getStatusCode(); res.message = @@ -315,7 +299,7 @@ public class CQInfo implements SnapshotProcessor { private final TimeoutPolicy timeoutPolicy; private final String queryBody; private final String sql; - private final String cqToken; + private final String md5; private final String zoneId; @@ -324,7 +308,7 @@ public class CQInfo implements SnapshotProcessor { private CQState state; private long lastExecutionTime; - private CQEntry(TCreateCQReq req, String cqToken, long lastExecutionTime) { + private CQEntry(TCreateCQReq req, String md5, long lastExecutionTime) { this( req.cqId, req.everyInterval, @@ -334,7 +318,7 @@ public class CQInfo implements SnapshotProcessor { TimeoutPolicy.deserialize(req.timeoutPolicy), req.queryBody, req.sql, - cqToken, + md5, req.zoneId, req.username, CQState.INACTIVE, @@ -351,7 +335,7 @@ public class CQInfo implements SnapshotProcessor { other.timeoutPolicy, other.queryBody, other.sql, - other.cqToken, + other.md5, other.zoneId, other.username, other.state, @@ -368,7 +352,7 @@ public class CQInfo implements SnapshotProcessor { TimeoutPolicy timeoutPolicy, String queryBody, String sql, - String cqToken, + String md5, String zoneId, String username, CQState state, @@ -381,7 +365,7 @@ public class CQInfo implements SnapshotProcessor { this.timeoutPolicy = timeoutPolicy; this.queryBody = queryBody; this.sql = sql; - this.cqToken = cqToken; + this.md5 = md5; this.zoneId = zoneId; this.username = username; this.state = state; @@ -397,7 +381,7 @@ public class CQInfo implements SnapshotProcessor { ReadWriteIOUtils.write(timeoutPolicy.getType(), stream); ReadWriteIOUtils.write(queryBody, stream); ReadWriteIOUtils.write(sql, stream); - ReadWriteIOUtils.write(cqToken, stream); + ReadWriteIOUtils.write(md5, stream); ReadWriteIOUtils.write(zoneId, stream); ReadWriteIOUtils.write(username, stream); ReadWriteIOUtils.write(state.getType(), stream); @@ -413,7 +397,7 @@ public class CQInfo implements SnapshotProcessor { TimeoutPolicy timeoutPolicy = TimeoutPolicy.deserialize(ReadWriteIOUtils.readByte(stream)); String queryBody = ReadWriteIOUtils.readString(stream); String sql = ReadWriteIOUtils.readString(stream); - String cqToken = ReadWriteIOUtils.readString(stream); + String md5 = ReadWriteIOUtils.readString(stream); String zoneId = ReadWriteIOUtils.readString(stream); String username = ReadWriteIOUtils.readString(stream); CQState state = CQState.deserialize(ReadWriteIOUtils.readByte(stream)); @@ -427,7 +411,7 @@ public class CQInfo implements SnapshotProcessor { timeoutPolicy, queryBody, sql, - cqToken, + md5, zoneId, username, state, @@ -466,8 +450,8 @@ public class CQInfo implements SnapshotProcessor { return sql; } - public String getCqToken() { - return cqToken; + public String getMd5() { + return md5; } public CQState getState() { @@ -504,7 +488,7 @@ public class CQInfo implements SnapshotProcessor { && timeoutPolicy == cqEntry.timeoutPolicy && Objects.equals(queryBody, cqEntry.queryBody) && Objects.equals(sql, cqEntry.sql) - && Objects.equals(cqToken, cqEntry.cqToken) + && Objects.equals(md5, cqEntry.md5) && Objects.equals(zoneId, cqEntry.zoneId) && Objects.equals(username, cqEntry.username) && state == cqEntry.state; @@ -521,7 +505,7 @@ public class CQInfo implements SnapshotProcessor { timeoutPolicy, queryBody, sql, - cqToken, + md5, zoneId, username, state, diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java index 60ec748fb54..09017a841c7 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java @@ -30,7 +30,6 @@ import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; import org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan; import org.apache.iotdb.confignode.consensus.request.read.ainode.GetAINodeConfigurationPlan; import org.apache.iotdb.confignode.consensus.request.read.auth.AuthorReadPlan; -import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan; import org.apache.iotdb.confignode.consensus.request.read.database.CountDatabasePlan; import org.apache.iotdb.confignode.consensus.request.read.database.GetDatabasePlan; import org.apache.iotdb.confignode.consensus.request.read.datanode.GetDataNodeConfigurationPlan; @@ -323,7 +322,7 @@ public class ConfigPlanExecutor { case GetSeriesSlotList: return partitionInfo.getSeriesSlotList((GetSeriesSlotListPlan) req); case SHOW_CQ: - return cqInfo.showCQ((ShowCQPlan) req); + return cqInfo.showCQ(); case GetFunctionTable: return udfInfo.getUDFTable(); case GetFunctionJar: diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java index af7f968e8a5..3da60c5ad84 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java @@ -22,14 +22,10 @@ package org.apache.iotdb.confignode.procedure.impl.cq; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils; -import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.ActiveCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan; -import org.apache.iotdb.confignode.consensus.response.cq.ShowCQResp; -import org.apache.iotdb.confignode.manager.cq.CQManager; import org.apache.iotdb.confignode.manager.cq.CQScheduleTask; -import org.apache.iotdb.confignode.persistence.cq.CQInfo; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.exception.ProcedureException; import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure; @@ -39,6 +35,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq; import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.commons.codec.digest.DigestUtils; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,8 +44,6 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Objects; -import java.util.Optional; -import java.util.UUID; import java.util.concurrent.ScheduledExecutorService; import static org.apache.iotdb.confignode.procedure.state.cq.CreateCQState.INACTIVE; @@ -64,7 +59,7 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { private TCreateCQReq req; - private String cqToken; + private String md5; private long firstExecutionTime; @@ -79,7 +74,7 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { public CreateCQProcedure(TCreateCQReq req, ScheduledExecutorService executor) { super(); this.req = req; - this.cqToken = generateCQToken(); + this.md5 = DigestUtils.md2Hex(req.cqId); this.executor = executor; this.firstExecutionTime = CQScheduleTask.getFirstExecutionTime(req.boundaryTime, req.everyInterval); @@ -95,16 +90,12 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { addCQ(env); return Flow.HAS_MORE_STATE; case INACTIVE: - submitScheduleTask( - env, - new CQScheduleTask( - req, firstExecutionTime, cqToken, executor, env.getConfigManager())); + CQScheduleTask cqScheduleTask = + new CQScheduleTask(req, firstExecutionTime, md5, executor, env.getConfigManager()); + cqScheduleTask.submitSelf(); setNextState(SCHEDULED); break; case SCHEDULED: - if (isStateDeserialized()) { - recoverScheduledTask(env); - } activeCQ(env); return Flow.NO_MORE_STATE; default: @@ -134,7 +125,7 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { res = env.getConfigManager() .getConsensusManager() - .write(new AddCQPlan(req, cqToken, firstExecutionTime)); + .write(new AddCQPlan(req, md5, firstExecutionTime)); } catch (ConsensusException e) { LOGGER.warn(CONSENSUS_WRITE_ERROR, e); res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); @@ -155,7 +146,7 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { private void activeCQ(ConfigNodeProcedureEnv env) { TSStatus res; try { - res = env.getConfigManager().getConsensusManager().write(new ActiveCQPlan(req.cqId, cqToken)); + res = env.getConfigManager().getConsensusManager().write(new ActiveCQPlan(req.cqId, md5)); } catch (ConsensusException e) { LOGGER.warn(CONSENSUS_WRITE_ERROR, e); res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); @@ -173,42 +164,6 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { } } - void recoverScheduledTask(ConfigNodeProcedureEnv env) throws ConsensusException { - Optional<CQInfo.CQEntry> cqEntry = getCurrentCQEntry(env); - if (!cqEntry.isPresent()) { - LOGGER.info( - "Skip recovering the schedule task of CQ {} because its metadata is unavailable.", - req.cqId); - return; - } - submitScheduleTask(env, new CQScheduleTask(cqEntry.get(), executor, env.getConfigManager())); - } - - Optional<CQInfo.CQEntry> getCurrentCQEntry(ConfigNodeProcedureEnv env) throws ConsensusException { - ShowCQResp response = - (ShowCQResp) env.getConfigManager().getConsensusManager().read(new ShowCQPlan(req.cqId)); - return response.getCqList().stream() - .filter(entry -> cqToken.equals(entry.getCqToken())) - .findFirst(); - } - - private static String generateCQToken() { - return UUID.randomUUID().toString(); - } - - private void submitScheduleTask(ConfigNodeProcedureEnv env, CQScheduleTask cqScheduleTask) { - CQManager cqManager = env.getConfigManager().getCQManager(); - if (!cqManager.markCQLocallyScheduled(req.cqId, cqToken, cqScheduleTask)) { - return; - } - try { - cqScheduleTask.submitSelf(); - } catch (RuntimeException e) { - cqManager.unmarkCQLocallyScheduled(req.cqId, cqToken); - throw e; - } - } - @Override protected void rollbackState(ConfigNodeProcedureEnv env, CreateCQState state) throws IOException, InterruptedException, ProcedureException { @@ -221,8 +176,7 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { LOGGER.info("Start [INACTIVE] rollback of CQ {}", req.cqId); TSStatus res; try { - res = - env.getConfigManager().getConsensusManager().write(new DropCQPlan(req.cqId, cqToken)); + res = env.getConfigManager().getConsensusManager().write(new DropCQPlan(req.cqId, md5)); } catch (ConsensusException e) { LOGGER.warn(CONSENSUS_WRITE_ERROR, e); res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); @@ -273,7 +227,7 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { stream.writeShort(ProcedureType.CREATE_CQ_PROCEDURE.getTypeCode()); super.serialize(stream); ThriftCommonsSerDeUtils.serializeTCreateCQReq(req, stream); - ReadWriteIOUtils.write(cqToken, stream); + ReadWriteIOUtils.write(md5, stream); ReadWriteIOUtils.write(firstExecutionTime, stream); } @@ -281,7 +235,7 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { public void deserialize(ByteBuffer byteBuffer) { super.deserialize(byteBuffer); this.req = ThriftCommonsSerDeUtils.deserializeTCreateCQReq(byteBuffer); - this.cqToken = ReadWriteIOUtils.readString(byteBuffer); + this.md5 = ReadWriteIOUtils.readString(byteBuffer); this.firstExecutionTime = ReadWriteIOUtils.readLong(byteBuffer); } @@ -300,7 +254,7 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { && isGeneratedByPipe == that.isGeneratedByPipe && firstExecutionTime == that.firstExecutionTime && Objects.equals(req, that.req) - && Objects.equals(cqToken, that.cqToken); + && Objects.equals(md5, that.md5); } @Override @@ -311,15 +265,7 @@ public class CreateCQProcedure extends AbstractNodeProcedure<CreateCQState> { getCycles(), isGeneratedByPipe, req, - cqToken, + md5, firstExecutionTime); } - - public String getCqId() { - return req == null ? null : req.getCqId(); - } - - public String getCqToken() { - return cqToken; - } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java index 1c73fe323ce..3b0554baa11 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java @@ -1213,7 +1213,7 @@ public class ConfigPhysicalPlanSerDeTest { @Test public void ActiveCQPlanTest() throws IOException { - ActiveCQPlan activeCQPlan0 = new ActiveCQPlan("testCq", "testCqToken"); + ActiveCQPlan activeCQPlan0 = new ActiveCQPlan("testCq", "testCq_md5"); ActiveCQPlan activeCQPlan1 = (ActiveCQPlan) ConfigPhysicalPlan.Factory.create(activeCQPlan0.serializeToByteBuffer()); @@ -1236,7 +1236,7 @@ public class ConfigPhysicalPlanSerDeTest { "create cq testCq1 BEGIN select s1 into root.backup.d1.s1 from root.sg.d1 END", "Asia", "root"), - "testCq1Token", + "testCq1_md5", executionTime); AddCQPlan addCQPlan1 = (AddCQPlan) ConfigPhysicalPlan.Factory.create(addCQPlan0.serializeToByteBuffer()); @@ -1251,7 +1251,7 @@ public class ConfigPhysicalPlanSerDeTest { (DropCQPlan) ConfigPhysicalPlan.Factory.create(dropCQPlan0.serializeToByteBuffer()); Assert.assertEquals(dropCQPlan0, dropCQPlan1); - dropCQPlan0 = new DropCQPlan("testCq1", "testCq1Token"); + dropCQPlan0 = new DropCQPlan("testCq1", "testCq1_md5"); dropCQPlan1 = (DropCQPlan) ConfigPhysicalPlan.Factory.create(dropCQPlan0.serializeToByteBuffer()); Assert.assertEquals(dropCQPlan0, dropCQPlan1); @@ -1260,7 +1260,7 @@ public class ConfigPhysicalPlanSerDeTest { @Test public void UpdateCQLastExecTimePlanTest() throws IOException { UpdateCQLastExecTimePlan updateCQLastExecTimePlan0 = - new UpdateCQLastExecTimePlan("testCq", System.currentTimeMillis(), "testCqToken"); + new UpdateCQLastExecTimePlan("testCq", System.currentTimeMillis(), "testCq_md5"); UpdateCQLastExecTimePlan updateCQLastExecTimePlan1 = (UpdateCQLastExecTimePlan) ConfigPhysicalPlan.Factory.create(updateCQLastExecTimePlan0.serializeToByteBuffer()); diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/cq/CQManagerTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/cq/CQManagerTest.java deleted file mode 100644 index a0bc5a523ba..00000000000 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/cq/CQManagerTest.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iotdb.confignode.cq; - -import org.apache.iotdb.common.rpc.thrift.TSStatus; -import org.apache.iotdb.commons.cq.TimeoutPolicy; -import org.apache.iotdb.confignode.manager.ConfigManager; -import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; -import org.apache.iotdb.confignode.manager.cq.CQManager; -import org.apache.iotdb.confignode.manager.cq.CQScheduleTask; -import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq; -import org.apache.iotdb.rpc.TSStatusCode; - -import org.junit.Test; -import org.mockito.Mockito; - -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertTrue; - -public class CQManagerTest { - - @SuppressWarnings("unchecked") - @Test - public void dropCQShouldCancelLocallyScheduledTask() throws Exception { - ConfigManager configManager = Mockito.mock(ConfigManager.class); - ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class); - Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager); - Mockito.when(consensusManager.write(Mockito.any())) - .thenReturn(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); - CQManager cqManager = new CQManager(configManager); - ScheduledFuture<?> future = Mockito.mock(ScheduledFuture.class); - CQScheduleTask task = newScheduledTask(configManager, future, "token"); - - try { - assertTrue(cqManager.markCQLocallyScheduled("testCq", "token", task)); - task.submitSelf(); - cqManager.dropCQ(new TDropCQReq("testCq")); - - Mockito.verify(future).cancel(false); - } finally { - cqManager.stopCQScheduler(); - } - } - - @SuppressWarnings("unchecked") - @Test - public void newTokenShouldCancelPreviousLocallyScheduledTask() { - ConfigManager configManager = Mockito.mock(ConfigManager.class); - CQManager cqManager = new CQManager(configManager); - ScheduledFuture<?> previousFuture = Mockito.mock(ScheduledFuture.class); - CQScheduleTask previousTask = newScheduledTask(configManager, previousFuture, "previousToken"); - ScheduledFuture<?> currentFuture = Mockito.mock(ScheduledFuture.class); - CQScheduleTask currentTask = newScheduledTask(configManager, currentFuture, "currentToken"); - - try { - assertTrue(cqManager.markCQLocallyScheduled("testCq", "previousToken", previousTask)); - previousTask.submitSelf(); - assertTrue(cqManager.markCQLocallyScheduled("testCq", "currentToken", currentTask)); - - Mockito.verify(previousFuture).cancel(false); - } finally { - cqManager.stopCQScheduler(); - } - } - - @SuppressWarnings("unchecked") - private CQScheduleTask newScheduledTask( - ConfigManager configManager, ScheduledFuture<?> scheduledFuture, String cqToken) { - ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class); - Mockito.when( - executor.schedule( - Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.any(TimeUnit.class))) - .thenReturn((ScheduledFuture) scheduledFuture); - return new CQScheduleTask( - "testCq", - 1000, - 0, - 1000, - TimeoutPolicy.BLOCKED, - "select s1 into root.backup.d1.s1 from root.sg.d1", - cqToken, - "Asia", - "root", - executor, - configManager, - System.currentTimeMillis() + 10_000); - } -} diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java index 8bf2d12cd59..bfadd3d05e8 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java @@ -18,14 +18,9 @@ */ package org.apache.iotdb.confignode.persistence; -import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan; -import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan; -import org.apache.iotdb.confignode.consensus.request.write.cq.UpdateCQLastExecTimePlan; -import org.apache.iotdb.confignode.consensus.response.cq.ShowCQResp; import org.apache.iotdb.confignode.persistence.cq.CQInfo; import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq; -import org.apache.iotdb.rpc.TSStatusCode; import org.apache.commons.io.FileUtils; import org.apache.thrift.TException; @@ -75,7 +70,7 @@ public class CQInfoTest { "create cq testCq1 BEGIN select s1 into root.backup.d1.s1 from root.sg.d1 END", "Asia", "root"), - "testCq1Token", + "testCq1_md5", executionTime); cqInfo.addCQ(addCQPlan); @@ -94,7 +89,7 @@ public class CQInfoTest { "create cq testCq2 BEGIN select s1 into root.backup.d2.s1 from root.sg.d2 END", "Asia", "root"), - "testCq2Token", + "testCq2_md5", executionTime); cqInfo.addCQ(addCQPlan); @@ -104,59 +99,4 @@ public class CQInfoTest { Assert.assertEquals(cqInfo, actualCQInfo); } - - @Test - public void testOldCallbackCannotTouchRecreatedCQ() throws Exception { - long executionTime = System.currentTimeMillis(); - TCreateCQReq req = - new TCreateCQReq( - "testCq3", - 1000, - 0, - 1000, - 0, - (byte) 0, - "select s1 into root.backup.d3.s1 from root.sg.d3", - "create cq testCq3 BEGIN select s1 into root.backup.d3.s1 from root.sg.d3 END", - "Asia", - "root"); - - cqInfo.addCQ(new AddCQPlan(req, "oldToken", executionTime)); - cqInfo.dropCQ(new DropCQPlan("testCq3")); - cqInfo.addCQ(new AddCQPlan(req, "newToken", executionTime)); - - Assert.assertEquals( - TSStatusCode.NO_SUCH_CQ.getStatusCode(), - cqInfo.updateCQLastExecutionTime( - new UpdateCQLastExecTimePlan("testCq3", executionTime + 1000, "oldToken")) - .code); - Assert.assertEquals( - TSStatusCode.SUCCESS_STATUS.getStatusCode(), - cqInfo.updateCQLastExecutionTime( - new UpdateCQLastExecTimePlan("testCq3", executionTime + 1000, "newToken")) - .code); - } - - @Test - public void testShowCQCanFilterByCQId() throws Exception { - long executionTime = System.currentTimeMillis(); - TCreateCQReq req = - new TCreateCQReq( - "testCq4", - 1000, - 0, - 1000, - 0, - (byte) 0, - "select s1 into root.backup.d4.s1 from root.sg.d4", - "create cq testCq4 BEGIN select s1 into root.backup.d4.s1 from root.sg.d4 END", - "Asia", - "root"); - cqInfo.addCQ(new AddCQPlan(req, "testCq4Token", executionTime)); - - ShowCQResp showCQResp = cqInfo.showCQ(new ShowCQPlan("testCq4")); - - Assert.assertEquals(1, showCQResp.getCqList().size()); - Assert.assertEquals("testCq4", showCQResp.getCqList().get(0).getCqId()); - } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java index 3e7fd2052ad..d0e92b32816 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java @@ -36,36 +36,10 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.fail; public class CreateCQProcedureTest { - @Test - public void tokenShouldBeUniqueForSameCQId() { - ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); - try { - TCreateCQReq req = - new TCreateCQReq( - "testCq1", - 1000, - 0, - 1000, - 0, - (byte) 0, - "select s1 into root.backup.d1(s1) from root.sg.d1", - "create cq testCq1 BEGIN select s1 into root.backup.d1(s1) from root.sg.d1 END", - "Asia", - "root"); - CreateCQProcedure createCQProcedure1 = new CreateCQProcedure(req, executor); - CreateCQProcedure createCQProcedure2 = new CreateCQProcedure(req, executor); - - assertNotEquals(createCQProcedure1.getCqToken(), createCQProcedure2.getCqToken()); - } finally { - executor.shutdown(); - } - } - @Test public void serializeDeserializeTest() { diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java deleted file mode 100644 index a90e282494f..00000000000 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.confignode.procedure.impl.cq; - -import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan; -import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan; -import org.apache.iotdb.confignode.manager.ConfigManager; -import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; -import org.apache.iotdb.confignode.manager.cq.CQManager; -import org.apache.iotdb.confignode.manager.cq.CQScheduleTask; -import org.apache.iotdb.confignode.persistence.cq.CQInfo; -import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; -import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq; - -import org.junit.Test; -import org.mockito.Mockito; - -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -public class CreateCQProcedureRecoveryTest { - - private TCreateCQReq newCreateCQReq() { - return new TCreateCQReq( - "testCq1", - 1000, - 0, - 1000, - 0, - (byte) 0, - "select s1 into root.backup.d1.s1 from root.sg.d1", - "create cq testCq1 BEGIN select s1 into root.backup.d1.s1 from root.sg.d1 END", - "Asia", - "root"); - } - - @SuppressWarnings("unchecked") - @Test - public void recoverScheduledTaskShouldResubmitFromLatestMetadata() throws Exception { - ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class); - Mockito.when( - executor.schedule( - Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.any(TimeUnit.class))) - .thenReturn(Mockito.mock(ScheduledFuture.class)); - - ConfigManager configManager = Mockito.mock(ConfigManager.class); - ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class); - CQManager cqManager = Mockito.mock(CQManager.class); - ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class); - Mockito.when(env.getConfigManager()).thenReturn(configManager); - Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager); - Mockito.when(configManager.getCQManager()).thenReturn(cqManager); - Mockito.when( - cqManager.markCQLocallyScheduled( - Mockito.anyString(), Mockito.anyString(), Mockito.any(CQScheduleTask.class))) - .thenReturn(true); - - TCreateCQReq req = newCreateCQReq(); - CreateCQProcedure procedure = new CreateCQProcedure(req, executor); - - CQInfo cqInfo = new CQInfo(); - cqInfo.addCQ(new AddCQPlan(req, procedure.getCqToken(), System.currentTimeMillis() + 10_000)); - Mockito.when(consensusManager.read(Mockito.any(ShowCQPlan.class))).thenReturn(cqInfo.showCQ()); - - procedure.recoverScheduledTask(env); - - Mockito.verify(executor) - .schedule(Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.any(TimeUnit.class)); - } - - @SuppressWarnings("unchecked") - @Test - public void recoverScheduledTaskShouldSkipDuplicatedLocalSchedule() throws Exception { - ScheduledExecutorService executor = Mockito.mock(ScheduledExecutorService.class); - ConfigManager configManager = Mockito.mock(ConfigManager.class); - ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class); - CQManager cqManager = Mockito.mock(CQManager.class); - ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class); - Mockito.when(env.getConfigManager()).thenReturn(configManager); - Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager); - Mockito.when(configManager.getCQManager()).thenReturn(cqManager); - Mockito.when( - cqManager.markCQLocallyScheduled( - Mockito.anyString(), Mockito.anyString(), Mockito.any(CQScheduleTask.class))) - .thenReturn(false); - - TCreateCQReq req = newCreateCQReq(); - CreateCQProcedure procedure = new CreateCQProcedure(req, executor); - - CQInfo cqInfo = new CQInfo(); - cqInfo.addCQ(new AddCQPlan(req, procedure.getCqToken(), System.currentTimeMillis() + 10_000)); - Mockito.when(consensusManager.read(Mockito.any(ShowCQPlan.class))).thenReturn(cqInfo.showCQ()); - - procedure.recoverScheduledTask(env); - - Mockito.verify(executor, Mockito.never()) - .schedule(Mockito.any(Runnable.class), Mockito.anyLong(), Mockito.any(TimeUnit.class)); - } -}
