This is an automated email from the ASF dual-hosted git repository.
Caideyipi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a4206813a06 Revert "Fix CQ recovery gap and stale callback contamination (#17734)" (#17827)
a4206813a06 is described below
commit a4206813a0601c7249d2b50d7ecb246825ff6713
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jun 3 12:26:52 2026 +0800
Revert "Fix CQ recovery gap and stale callback contamination (#17734)" (#17827)
This reverts commit 29d0d51dc74081c1b9295e29c18569762a72a579.
---
.../iotdb/confignode/i18n/ConfigNodeMessages.java | 4 +-
.../iotdb/confignode/i18n/ConfigNodeMessages.java | 4 +-
.../consensus/request/read/cq/ShowCQPlan.java | 13 ---
.../consensus/request/write/cq/ActiveCQPlan.java | 20 ++--
.../consensus/request/write/cq/AddCQPlan.java | 20 ++--
.../consensus/request/write/cq/DropCQPlan.java | 20 ++--
.../request/write/cq/UpdateCQLastExecTimePlan.java | 23 ++--
.../iotdb/confignode/manager/cq/CQManager.java | 102 +-----------------
.../confignode/manager/cq/CQScheduleTask.java | 58 ++--------
.../iotdb/confignode/persistence/cq/CQInfo.java | 76 ++++++-------
.../persistence/executor/ConfigPlanExecutor.java | 3 +-
.../procedure/impl/cq/CreateCQProcedure.java | 80 +++-----------
.../request/ConfigPhysicalPlanSerDeTest.java | 8 +-
.../apache/iotdb/confignode/cq/CQManagerTest.java | 107 -------------------
.../iotdb/confignode/persistence/CQInfoTest.java | 64 +----------
.../procedure/impl/CreateCQProcedureTest.java | 26 -----
.../impl/cq/CreateCQProcedureRecoveryTest.java | 117 ---------------------
17 files changed, 106 insertions(+), 639 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
index f662fa5871d..c1c3e877de7 100644
---
a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
+++
b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
@@ -130,8 +130,8 @@ public final class ConfigNodeMessages {
public static final String DOES_NOT_EXIST = "%s does not exist";
public static final String DROPPING_TAG_OR_TIME_COLUMN_IS_NOT_SUPPORTED =
"Dropping tag or time column is not supported.";
- public static final String DROP_CQ_FAILED_BECAUSE_ITS_TOKEN_DOESN_T_MATCH =
- "Drop CQ {} failed, because its token doesn't match.";
+ public static final String DROP_CQ_FAILED_BECAUSE_ITS_MD5_DOESN_T_MATCH =
+ "Drop CQ {} failed, because its MD5 doesn't match.";
public static final String DROP_CQ_FAILED_BECAUSE_IT_DOESN_T_EXIST =
"Drop CQ {} failed, because it doesn't exist.";
public static final String DROP_CQ_SUCCESSFULLY = "Drop CQ {} successfully.";
diff --git
a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
index 8bffa1b0831..6ca847626c8 100644
---
a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
+++
b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ConfigNodeMessages.java
@@ -126,8 +126,8 @@ public final class ConfigNodeMessages {
"Deserialization error for write plan, request: {}, bytebuffer: {}";
public static final String DOES_NOT_EXIST = "%s does not exist";
public static final String DROPPING_TAG_OR_TIME_COLUMN_IS_NOT_SUPPORTED =
"不支持删除标签列或时间列。";
- public static final String DROP_CQ_FAILED_BECAUSE_ITS_TOKEN_DOESN_T_MATCH =
- "Drop CQ {} failed, because its token doesn't match.";
+ public static final String DROP_CQ_FAILED_BECAUSE_ITS_MD5_DOESN_T_MATCH =
+ "Drop CQ {} failed, because its MD5 doesn't match.";
public static final String DROP_CQ_FAILED_BECAUSE_IT_DOESN_T_EXIST =
"Drop CQ {} failed, because it doesn't exist.";
public static final String DROP_CQ_SUCCESSFULLY = "Drop CQ {} successfully.";
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/cq/ShowCQPlan.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/cq/ShowCQPlan.java
index c28838d556b..5217849deb4 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/cq/ShowCQPlan.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/cq/ShowCQPlan.java
@@ -21,24 +21,11 @@ package
org.apache.iotdb.confignode.consensus.request.read.cq;
import
org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan;
-import java.util.Optional;
-
import static
org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType.SHOW_CQ;
public class ShowCQPlan extends ConfigPhysicalReadPlan {
- private final String cqId;
-
public ShowCQPlan() {
- this(null);
- }
-
- public ShowCQPlan(String cqId) {
super(SHOW_CQ);
- this.cqId = cqId;
- }
-
- public Optional<String> getCqId() {
- return Optional.ofNullable(cqId);
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java
index 3faa1c2d62f..263aeb9f0d0 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/ActiveCQPlan.java
@@ -35,39 +35,39 @@ public class ActiveCQPlan extends ConfigPhysicalPlan {
private String cqId;
- private String cqToken;
+ private String md5;
public ActiveCQPlan() {
super(ACTIVE_CQ);
}
- public ActiveCQPlan(String cqId, String cqToken) {
+ public ActiveCQPlan(String cqId, String md5) {
super(ACTIVE_CQ);
Validate.notNull(cqId);
- Validate.notNull(cqToken);
+ Validate.notNull(md5);
this.cqId = cqId;
- this.cqToken = cqToken;
+ this.md5 = md5;
}
public String getCqId() {
return cqId;
}
- public String getCqToken() {
- return cqToken;
+ public String getMd5() {
+ return md5;
}
@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
stream.writeShort(getType().getPlanType());
ReadWriteIOUtils.write(cqId, stream);
- ReadWriteIOUtils.write(cqToken, stream);
+ ReadWriteIOUtils.write(md5, stream);
}
@Override
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
cqId = ReadWriteIOUtils.readString(buffer);
- cqToken = ReadWriteIOUtils.readString(buffer);
+ md5 = ReadWriteIOUtils.readString(buffer);
}
@Override
@@ -82,11 +82,11 @@ public class ActiveCQPlan extends ConfigPhysicalPlan {
return false;
}
ActiveCQPlan that = (ActiveCQPlan) o;
- return cqId.equals(that.cqId) && cqToken.equals(that.cqToken);
+ return cqId.equals(that.cqId) && md5.equals(that.md5);
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), cqId, cqToken);
+ return Objects.hash(super.hashCode(), cqId, md5);
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java
index 471516c38e5..62f994688b3 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/AddCQPlan.java
@@ -37,7 +37,7 @@ public class AddCQPlan extends ConfigPhysicalPlan {
private TCreateCQReq req;
- private String cqToken;
+ private String md5;
private long firstExecutionTime;
@@ -45,12 +45,12 @@ public class AddCQPlan extends ConfigPhysicalPlan {
super(ADD_CQ);
}
- public AddCQPlan(TCreateCQReq req, String cqToken, long firstExecutionTime) {
+ public AddCQPlan(TCreateCQReq req, String md5, long firstExecutionTime) {
super(ADD_CQ);
Validate.notNull(req);
- Validate.notNull(cqToken);
+ Validate.notNull(md5);
this.req = req;
- this.cqToken = cqToken;
+ this.md5 = md5;
this.firstExecutionTime = firstExecutionTime;
}
@@ -58,8 +58,8 @@ public class AddCQPlan extends ConfigPhysicalPlan {
return req;
}
- public String getCqToken() {
- return cqToken;
+ public String getMd5() {
+ return md5;
}
public long getFirstExecutionTime() {
@@ -70,14 +70,14 @@ public class AddCQPlan extends ConfigPhysicalPlan {
protected void serializeImpl(DataOutputStream stream) throws IOException {
stream.writeShort(getType().getPlanType());
ThriftCommonsSerDeUtils.serializeTCreateCQReq(req, stream);
- ReadWriteIOUtils.write(cqToken, stream);
+ ReadWriteIOUtils.write(md5, stream);
ReadWriteIOUtils.write(firstExecutionTime, stream);
}
@Override
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
req = ThriftCommonsSerDeUtils.deserializeTCreateCQReq(buffer);
- cqToken = ReadWriteIOUtils.readString(buffer);
+ md5 = ReadWriteIOUtils.readString(buffer);
firstExecutionTime = ReadWriteIOUtils.readLong(buffer);
}
@@ -95,11 +95,11 @@ public class AddCQPlan extends ConfigPhysicalPlan {
AddCQPlan addCQPlan = (AddCQPlan) o;
return firstExecutionTime == addCQPlan.firstExecutionTime
&& Objects.equals(req, addCQPlan.req)
- && Objects.equals(cqToken, addCQPlan.cqToken);
+ && Objects.equals(md5, addCQPlan.md5);
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), req, cqToken, firstExecutionTime);
+ return Objects.hash(super.hashCode(), req, md5, firstExecutionTime);
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java
index 69c29bff634..108241b233d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/DropCQPlan.java
@@ -37,7 +37,7 @@ public class DropCQPlan extends ConfigPhysicalPlan {
private String cqId;
// may be null in user call of drop CQ
- private String cqToken;
+ private String md5;
public DropCQPlan() {
super(DROP_CQ);
@@ -49,33 +49,33 @@ public class DropCQPlan extends ConfigPhysicalPlan {
this.cqId = cqId;
}
- public DropCQPlan(String cqId, String cqToken) {
+ public DropCQPlan(String cqId, String md5) {
super(DROP_CQ);
Validate.notNull(cqId);
- Validate.notNull(cqToken);
+ Validate.notNull(md5);
this.cqId = cqId;
- this.cqToken = cqToken;
+ this.md5 = md5;
}
public String getCqId() {
return cqId;
}
- public Optional<String> getCqToken() {
- return Optional.ofNullable(cqToken);
+ public Optional<String> getMd5() {
+ return Optional.ofNullable(md5);
}
@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
stream.writeShort(getType().getPlanType());
ReadWriteIOUtils.write(cqId, stream);
- ReadWriteIOUtils.write(cqToken, stream);
+ ReadWriteIOUtils.write(md5, stream);
}
@Override
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
cqId = ReadWriteIOUtils.readString(buffer);
- cqToken = ReadWriteIOUtils.readString(buffer);
+ md5 = ReadWriteIOUtils.readString(buffer);
}
@Override
@@ -90,11 +90,11 @@ public class DropCQPlan extends ConfigPhysicalPlan {
return false;
}
DropCQPlan that = (DropCQPlan) o;
- return cqId.equals(that.cqId) && Objects.equals(cqToken, that.cqToken);
+ return cqId.equals(that.cqId) && Objects.equals(md5, that.md5);
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), cqId, cqToken);
+ return Objects.hash(super.hashCode(), cqId, md5);
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java
index a487ae648e3..861a7d4f51b 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/cq/UpdateCQLastExecTimePlan.java
@@ -37,19 +37,20 @@ public class UpdateCQLastExecTimePlan extends
ConfigPhysicalPlan {
private long executionTime;
- private String cqToken;
+ // may be null in user call of drop CQ
+ private String md5;
public UpdateCQLastExecTimePlan() {
super(UPDATE_CQ_LAST_EXEC_TIME);
}
- public UpdateCQLastExecTimePlan(String cqId, long executionTime, String
cqToken) {
+ public UpdateCQLastExecTimePlan(String cqId, long executionTime, String md5)
{
super(UPDATE_CQ_LAST_EXEC_TIME);
Validate.notNull(cqId);
- Validate.notNull(cqToken);
+ Validate.notNull(md5);
this.cqId = cqId;
this.executionTime = executionTime;
- this.cqToken = cqToken;
+ this.md5 = md5;
}
public String getCqId() {
@@ -60,8 +61,8 @@ public class UpdateCQLastExecTimePlan extends
ConfigPhysicalPlan {
return executionTime;
}
- public String getCqToken() {
- return cqToken;
+ public String getMd5() {
+ return md5;
}
@Override
@@ -69,14 +70,14 @@ public class UpdateCQLastExecTimePlan extends
ConfigPhysicalPlan {
stream.writeShort(getType().getPlanType());
ReadWriteIOUtils.write(cqId, stream);
ReadWriteIOUtils.write(executionTime, stream);
- ReadWriteIOUtils.write(cqToken, stream);
+ ReadWriteIOUtils.write(md5, stream);
}
@Override
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
cqId = ReadWriteIOUtils.readString(buffer);
executionTime = ReadWriteIOUtils.readLong(buffer);
- cqToken = ReadWriteIOUtils.readString(buffer);
+ md5 = ReadWriteIOUtils.readString(buffer);
}
@Override
@@ -91,13 +92,11 @@ public class UpdateCQLastExecTimePlan extends
ConfigPhysicalPlan {
return false;
}
UpdateCQLastExecTimePlan that = (UpdateCQLastExecTimePlan) o;
- return executionTime == that.executionTime
- && cqId.equals(that.cqId)
- && cqToken.equals(that.cqToken);
+ return executionTime == that.executionTime && cqId.equals(that.cqId) &&
md5.equals(that.md5);
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), cqId, executionTime, cqToken);
+ return Objects.hash(super.hashCode(), cqId, executionTime, md5);
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
index 29837bc8aa2..c4c1e8aede9 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQManager.java
@@ -43,10 +43,7 @@ import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -60,15 +57,11 @@ public class CQManager {
private final ReadWriteLock lock;
- // Key: CQ id. Value: the local task and the metadata token it owns.
- private final ConcurrentMap<String, LocallyScheduledCQ> locallyScheduledCQs;
-
private ScheduledExecutorService executor;
public CQManager(ConfigManager configManager) {
this.configManager = configManager;
this.lock = new ReentrantReadWriteLock();
- this.locallyScheduledCQs = new ConcurrentHashMap<>();
this.executor =
IoTDBThreadPoolFactory.newScheduledThreadPool(
CONF.getCqSubmitThread(), ThreadName.CQ_SCHEDULER.getName());
@@ -85,21 +78,14 @@ public class CQManager {
}
public TSStatus dropCQ(TDropCQReq req) {
- lock.readLock().lock();
try {
- TSStatus status = configManager.getConsensusManager().write(new
DropCQPlan(req.cqId));
- if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- cancelLocallyScheduledCQ(req.cqId);
- }
- return status;
+ return configManager.getConsensusManager().write(new
DropCQPlan(req.cqId));
} catch (ConsensusException e) {
LOGGER.warn(ManagerMessages.UNEXPECTED_ERROR_HAPPENED_WHILE_DROPPING_CQ,
req.cqId, e);
// consensus layer related errors
TSStatus res = new
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
res.setMessage(e.getMessage());
return res;
- } finally {
- lock.readLock().unlock();
}
}
@@ -132,7 +118,6 @@ public class CQManager {
try {
// 1. shutdown previous cq schedule thread pool
try {
- cancelAllLocallyScheduledCQs();
if (executor != null) {
executor.shutdown();
}
@@ -171,15 +156,7 @@ public class CQManager {
for (CQInfo.CQEntry entry : allCQs) {
if (entry.getState() == CQState.ACTIVE) {
CQScheduleTask cqScheduleTask = new CQScheduleTask(entry,
executor, configManager);
- if (!markCQLocallyScheduled(entry.getCqId(), entry.getCqToken(),
cqScheduleTask)) {
- continue;
- }
- try {
- cqScheduleTask.submitSelf();
- } catch (RuntimeException e) {
- unmarkCQLocallyScheduled(entry.getCqId(), entry.getCqToken());
- throw e;
- }
+ cqScheduleTask.submitSelf();
}
}
}
@@ -199,7 +176,6 @@ public class CQManager {
try {
previous = executor;
executor = null;
- cancelAllLocallyScheduledCQs();
} finally {
lock.writeLock().unlock();
}
@@ -207,78 +183,4 @@ public class CQManager {
previous.shutdown();
}
}
-
- public boolean markCQLocallyScheduled(String cqId, String cqToken,
CQScheduleTask task) {
- AtomicBoolean shouldSchedule = new AtomicBoolean(false);
- LocallyScheduledCQ schedule = new LocallyScheduledCQ(cqToken, task);
- lock.readLock().lock();
- try {
- locallyScheduledCQs.compute(
- cqId,
- (ignored, previousSchedule) -> {
- if (previousSchedule != null &&
previousSchedule.hasToken(cqToken)) {
- return previousSchedule;
- }
- if (previousSchedule != null) {
- previousSchedule.cancel();
- }
- shouldSchedule.set(true);
- return schedule;
- });
- if (!shouldSchedule.get()) {
- task.cancel();
- }
- return shouldSchedule.get();
- } finally {
- lock.readLock().unlock();
- }
- }
-
- public void unmarkCQLocallyScheduled(String cqId, String cqToken) {
- lock.readLock().lock();
- try {
- locallyScheduledCQs.computeIfPresent(
- cqId,
- (ignored, schedule) -> {
- if (schedule.hasToken(cqToken)) {
- schedule.cancel();
- return null;
- }
- return schedule;
- });
- } finally {
- lock.readLock().unlock();
- }
- }
-
- private void cancelLocallyScheduledCQ(String cqId) {
- LocallyScheduledCQ schedule = locallyScheduledCQs.remove(cqId);
- if (schedule != null) {
- schedule.cancel();
- }
- }
-
- private void cancelAllLocallyScheduledCQs() {
- locallyScheduledCQs.values().forEach(LocallyScheduledCQ::cancel);
- locallyScheduledCQs.clear();
- }
-
- private static class LocallyScheduledCQ {
-
- private final String cqToken;
- private final CQScheduleTask task;
-
- private LocallyScheduledCQ(String cqToken, CQScheduleTask task) {
- this.cqToken = cqToken;
- this.task = task;
- }
-
- private boolean hasToken(String cqToken) {
- return this.cqToken.equals(cqToken);
- }
-
- private void cancel() {
- task.cancel();
- }
- }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
index 6b73ffca95f..c58f5ade9bc 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/cq/CQScheduleTask.java
@@ -40,10 +40,7 @@ import org.slf4j.LoggerFactory;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
public class CQScheduleTask implements Runnable {
@@ -73,7 +70,7 @@ public class CQScheduleTask implements Runnable {
private final long endTimeOffset;
private final TimeoutPolicy timeoutPolicy;
private final String queryBody;
- private final String cqToken;
+ private final String md5;
private final String zoneId;
@@ -85,15 +82,12 @@ public class CQScheduleTask implements Runnable {
private final long retryWaitTimeInMS;
- private final AtomicBoolean cancelled;
- private final AtomicReference<ScheduledFuture<?>> scheduledFuture;
-
private long executionTime;
public CQScheduleTask(
TCreateCQReq req,
long firstExecutionTime,
- String cqToken,
+ String md5,
ScheduledExecutorService executor,
ConfigManager configManager) {
this(
@@ -103,7 +97,7 @@ public class CQScheduleTask implements Runnable {
req.endTimeOffset,
TimeoutPolicy.deserialize(req.timeoutPolicy),
req.queryBody,
- cqToken,
+ md5,
req.zoneId,
req.username,
executor,
@@ -120,7 +114,7 @@ public class CQScheduleTask implements Runnable {
entry.getEndTimeOffset(),
entry.getTimeoutPolicy(),
entry.getQueryBody(),
- entry.getCqToken(),
+ entry.getMd5(),
entry.getZoneId(),
entry.getUsername(),
executor,
@@ -136,7 +130,7 @@ public class CQScheduleTask implements Runnable {
long endTimeOffset,
TimeoutPolicy timeoutPolicy,
String queryBody,
- String cqToken,
+ String md5,
String zoneId,
String username,
ScheduledExecutorService executor,
@@ -148,14 +142,12 @@ public class CQScheduleTask implements Runnable {
this.endTimeOffset = endTimeOffset;
this.timeoutPolicy = timeoutPolicy;
this.queryBody = queryBody;
- this.cqToken = cqToken;
+ this.md5 = md5;
this.zoneId = zoneId;
this.username = username;
this.executor = executor;
this.configManager = configManager;
this.retryWaitTimeInMS = Math.min(DEFAULT_RETRY_WAIT_TIME_IN_MS,
everyInterval / FACTOR);
- this.cancelled = new AtomicBoolean(false);
- this.scheduledFuture = new AtomicReference<>();
this.executionTime = executionTime;
}
@@ -174,9 +166,6 @@ public class CQScheduleTask implements Runnable {
@Override
public void run() {
- if (cancelled.get()) {
- return;
- }
long startTime = executionTime - startTimeOffset;
long endTime = executionTime - endTimeOffset;
@@ -189,9 +178,6 @@ public class CQScheduleTask implements Runnable {
submitSelf(retryWaitTimeInMS, TimeUnit.MILLISECONDS);
}
} else {
- if (cancelled.get()) {
- return;
- }
LOGGER.info(
ManagerMessages.STARTEXECUTECQ_EXECUTE_CQ_ON_DATANODE_TIME_RANGE_IS_CURRENT_TIME,
cqId,
@@ -221,32 +207,12 @@ public class CQScheduleTask implements Runnable {
}
private void submitSelf(long delay, TimeUnit unit) {
- if (cancelled.get()) {
- return;
- }
- ScheduledFuture<?> newFuture = executor.schedule(this, delay, unit);
- ScheduledFuture<?> previousFuture = scheduledFuture.getAndSet(newFuture);
- if (previousFuture != null) {
- previousFuture.cancel(false);
- }
- if (cancelled.get() && scheduledFuture.compareAndSet(newFuture, null)) {
- newFuture.cancel(false);
- }
- }
-
- public void cancel() {
- cancelled.set(true);
- ScheduledFuture<?> currentFuture = scheduledFuture.getAndSet(null);
- if (currentFuture != null) {
- currentFuture.cancel(false);
- }
+ executor.schedule(this, delay, unit);
}
private boolean needSubmit() {
// current node is still leader and thread pool is not shut down.
- return !cancelled.get()
- && configManager.getConsensusManager().isLeader()
- && !executor.isShutdown();
+ return configManager.getConsensusManager().isLeader() &&
!executor.isShutdown();
}
private class AsyncExecuteCQCallback implements
AsyncMethodCallback<TSStatus> {
@@ -273,9 +239,6 @@ public class CQScheduleTask implements Runnable {
@Override
public void onComplete(TSStatus response) {
- if (cancelled.get()) {
- return;
- }
if (response.code == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.info(
@@ -289,7 +252,7 @@ public class CQScheduleTask implements Runnable {
result =
configManager
.getConsensusManager()
- .write(new UpdateCQLastExecTimePlan(cqId, executionTime,
cqToken));
+ .write(new UpdateCQLastExecTimePlan(cqId, executionTime,
md5));
} catch (ConsensusException e) {
result = new
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
result.setMessage(e.getMessage());
@@ -328,9 +291,6 @@ public class CQScheduleTask implements Runnable {
@Override
public void onError(Exception exception) {
- if (cancelled.get()) {
- return;
- }
LOGGER.warn(ManagerMessages.EXECUTE_CQ_FAILED, cqId, exception);
if (needSubmit()) {
submitSelf(retryWaitTimeInMS, TimeUnit.MILLISECONDS);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java
index 013e2415f94..9c99cfbb0e8 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/cq/CQInfo.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cq.CQState;
import org.apache.iotdb.commons.cq.TimeoutPolicy;
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
-import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.ActiveCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan;
@@ -46,9 +45,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@@ -65,7 +62,7 @@ public class CQInfo implements SnapshotProcessor {
private static final String CQ_NOT_EXIST_FORMAT = "CQ %s doesn't exist.";
- private static final String CQ_TOKEN_NOT_MATCH_FORMAT = "Token of CQ %s
doesn't match";
+ private static final String MD5_NOT_MATCH_FORMAT = "MD5 of CQ %s doesn't
match";
private final Map<String, CQEntry> cqMap;
@@ -95,7 +92,7 @@ public class CQInfo implements SnapshotProcessor {
CQEntry cqEntry =
new CQEntry(
plan.getReq(),
- plan.getCqToken(),
+ plan.getMd5(),
plan.getFirstExecutionTime() - plan.getReq().everyInterval);
cqMap.put(cqId, cqEntry);
res.code = TSStatusCode.SUCCESS_STATUS.getStatusCode();
@@ -109,13 +106,13 @@ public class CQInfo implements SnapshotProcessor {
/**
* Drop the CQ whose ID is same as <tt>cqId</tt> in plan.
*
- * @return SUCCESS_STATUS if there is CQ whose ID and token is same as
<tt>cqId</tt> in plan,
+ * @return SUCCESS_STATUS if there is CQ whose ID and md5 is same as
<tt>cqId</tt> in plan,
* otherwise NO_SUCH_CQ.
*/
public TSStatus dropCQ(DropCQPlan plan) {
TSStatus res = new TSStatus();
String cqId = plan.getCqId();
- Optional<String> cqToken = plan.getCqToken();
+ Optional<String> md5 = plan.getMd5();
lock.writeLock().lock();
try {
CQEntry cqEntry = cqMap.get(cqId);
@@ -123,10 +120,10 @@ public class CQInfo implements SnapshotProcessor {
res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
res.message = String.format(CQ_NOT_EXIST_FORMAT, cqId);
LOGGER.warn(ConfigNodeMessages.DROP_CQ_FAILED_BECAUSE_IT_DOESN_T_EXIST, cqId);
- } else if ((cqToken.isPresent() &&
!cqToken.get().equals(cqEntry.cqToken))) {
+ } else if ((md5.isPresent() && !md5.get().equals(cqEntry.md5))) {
res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
- res.message = String.format(CQ_TOKEN_NOT_MATCH_FORMAT, cqId);
-
LOGGER.warn(ConfigNodeMessages.DROP_CQ_FAILED_BECAUSE_ITS_TOKEN_DOESN_T_MATCH,
cqId);
+ res.message = String.format(MD5_NOT_MATCH_FORMAT, cqId);
+
LOGGER.warn(ConfigNodeMessages.DROP_CQ_FAILED_BECAUSE_ITS_MD5_DOESN_T_MATCH,
cqId);
} else {
cqMap.remove(cqId);
res.code = TSStatusCode.SUCCESS_STATUS.getStatusCode();
@@ -139,24 +136,11 @@ public class CQInfo implements SnapshotProcessor {
}
public ShowCQResp showCQ() {
- return showCQ(new ShowCQPlan());
- }
-
- public ShowCQResp showCQ(ShowCQPlan plan) {
lock.readLock().lock();
try {
- Optional<String> cqId = plan.getCqId();
- List<CQEntry> cqList;
- if (cqId.isPresent()) {
- CQEntry cqEntry = cqMap.get(cqId.get());
- cqList =
- cqEntry == null
- ? Collections.emptyList()
- : Collections.singletonList(new CQEntry(cqEntry));
- } else {
- cqList =
cqMap.values().stream().map(CQEntry::new).collect(Collectors.toList());
- }
- return new ShowCQResp(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), cqList);
+ return new ShowCQResp(
+ new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
+
cqMap.values().stream().map(CQEntry::new).collect(Collectors.toList()));
} finally {
lock.readLock().unlock();
}
@@ -170,16 +154,16 @@ public class CQInfo implements SnapshotProcessor {
public TSStatus activeCQ(ActiveCQPlan plan) {
TSStatus res = new TSStatus();
String cqId = plan.getCqId();
- String cqToken = plan.getCqToken();
+ String md5 = plan.getMd5();
lock.writeLock().lock();
try {
CQEntry cqEntry = cqMap.get(cqId);
if (cqEntry == null) {
res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
res.message = String.format(CQ_NOT_EXIST_FORMAT, cqId);
- } else if (!cqToken.equals(cqEntry.cqToken)) {
+ } else if (!md5.equals(cqEntry.md5)) {
res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
- res.message = String.format(CQ_TOKEN_NOT_MATCH_FORMAT, cqId);
+ res.message = String.format(MD5_NOT_MATCH_FORMAT, cqId);
} else if (cqEntry.state == CQState.ACTIVE) {
res.code = TSStatusCode.CQ_ALREADY_ACTIVE.getStatusCode();
res.message = String.format("CQ %s has already been active", cqId);
@@ -197,22 +181,22 @@ public class CQInfo implements SnapshotProcessor {
* Update the last execution time of the corresponding CQ.
*
* @return SUCCESS_STATUS if successfully updated, or NO_SUCH_CQ if 1. the
CQ doesn't exist; or 2.
- * token is different. or CQ_UPDATE_LAST_EXEC_TIME_FAILED 3. original
lastExecutionTime >=
+ * md5 is different. or CQ_UPDATE_LAST_EXEC_TIME_FAILED 3. original
lastExecutionTime >=
* current lastExecutionTime;
*/
public TSStatus updateCQLastExecutionTime(UpdateCQLastExecTimePlan plan) {
TSStatus res = new TSStatus();
String cqId = plan.getCqId();
- String cqToken = plan.getCqToken();
+ String md5 = plan.getMd5();
lock.writeLock().lock();
try {
CQEntry cqEntry = cqMap.get(cqId);
if (cqEntry == null) {
res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
res.message = String.format(CQ_NOT_EXIST_FORMAT, cqId);
- } else if (!cqToken.equals(cqEntry.cqToken)) {
+ } else if (!md5.equals(cqEntry.md5)) {
res.code = TSStatusCode.NO_SUCH_CQ.getStatusCode();
- res.message = String.format(CQ_TOKEN_NOT_MATCH_FORMAT, cqId);
+ res.message = String.format(MD5_NOT_MATCH_FORMAT, cqId);
} else if (cqEntry.lastExecutionTime >= plan.getExecutionTime()) {
res.code = TSStatusCode.CQ_UPDATE_LAST_EXEC_TIME_ERROR.getStatusCode();
res.message =
@@ -316,7 +300,7 @@ public class CQInfo implements SnapshotProcessor {
private final TimeoutPolicy timeoutPolicy;
private final String queryBody;
private final String sql;
- private final String cqToken;
+ private final String md5;
private final String zoneId;
@@ -325,7 +309,7 @@ public class CQInfo implements SnapshotProcessor {
private CQState state;
private long lastExecutionTime;
- private CQEntry(TCreateCQReq req, String cqToken, long lastExecutionTime) {
+ private CQEntry(TCreateCQReq req, String md5, long lastExecutionTime) {
this(
req.cqId,
req.everyInterval,
@@ -335,7 +319,7 @@ public class CQInfo implements SnapshotProcessor {
TimeoutPolicy.deserialize(req.timeoutPolicy),
req.queryBody,
req.sql,
- cqToken,
+ md5,
req.zoneId,
req.username,
CQState.INACTIVE,
@@ -352,7 +336,7 @@ public class CQInfo implements SnapshotProcessor {
other.timeoutPolicy,
other.queryBody,
other.sql,
- other.cqToken,
+ other.md5,
other.zoneId,
other.username,
other.state,
@@ -369,7 +353,7 @@ public class CQInfo implements SnapshotProcessor {
TimeoutPolicy timeoutPolicy,
String queryBody,
String sql,
- String cqToken,
+ String md5,
String zoneId,
String username,
CQState state,
@@ -382,7 +366,7 @@ public class CQInfo implements SnapshotProcessor {
this.timeoutPolicy = timeoutPolicy;
this.queryBody = queryBody;
this.sql = sql;
- this.cqToken = cqToken;
+ this.md5 = md5;
this.zoneId = zoneId;
this.username = username;
this.state = state;
@@ -398,7 +382,7 @@ public class CQInfo implements SnapshotProcessor {
ReadWriteIOUtils.write(timeoutPolicy.getType(), stream);
ReadWriteIOUtils.write(queryBody, stream);
ReadWriteIOUtils.write(sql, stream);
- ReadWriteIOUtils.write(cqToken, stream);
+ ReadWriteIOUtils.write(md5, stream);
ReadWriteIOUtils.write(zoneId, stream);
ReadWriteIOUtils.write(username, stream);
ReadWriteIOUtils.write(state.getType(), stream);
@@ -414,7 +398,7 @@ public class CQInfo implements SnapshotProcessor {
TimeoutPolicy timeoutPolicy =
TimeoutPolicy.deserialize(ReadWriteIOUtils.readByte(stream));
String queryBody = ReadWriteIOUtils.readString(stream);
String sql = ReadWriteIOUtils.readString(stream);
- String cqToken = ReadWriteIOUtils.readString(stream);
+ String md5 = ReadWriteIOUtils.readString(stream);
String zoneId = ReadWriteIOUtils.readString(stream);
String username = ReadWriteIOUtils.readString(stream);
CQState state = CQState.deserialize(ReadWriteIOUtils.readByte(stream));
@@ -428,7 +412,7 @@ public class CQInfo implements SnapshotProcessor {
timeoutPolicy,
queryBody,
sql,
- cqToken,
+ md5,
zoneId,
username,
state,
@@ -467,8 +451,8 @@ public class CQInfo implements SnapshotProcessor {
return sql;
}
- public String getCqToken() {
- return cqToken;
+ public String getMd5() {
+ return md5;
}
public CQState getState() {
@@ -505,7 +489,7 @@ public class CQInfo implements SnapshotProcessor {
&& timeoutPolicy == cqEntry.timeoutPolicy
&& Objects.equals(queryBody, cqEntry.queryBody)
&& Objects.equals(sql, cqEntry.sql)
- && Objects.equals(cqToken, cqEntry.cqToken)
+ && Objects.equals(md5, cqEntry.md5)
&& Objects.equals(zoneId, cqEntry.zoneId)
&& Objects.equals(username, cqEntry.username)
&& state == cqEntry.state;
@@ -522,7 +506,7 @@ public class CQInfo implements SnapshotProcessor {
timeoutPolicy,
queryBody,
sql,
- cqToken,
+ md5,
zoneId,
username,
state,
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index 96dcdea4648..eb8d5e5538b 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import
org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan;
import
org.apache.iotdb.confignode.consensus.request.read.ainode.GetAINodeConfigurationPlan;
-import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan;
import
org.apache.iotdb.confignode.consensus.request.read.database.CountDatabasePlan;
import
org.apache.iotdb.confignode.consensus.request.read.database.GetDatabasePlan;
import
org.apache.iotdb.confignode.consensus.request.read.datanode.GetDataNodeConfigurationPlan;
@@ -362,7 +361,7 @@ public class ConfigPlanExecutor {
case GetRegionGroupsByTime:
return partitionInfo.getRegionGroupsByTime((GetRegionGroupsByTimePlan)
req);
case SHOW_CQ:
- return cqInfo.showCQ((ShowCQPlan) req);
+ return cqInfo.showCQ();
case ShowExternalService:
return externalServiceInfo.showService(((ShowExternalServicePlan)
req).getDataNodeIds());
case GetFunctionTable:
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
index 490f723d2e6..ac964d23ca3 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedure.java
@@ -22,15 +22,11 @@ package org.apache.iotdb.confignode.procedure.impl.cq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
-import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.ActiveCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan;
-import org.apache.iotdb.confignode.consensus.response.cq.ShowCQResp;
import org.apache.iotdb.confignode.i18n.ProcedureMessages;
-import org.apache.iotdb.confignode.manager.cq.CQManager;
import org.apache.iotdb.confignode.manager.cq.CQScheduleTask;
-import org.apache.iotdb.confignode.persistence.cq.CQInfo;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.impl.node.AbstractNodeProcedure;
@@ -40,6 +36,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.tsfile.external.commons.codec.digest.DigestUtils;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,8 +45,6 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
-import java.util.Optional;
-import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import static
org.apache.iotdb.confignode.procedure.state.cq.CreateCQState.INACTIVE;
@@ -65,7 +60,7 @@ public class CreateCQProcedure extends
AbstractNodeProcedure<CreateCQState> {
private TCreateCQReq req;
- private String cqToken;
+ private String md5;
private long firstExecutionTime;
@@ -80,7 +75,7 @@ public class CreateCQProcedure extends
AbstractNodeProcedure<CreateCQState> {
public CreateCQProcedure(TCreateCQReq req, ScheduledExecutorService
executor) {
super();
this.req = req;
- this.cqToken = generateCQToken();
+ this.md5 = DigestUtils.md2Hex(req.cqId);
this.executor = executor;
this.firstExecutionTime =
CQScheduleTask.getFirstExecutionTime(req.boundaryTime,
req.everyInterval);
@@ -96,16 +91,12 @@ public class CreateCQProcedure extends
AbstractNodeProcedure<CreateCQState> {
addCQ(env);
return Flow.HAS_MORE_STATE;
case INACTIVE:
- submitScheduleTask(
- env,
- new CQScheduleTask(
- req, firstExecutionTime, cqToken, executor,
env.getConfigManager()));
+ CQScheduleTask cqScheduleTask =
+ new CQScheduleTask(req, firstExecutionTime, md5, executor,
env.getConfigManager());
+ cqScheduleTask.submitSelf();
setNextState(SCHEDULED);
break;
case SCHEDULED:
- if (isStateDeserialized()) {
- recoverScheduledTask(env);
- }
activeCQ(env);
return Flow.NO_MORE_STATE;
default:
@@ -135,7 +126,7 @@ public class CreateCQProcedure extends
AbstractNodeProcedure<CreateCQState> {
res =
env.getConfigManager()
.getConsensusManager()
- .write(new AddCQPlan(req, cqToken, firstExecutionTime));
+ .write(new AddCQPlan(req, md5, firstExecutionTime));
} catch (ConsensusException e) {
LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
@@ -156,7 +147,7 @@ public class CreateCQProcedure extends
AbstractNodeProcedure<CreateCQState> {
private void activeCQ(ConfigNodeProcedureEnv env) {
TSStatus res;
try {
- res = env.getConfigManager().getConsensusManager().write(new
ActiveCQPlan(req.cqId, cqToken));
+ res = env.getConfigManager().getConsensusManager().write(new
ActiveCQPlan(req.cqId, md5));
} catch (ConsensusException e) {
LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
@@ -177,42 +168,6 @@ public class CreateCQProcedure extends
AbstractNodeProcedure<CreateCQState> {
}
}
- void recoverScheduledTask(ConfigNodeProcedureEnv env) throws
ConsensusException {
- Optional<CQInfo.CQEntry> cqEntry = getCurrentCQEntry(env);
- if (!cqEntry.isPresent()) {
- LOGGER.info(
- "Skip recovering the schedule task of CQ {} because its metadata is
unavailable.",
- req.cqId);
- return;
- }
- submitScheduleTask(env, new CQScheduleTask(cqEntry.get(), executor,
env.getConfigManager()));
- }
-
- Optional<CQInfo.CQEntry> getCurrentCQEntry(ConfigNodeProcedureEnv env)
throws ConsensusException {
- ShowCQResp response =
- (ShowCQResp) env.getConfigManager().getConsensusManager().read(new
ShowCQPlan(req.cqId));
- return response.getCqList().stream()
- .filter(entry -> cqToken.equals(entry.getCqToken()))
- .findFirst();
- }
-
- private static String generateCQToken() {
- return UUID.randomUUID().toString();
- }
-
- private void submitScheduleTask(ConfigNodeProcedureEnv env, CQScheduleTask
cqScheduleTask) {
- CQManager cqManager = env.getConfigManager().getCQManager();
- if (!cqManager.markCQLocallyScheduled(req.cqId, cqToken, cqScheduleTask)) {
- return;
- }
- try {
- cqScheduleTask.submitSelf();
- } catch (RuntimeException e) {
- cqManager.unmarkCQLocallyScheduled(req.cqId, cqToken);
- throw e;
- }
- }
-
@Override
protected void rollbackState(ConfigNodeProcedureEnv env, CreateCQState state)
throws IOException, InterruptedException, ProcedureException {
@@ -225,8 +180,7 @@ public class CreateCQProcedure extends
AbstractNodeProcedure<CreateCQState> {
LOGGER.info(ProcedureMessages.START_INACTIVE_ROLLBACK_OF_CQ, req.cqId);
TSStatus res;
try {
- res =
- env.getConfigManager().getConsensusManager().write(new
DropCQPlan(req.cqId, cqToken));
+ res = env.getConfigManager().getConsensusManager().write(new
DropCQPlan(req.cqId, md5));
} catch (ConsensusException e) {
LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
res = new
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
@@ -277,7 +231,7 @@ public class CreateCQProcedure extends
AbstractNodeProcedure<CreateCQState> {
stream.writeShort(ProcedureType.CREATE_CQ_PROCEDURE.getTypeCode());
super.serialize(stream);
ThriftCommonsSerDeUtils.serializeTCreateCQReq(req, stream);
- ReadWriteIOUtils.write(cqToken, stream);
+ ReadWriteIOUtils.write(md5, stream);
ReadWriteIOUtils.write(firstExecutionTime, stream);
}
@@ -285,7 +239,7 @@ public class CreateCQProcedure extends
AbstractNodeProcedure<CreateCQState> {
public void deserialize(ByteBuffer byteBuffer) {
super.deserialize(byteBuffer);
this.req = ThriftCommonsSerDeUtils.deserializeTCreateCQReq(byteBuffer);
- this.cqToken = ReadWriteIOUtils.readString(byteBuffer);
+ this.md5 = ReadWriteIOUtils.readString(byteBuffer);
this.firstExecutionTime = ReadWriteIOUtils.readLong(byteBuffer);
}
@@ -304,7 +258,7 @@ public class CreateCQProcedure extends
AbstractNodeProcedure<CreateCQState> {
&& isGeneratedByPipe == that.isGeneratedByPipe
&& firstExecutionTime == that.firstExecutionTime
&& Objects.equals(req, that.req)
- && Objects.equals(cqToken, that.cqToken);
+ && Objects.equals(md5, that.md5);
}
@Override
@@ -315,15 +269,7 @@ public class CreateCQProcedure extends
AbstractNodeProcedure<CreateCQState> {
getCycles(),
isGeneratedByPipe,
req,
- cqToken,
+ md5,
firstExecutionTime);
}
-
- public String getCqId() {
- return req == null ? null : req.getCqId();
- }
-
- public String getCqToken() {
- return cqToken;
- }
}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index ea35be6c5d7..d6a3d7e3fd7 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -1674,7 +1674,7 @@ public class ConfigPhysicalPlanSerDeTest {
@Test
public void ActiveCQPlanTest() throws IOException {
- ActiveCQPlan activeCQPlan0 = new ActiveCQPlan("testCq", "testCqToken");
+ ActiveCQPlan activeCQPlan0 = new ActiveCQPlan("testCq", "testCq_md5");
ActiveCQPlan activeCQPlan1 =
(ActiveCQPlan)
ConfigPhysicalPlan.Factory.create(activeCQPlan0.serializeToByteBuffer());
@@ -1697,7 +1697,7 @@ public class ConfigPhysicalPlanSerDeTest {
"create cq testCq1 BEGIN select s1 into root.backup.d1.s1 from
root.sg.d1 END",
"Asia",
"root"),
- "testCq1Token",
+ "testCq1_md5",
executionTime);
AddCQPlan addCQPlan1 =
(AddCQPlan)
ConfigPhysicalPlan.Factory.create(addCQPlan0.serializeToByteBuffer());
@@ -1712,7 +1712,7 @@ public class ConfigPhysicalPlanSerDeTest {
(DropCQPlan)
ConfigPhysicalPlan.Factory.create(dropCQPlan0.serializeToByteBuffer());
Assert.assertEquals(dropCQPlan0, dropCQPlan1);
- dropCQPlan0 = new DropCQPlan("testCq1", "testCq1Token");
+ dropCQPlan0 = new DropCQPlan("testCq1", "testCq1_md5");
dropCQPlan1 =
(DropCQPlan)
ConfigPhysicalPlan.Factory.create(dropCQPlan0.serializeToByteBuffer());
Assert.assertEquals(dropCQPlan0, dropCQPlan1);
@@ -1721,7 +1721,7 @@ public class ConfigPhysicalPlanSerDeTest {
@Test
public void UpdateCQLastExecTimePlanTest() throws IOException {
UpdateCQLastExecTimePlan updateCQLastExecTimePlan0 =
- new UpdateCQLastExecTimePlan("testCq", System.currentTimeMillis(),
"testCqToken");
+ new UpdateCQLastExecTimePlan("testCq", System.currentTimeMillis(),
"testCq_md5");
UpdateCQLastExecTimePlan updateCQLastExecTimePlan1 =
(UpdateCQLastExecTimePlan)
ConfigPhysicalPlan.Factory.create(updateCQLastExecTimePlan0.serializeToByteBuffer());
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/cq/CQManagerTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/cq/CQManagerTest.java
deleted file mode 100644
index a0bc5a523ba..00000000000
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/cq/CQManagerTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.confignode.cq;
-
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.cq.TimeoutPolicy;
-import org.apache.iotdb.confignode.manager.ConfigManager;
-import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
-import org.apache.iotdb.confignode.manager.cq.CQManager;
-import org.apache.iotdb.confignode.manager.cq.CQScheduleTask;
-import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertTrue;
-
-public class CQManagerTest {
-
- @SuppressWarnings("unchecked")
- @Test
- public void dropCQShouldCancelLocallyScheduledTask() throws Exception {
- ConfigManager configManager = Mockito.mock(ConfigManager.class);
- ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class);
-
Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager);
- Mockito.when(consensusManager.write(Mockito.any()))
- .thenReturn(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
- CQManager cqManager = new CQManager(configManager);
- ScheduledFuture<?> future = Mockito.mock(ScheduledFuture.class);
- CQScheduleTask task = newScheduledTask(configManager, future, "token");
-
- try {
- assertTrue(cqManager.markCQLocallyScheduled("testCq", "token", task));
- task.submitSelf();
- cqManager.dropCQ(new TDropCQReq("testCq"));
-
- Mockito.verify(future).cancel(false);
- } finally {
- cqManager.stopCQScheduler();
- }
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void newTokenShouldCancelPreviousLocallyScheduledTask() {
- ConfigManager configManager = Mockito.mock(ConfigManager.class);
- CQManager cqManager = new CQManager(configManager);
- ScheduledFuture<?> previousFuture = Mockito.mock(ScheduledFuture.class);
- CQScheduleTask previousTask = newScheduledTask(configManager,
previousFuture, "previousToken");
- ScheduledFuture<?> currentFuture = Mockito.mock(ScheduledFuture.class);
- CQScheduleTask currentTask = newScheduledTask(configManager,
currentFuture, "currentToken");
-
- try {
- assertTrue(cqManager.markCQLocallyScheduled("testCq", "previousToken",
previousTask));
- previousTask.submitSelf();
- assertTrue(cqManager.markCQLocallyScheduled("testCq", "currentToken",
currentTask));
-
- Mockito.verify(previousFuture).cancel(false);
- } finally {
- cqManager.stopCQScheduler();
- }
- }
-
- @SuppressWarnings("unchecked")
- private CQScheduleTask newScheduledTask(
- ConfigManager configManager, ScheduledFuture<?> scheduledFuture, String
cqToken) {
- ScheduledExecutorService executor =
Mockito.mock(ScheduledExecutorService.class);
- Mockito.when(
- executor.schedule(
- Mockito.any(Runnable.class), Mockito.anyLong(),
Mockito.any(TimeUnit.class)))
- .thenReturn((ScheduledFuture) scheduledFuture);
- return new CQScheduleTask(
- "testCq",
- 1000,
- 0,
- 1000,
- TimeoutPolicy.BLOCKED,
- "select s1 into root.backup.d1.s1 from root.sg.d1",
- cqToken,
- "Asia",
- "root",
- executor,
- configManager,
- System.currentTimeMillis() + 10_000);
- }
-}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java
index 64bbd69c5b6..4b409d6cf0c 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/CQInfoTest.java
@@ -18,14 +18,9 @@
*/
package org.apache.iotdb.confignode.persistence;
-import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan;
import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan;
-import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan;
-import
org.apache.iotdb.confignode.consensus.request.write.cq.UpdateCQLastExecTimePlan;
-import org.apache.iotdb.confignode.consensus.response.cq.ShowCQResp;
import org.apache.iotdb.confignode.persistence.cq.CQInfo;
import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
-import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
import org.apache.tsfile.external.commons.io.FileUtils;
@@ -75,7 +70,7 @@ public class CQInfoTest {
"create cq testCq1 BEGIN select s1 into root.backup.d1.s1 from
root.sg.d1 END",
"Asia",
"root"),
- "testCq1Token",
+ "testCq1_md5",
executionTime);
cqInfo.addCQ(addCQPlan);
@@ -94,7 +89,7 @@ public class CQInfoTest {
"create cq testCq2 BEGIN select s1 into root.backup.d2.s1 from
root.sg.d2 END",
"Asia",
"root"),
- "testCq2Token",
+ "testCq2_md5",
executionTime);
cqInfo.addCQ(addCQPlan);
@@ -104,59 +99,4 @@ public class CQInfoTest {
Assert.assertEquals(cqInfo, actualCQInfo);
}
-
- @Test
- public void testOldCallbackCannotTouchRecreatedCQ() throws Exception {
- long executionTime = System.currentTimeMillis();
- TCreateCQReq req =
- new TCreateCQReq(
- "testCq3",
- 1000,
- 0,
- 1000,
- 0,
- (byte) 0,
- "select s1 into root.backup.d3.s1 from root.sg.d3",
- "create cq testCq3 BEGIN select s1 into root.backup.d3.s1 from
root.sg.d3 END",
- "Asia",
- "root");
-
- cqInfo.addCQ(new AddCQPlan(req, "oldToken", executionTime));
- cqInfo.dropCQ(new DropCQPlan("testCq3"));
- cqInfo.addCQ(new AddCQPlan(req, "newToken", executionTime));
-
- Assert.assertEquals(
- TSStatusCode.NO_SUCH_CQ.getStatusCode(),
- cqInfo.updateCQLastExecutionTime(
- new UpdateCQLastExecTimePlan("testCq3", executionTime + 1000,
"oldToken"))
- .code);
- Assert.assertEquals(
- TSStatusCode.SUCCESS_STATUS.getStatusCode(),
- cqInfo.updateCQLastExecutionTime(
- new UpdateCQLastExecTimePlan("testCq3", executionTime + 1000,
"newToken"))
- .code);
- }
-
- @Test
- public void testShowCQCanFilterByCQId() throws Exception {
- long executionTime = System.currentTimeMillis();
- TCreateCQReq req =
- new TCreateCQReq(
- "testCq4",
- 1000,
- 0,
- 1000,
- 0,
- (byte) 0,
- "select s1 into root.backup.d4.s1 from root.sg.d4",
- "create cq testCq4 BEGIN select s1 into root.backup.d4.s1 from
root.sg.d4 END",
- "Asia",
- "root");
- cqInfo.addCQ(new AddCQPlan(req, "testCq4Token", executionTime));
-
- ShowCQResp showCQResp = cqInfo.showCQ(new ShowCQPlan("testCq4"));
-
- Assert.assertEquals(1, showCQResp.getCqList().size());
- Assert.assertEquals("testCq4", showCQResp.getCqList().get(0).getCqId());
- }
}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java
index 3e7fd2052ad..d0e92b32816 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/CreateCQProcedureTest.java
@@ -36,36 +36,10 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.fail;
public class CreateCQProcedureTest {
- @Test
- public void tokenShouldBeUniqueForSameCQId() {
- ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
- try {
- TCreateCQReq req =
- new TCreateCQReq(
- "testCq1",
- 1000,
- 0,
- 1000,
- 0,
- (byte) 0,
- "select s1 into root.backup.d1(s1) from root.sg.d1",
- "create cq testCq1 BEGIN select s1 into root.backup.d1(s1) from
root.sg.d1 END",
- "Asia",
- "root");
- CreateCQProcedure createCQProcedure1 = new CreateCQProcedure(req,
executor);
- CreateCQProcedure createCQProcedure2 = new CreateCQProcedure(req,
executor);
-
- assertNotEquals(createCQProcedure1.getCqToken(),
createCQProcedure2.getCqToken());
- } finally {
- executor.shutdown();
- }
- }
-
@Test
public void serializeDeserializeTest() {
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java
deleted file mode 100644
index a90e282494f..00000000000
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/cq/CreateCQProcedureRecoveryTest.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.procedure.impl.cq;
-
-import org.apache.iotdb.confignode.consensus.request.read.cq.ShowCQPlan;
-import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan;
-import org.apache.iotdb.confignode.manager.ConfigManager;
-import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
-import org.apache.iotdb.confignode.manager.cq.CQManager;
-import org.apache.iotdb.confignode.manager.cq.CQScheduleTask;
-import org.apache.iotdb.confignode.persistence.cq.CQInfo;
-import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
-import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
-
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-public class CreateCQProcedureRecoveryTest {
-
- private TCreateCQReq newCreateCQReq() {
- return new TCreateCQReq(
- "testCq1",
- 1000,
- 0,
- 1000,
- 0,
- (byte) 0,
- "select s1 into root.backup.d1.s1 from root.sg.d1",
- "create cq testCq1 BEGIN select s1 into root.backup.d1.s1 from
root.sg.d1 END",
- "Asia",
- "root");
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void recoverScheduledTaskShouldResubmitFromLatestMetadata() throws
Exception {
- ScheduledExecutorService executor =
Mockito.mock(ScheduledExecutorService.class);
- Mockito.when(
- executor.schedule(
- Mockito.any(Runnable.class), Mockito.anyLong(),
Mockito.any(TimeUnit.class)))
- .thenReturn(Mockito.mock(ScheduledFuture.class));
-
- ConfigManager configManager = Mockito.mock(ConfigManager.class);
- ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class);
- CQManager cqManager = Mockito.mock(CQManager.class);
- ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class);
- Mockito.when(env.getConfigManager()).thenReturn(configManager);
-
Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager);
- Mockito.when(configManager.getCQManager()).thenReturn(cqManager);
- Mockito.when(
- cqManager.markCQLocallyScheduled(
- Mockito.anyString(), Mockito.anyString(),
Mockito.any(CQScheduleTask.class)))
- .thenReturn(true);
-
- TCreateCQReq req = newCreateCQReq();
- CreateCQProcedure procedure = new CreateCQProcedure(req, executor);
-
- CQInfo cqInfo = new CQInfo();
- cqInfo.addCQ(new AddCQPlan(req, procedure.getCqToken(),
System.currentTimeMillis() + 10_000));
-
Mockito.when(consensusManager.read(Mockito.any(ShowCQPlan.class))).thenReturn(cqInfo.showCQ());
-
- procedure.recoverScheduledTask(env);
-
- Mockito.verify(executor)
- .schedule(Mockito.any(Runnable.class), Mockito.anyLong(),
Mockito.any(TimeUnit.class));
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void recoverScheduledTaskShouldSkipDuplicatedLocalSchedule() throws
Exception {
- ScheduledExecutorService executor =
Mockito.mock(ScheduledExecutorService.class);
- ConfigManager configManager = Mockito.mock(ConfigManager.class);
- ConsensusManager consensusManager = Mockito.mock(ConsensusManager.class);
- CQManager cqManager = Mockito.mock(CQManager.class);
- ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class);
- Mockito.when(env.getConfigManager()).thenReturn(configManager);
-
Mockito.when(configManager.getConsensusManager()).thenReturn(consensusManager);
- Mockito.when(configManager.getCQManager()).thenReturn(cqManager);
- Mockito.when(
- cqManager.markCQLocallyScheduled(
- Mockito.anyString(), Mockito.anyString(),
Mockito.any(CQScheduleTask.class)))
- .thenReturn(false);
-
- TCreateCQReq req = newCreateCQReq();
- CreateCQProcedure procedure = new CreateCQProcedure(req, executor);
-
- CQInfo cqInfo = new CQInfo();
- cqInfo.addCQ(new AddCQPlan(req, procedure.getCqToken(),
System.currentTimeMillis() + 10_000));
-
Mockito.when(consensusManager.read(Mockito.any(ShowCQPlan.class))).thenReturn(cqInfo.showCQ());
-
- procedure.recoverScheduledTask(env);
-
- Mockito.verify(executor, Mockito.never())
- .schedule(Mockito.any(Runnable.class), Mockito.anyLong(),
Mockito.any(TimeUnit.class));
- }
-}