This is an automated email from the ASF dual-hosted git repository.
CRZbulabula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new cc108e78ec8 Fixed TTL problems (#17735)
cc108e78ec8 is described below
commit cc108e78ec81b2b7d003305b88218fa3327f581e
Author: Caideyipi <[email protected]>
AuthorDate: Sun May 24 14:00:46 2026 +0800
Fixed TTL problems (#17735)
---
.../iotdb/confignode/manager/TTLManager.java | 4 +
.../iotdb/confignode/persistence/TTLInfo.java | 33 ++-
.../procedure/impl/schema/SetTTLProcedure.java | 244 ++++++++++++---
.../procedure/state/schema/SetTTLState.java | 4 +-
.../iotdb/confignode/persistence/TTLInfoTest.java | 68 ++++-
.../procedure/impl/schema/SetTTLProcedureTest.java | 327 +++++++++++++++++++++
6 files changed, 637 insertions(+), 43 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TTLManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TTLManager.java
index dc6ae4f37a1..284b77004be 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TTLManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/TTLManager.java
@@ -127,6 +127,10 @@ public class TTLManager {
return ((ShowTTLResp) showTTL(new ShowTTLPlan())).getPathTTLMap();
}
+ public long getTTL(final String[] pathPattern) {
+ return ttlInfo.getTTL(pathPattern);
+ }
+
public int getTTLCount() {
return ttlInfo.getTTLCount();
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TTLInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TTLInfo.java
index 7b98ebba50b..4921b03851c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TTLInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TTLInfo.java
@@ -70,13 +70,15 @@ public class TTLInfo implements SnapshotProcessor {
try {
// check ttl rule capacity
final int tTlRuleCapacity =
CommonDescriptor.getInstance().getConfig().getTTlRuleCapacity();
- if (getTTLCount() >= tTlRuleCapacity) {
+ final int newTTLRuleCount = calculateNewTTLRuleCount(plan);
+ final int requestedTTLRuleCount = ttlCache.getTtlCount() +
newTTLRuleCount;
+ if (newTTLRuleCount > 0 && requestedTTLRuleCount > tTlRuleCapacity) {
TSStatus errorStatus = new
TSStatus(TSStatusCode.OVERSIZE_TTL.getStatusCode());
errorStatus.setMessage(
String.format(
- "The number of TTL rules has reached the limit (%d). Please
delete "
- + "some existing rules first.",
- tTlRuleCapacity));
+ "The number of TTL rules has reached the limit "
+ + "(capacity: %d, requested total: %d). Please delete some
existing rules first.",
+ tTlRuleCapacity, requestedTTLRuleCount));
return errorStatus;
}
ttlCache.setTTL(plan.getPathPattern(), plan.getTTL());
@@ -92,6 +94,20 @@ public class TTLInfo implements SnapshotProcessor {
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
}
+ private int calculateNewTTLRuleCount(SetTTLPlan plan) {
+ int newTTLRuleCount = isNewTTLRule(plan.getPathPattern()) ? 1 : 0;
+ if (plan.isDataBase()) {
+ String[] pathNodes = Arrays.copyOf(plan.getPathPattern(),
plan.getPathPattern().length + 1);
+ pathNodes[pathNodes.length - 1] =
IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+ newTTLRuleCount += isNewTTLRule(pathNodes) ? 1 : 0;
+ }
+ return newTTLRuleCount;
+ }
+
+ private boolean isNewTTLRule(String[] pathNodes) {
+ return ttlCache.getLastNodeTTL(pathNodes) == TTLCache.NULL_TTL;
+ }
+
/** Only used for upgrading from database level ttl to device level ttl. */
public void setTTL(Map<String, Long> databaseTTLMap) throws
IllegalPathException {
lock.writeLock().lock();
@@ -159,6 +175,15 @@ public class TTLInfo implements SnapshotProcessor {
}
}
+ public long getTTL(final String[] pathPattern) {
+ lock.readLock().lock();
+ try {
+ return ttlCache.getLastNodeTTL(pathPattern);
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
/**
* Get the maximum ttl of the corresponding database level.
*
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java
index b90f2df87d5..dca79a02366 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java
@@ -22,8 +22,10 @@ package org.apache.iotdb.confignode.procedure.impl.schema;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.schema.ttl.TTLCache;
import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
import
org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager;
import
org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext;
@@ -47,14 +49,21 @@ import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
public class SetTTLProcedure extends
StateMachineProcedure<ConfigNodeProcedureEnv, SetTTLState> {
private static final Logger LOGGER =
LoggerFactory.getLogger(SetTTLProcedure.class);
+ // Distinguishes no previous TTL from TTLCache.NULL_TTL, the explicit unset marker for rollback.
+ private static final long TTL_NOT_EXIST = Long.MIN_VALUE;
+ private static final int ROLLBACK_STATE_BYTES = Byte.BYTES + Long.BYTES * 2;
private SetTTLPlan plan;
+ private long previousTTL = TTL_NOT_EXIST;
+ private long previousDatabaseWildcardTTL = TTL_NOT_EXIST;
+ private boolean previousTTLStateCaptured = false;
public SetTTLProcedure(final boolean isGeneratedByPipe) {
super(isGeneratedByPipe);
@@ -71,6 +80,10 @@ public class SetTTLProcedure extends
StateMachineProcedure<ConfigNodeProcedureEn
long startTime = System.currentTimeMillis();
try {
switch (state) {
+ case CAPTURE_PREVIOUS_TTL:
+ capturePreviousTTLState(env);
+ setNextState(SetTTLState.SET_CONFIGNODE_TTL);
+ return Flow.HAS_MORE_STATE;
case SET_CONFIGNODE_TTL:
setConfigNodeTTL(env);
return Flow.HAS_MORE_STATE;
@@ -86,18 +99,13 @@ public class SetTTLProcedure extends
StateMachineProcedure<ConfigNodeProcedureEn
}
}
- private void setConfigNodeTTL(ConfigNodeProcedureEnv env) {
- TSStatus res;
- try {
- res =
- env.getConfigManager()
- .getConsensusManager()
- .write(isGeneratedByPipe ? new PipeEnrichedPlan(this.plan) :
this.plan);
- } catch (ConsensusException e) {
-
LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE,
e);
- res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
- res.setMessage(e.getMessage());
+ void setConfigNodeTTL(final ConfigNodeProcedureEnv env) {
+ if (!previousTTLStateCaptured) {
+ capturePreviousTTLState(env);
+ setNextState(SetTTLState.SET_CONFIGNODE_TTL);
+ return;
}
+ final TSStatus res = writeConfigNodePlan(env, plan);
if (res.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.info(ProcedureMessages.FAILED_TO_EXECUTE_PLAN_BECAUSE, plan,
res.message);
setFailure(new ProcedureException(new IoTDBException(res)));
@@ -106,35 +114,177 @@ public class SetTTLProcedure extends
StateMachineProcedure<ConfigNodeProcedureEn
}
}
- private void updateDataNodeTTL(ConfigNodeProcedureEnv env) {
- Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+ void updateDataNodeTTL(final ConfigNodeProcedureEnv env) {
+ final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations();
- DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
- new DataNodeAsyncRequestContext<>(
- CnToDnAsyncRequestType.SET_TTL,
- new TSetTTLReq(
- Collections.singletonList(String.join(".",
plan.getPathPattern())),
- plan.getTTL(),
- plan.isDataBase()),
- dataNodeLocationMap);
+ final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
+ sendTTLRequest(
+ dataNodeLocationMap,
+ buildSetTTLReq(plan.getPathPattern(), plan.getTTL(),
plan.isDataBase()));
+ if (hasFailedDataNode(clientHandler)) {
+ LOGGER.error(ProcedureMessages.FAILED_TO_UPDATE_TTL_CACHE_OF_DATANODE);
+ setFailure(
+ new ProcedureException(
+ new
MetadataException(ProcedureMessages.UPDATE_DATANODE_TTL_CACHE_FAILED)));
+ }
+ }
+
+ private void capturePreviousTTLState(final ConfigNodeProcedureEnv env) {
+ if (previousTTLStateCaptured) {
+ return;
+ }
+ previousTTL = getTTLOrDefault(env, plan.getPathPattern());
+ if (plan.isDataBase()) {
+ previousDatabaseWildcardTTL =
+ getTTLOrDefault(env,
getDatabaseWildcardPathPattern(plan.getPathPattern()));
+ }
+ previousTTLStateCaptured = true;
+ }
+
+ TSStatus writeConfigNodePlan(final ConfigNodeProcedureEnv env, final
SetTTLPlan setTTLPlan) {
+ try {
+ return env.getConfigManager()
+ .getConsensusManager()
+ .write(isGeneratedByPipe ? new PipeEnrichedPlan(setTTLPlan) :
setTTLPlan);
+ } catch (ConsensusException e) {
+
LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE,
e);
+ final TSStatus res = new
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+ res.setMessage(e.getMessage());
+ return res;
+ }
+ }
+
+ DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> sendTTLRequest(
+ final Map<Integer, TDataNodeLocation> dataNodeLocationMap, final
TSetTTLReq req) {
+ final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
+ new DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.SET_TTL, req,
dataNodeLocationMap);
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
- Map<Integer, TSStatus> statusMap = clientHandler.getResponseMap();
- for (TSStatus status : statusMap.values()) {
- // all dataNodes must clear the related schemaengine cache
+ return clientHandler;
+ }
+
+ private TSetTTLReq buildSetTTLReq(
+ final String[] pathPattern, final long ttl, final boolean isDataBase) {
+ return new TSetTTLReq(
+ Collections.singletonList(String.join(".", pathPattern)), ttl,
isDataBase);
+ }
+
+ private boolean hasFailedDataNode(
+ final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler) {
+ if (!clientHandler.getRequestIndices().isEmpty()) {
+ return true;
+ }
+ for (TSStatus status : clientHandler.getResponseMap().values()) {
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOGGER.error(ProcedureMessages.FAILED_TO_UPDATE_TTL_CACHE_OF_DATANODE);
- setFailure(
- new ProcedureException(
- new
MetadataException(ProcedureMessages.UPDATE_DATANODE_TTL_CACHE_FAILED)));
- return;
+ return true;
}
}
+ return false;
}
+ private long getTTLOrDefault(final ConfigNodeProcedureEnv env, final
String[] pathPattern) {
+ final long ttl =
env.getConfigManager().getTTLManager().getTTL(pathPattern);
+ return ttl == TTLCache.NULL_TTL ? TTL_NOT_EXIST : ttl;
+ }
+
+ private String[] getDatabaseWildcardPathPattern(final String[] pathPattern) {
+ final String[] pathNodes = Arrays.copyOf(pathPattern, pathPattern.length +
1);
+ pathNodes[pathNodes.length - 1] = IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+ return pathNodes;
+ }
+
+ private void rollbackConfigNodeTTL(final ConfigNodeProcedureEnv env) throws
ProcedureException {
+ restoreTTLOnConfigNode(env, plan.getPathPattern(), previousTTL);
+ if (plan.isDataBase()) {
+ restoreTTLOnConfigNode(
+ env, getDatabaseWildcardPathPattern(plan.getPathPattern()),
previousDatabaseWildcardTTL);
+ }
+ }
+
+ private void restoreTTLOnConfigNode(
+ final ConfigNodeProcedureEnv env, final String[] pathPattern, final long
ttl)
+ throws ProcedureException {
+ // TTL_NOT_EXIST means the original ttl was absent; NULL_TTL asks the executor to unset it.
+ final SetTTLPlan rollbackPlan =
+ new SetTTLPlan(pathPattern, ttl == TTL_NOT_EXIST ? TTLCache.NULL_TTL :
ttl);
+ // Database rollback restores the database path and db.** separately, so avoid auto-expansion.
+ rollbackPlan.setDataBase(false);
+ final TSStatus status = writeConfigNodePlan(env, rollbackPlan);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new ProcedureException(
+ new MetadataException(
+ "Rollback ConfigNode ttl failed for "
+ + String.join(".", pathPattern)
+ + ": "
+ + status.getMessage()));
+ }
+ }
+
+ private void rollbackDataNodeTTL(final ConfigNodeProcedureEnv env) throws
ProcedureException {
+ final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+
env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations();
+ restoreTTLOnDataNodes(dataNodeLocationMap, plan.getPathPattern(),
previousTTL);
+ if (plan.isDataBase()) {
+ restoreTTLOnDataNodes(
+ dataNodeLocationMap,
+ getDatabaseWildcardPathPattern(plan.getPathPattern()),
+ previousDatabaseWildcardTTL);
+ }
+ }
+
+ private void restoreTTLOnDataNodes(
+ final Map<Integer, TDataNodeLocation> dataNodeLocationMap,
+ final String[] pathPattern,
+ final long ttl)
+ throws ProcedureException {
+ if (dataNodeLocationMap.isEmpty()) {
+ return;
+ }
+ final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
+ sendTTLRequest(
+ dataNodeLocationMap,
+ buildSetTTLReq(pathPattern, ttl == TTL_NOT_EXIST ?
TTLCache.NULL_TTL : ttl, false));
+ if (hasFailedDataNode(clientHandler)) {
+ throw new ProcedureException(
+ new MetadataException(
+ "Rollback dataNode ttl cache failed for " + String.join(".",
pathPattern)));
+ }
+ }
+
+ /**
+ * Best-effort rollback: restore both sides, throw the earliest failure, and suppress later ones.
+ */
@Override
- protected void rollbackState(
- ConfigNodeProcedureEnv configNodeProcedureEnv, SetTTLState setTTLState)
- throws IOException, InterruptedException, ProcedureException {}
+ protected void rollbackState(final ConfigNodeProcedureEnv env, final
SetTTLState setTTLState)
+ throws IOException, InterruptedException, ProcedureException {
+ if (setTTLState != SetTTLState.UPDATE_DATANODE_CACHE ||
!previousTTLStateCaptured) {
+ return;
+ }
+ ProcedureException rollbackFailure = null;
+ try {
+ rollbackConfigNodeTTL(env);
+ } catch (ProcedureException e) {
+ LOGGER.error("Failed to rollback ConfigNode ttl state.", e);
+ rollbackFailure = e;
+ }
+ try {
+ rollbackDataNodeTTL(env);
+ } catch (ProcedureException e) {
+ LOGGER.error("Failed to rollback DataNode ttl cache.", e);
+ if (rollbackFailure == null) {
+ rollbackFailure = e;
+ } else {
+ rollbackFailure.addSuppressed(e);
+ }
+ }
+ if (rollbackFailure != null) {
+ throw rollbackFailure;
+ }
+ }
+
+ @Override
+ protected boolean isRollbackSupported(final SetTTLState state) {
+ return state == SetTTLState.UPDATE_DATANODE_CACHE;
+ }
@Override
protected SetTTLState getState(int stateId) {
@@ -148,7 +298,7 @@ public class SetTTLProcedure extends
StateMachineProcedure<ConfigNodeProcedureEn
@Override
protected SetTTLState getInitialState() {
- return SetTTLState.SET_CONFIGNODE_TTL;
+ return SetTTLState.CAPTURE_PREVIOUS_TTL;
}
@Override
@@ -159,14 +309,25 @@ public class SetTTLProcedure extends
StateMachineProcedure<ConfigNodeProcedureEn
: ProcedureType.SET_TTL_PROCEDURE.getTypeCode());
super.serialize(stream);
ReadWriteIOUtils.write(plan.serializeToByteBuffer(), stream);
+ stream.writeBoolean(previousTTLStateCaptured);
+ stream.writeLong(previousTTL);
+ stream.writeLong(previousDatabaseWildcardTTL);
}
@Override
public void deserialize(ByteBuffer byteBuffer) {
super.deserialize(byteBuffer);
try {
- ReadWriteIOUtils.readInt(byteBuffer);
+ final int length = ReadWriteIOUtils.readInt(byteBuffer);
+ final int position = byteBuffer.position();
this.plan = (SetTTLPlan) ConfigPhysicalPlan.Factory.create(byteBuffer);
+ // The serialized plan buffer may include padding; skip to the actual payload end.
+ byteBuffer.position(position + length);
+ if (byteBuffer.remaining() >= ROLLBACK_STATE_BYTES) {
+ this.previousTTLStateCaptured = byteBuffer.get() != 0;
+ this.previousTTL = byteBuffer.getLong();
+ this.previousDatabaseWildcardTTL = byteBuffer.getLong();
+ }
} catch (IOException e) {
LOGGER.error(ProcedureMessages.IO_ERROR_WHEN_DESERIALIZE_SETTTL_PLAN, e);
}
@@ -180,12 +341,21 @@ public class SetTTLProcedure extends
StateMachineProcedure<ConfigNodeProcedureEn
if (o == null || getClass() != o.getClass()) {
return false;
}
- return this.plan.equals(((SetTTLProcedure) o).plan)
- && this.isGeneratedByPipe == (((SetTTLProcedure) o).isGeneratedByPipe);
+ final SetTTLProcedure that = (SetTTLProcedure) o;
+ return this.isGeneratedByPipe == that.isGeneratedByPipe
+ && this.previousTTL == that.previousTTL
+ && this.previousDatabaseWildcardTTL == that.previousDatabaseWildcardTTL
+ && this.previousTTLStateCaptured == that.previousTTLStateCaptured
+ && this.plan.equals(that.plan);
}
@Override
public int hashCode() {
- return Objects.hash(plan, isGeneratedByPipe);
+ return Objects.hash(
+ plan,
+ isGeneratedByPipe,
+ previousTTL,
+ previousDatabaseWildcardTTL,
+ previousTTLStateCaptured);
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/SetTTLState.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/SetTTLState.java
index fbdc026fc70..4dd3063ea3f 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/SetTTLState.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/SetTTLState.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.confignode.procedure.state.schema;
public enum SetTTLState {
+ // Keep existing state ordinals stable for persisted procedures.
SET_CONFIGNODE_TTL,
- UPDATE_DATANODE_CACHE
+ UPDATE_DATANODE_CACHE,
+ CAPTURE_PREVIOUS_TTL
}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/TTLInfoTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/TTLInfoTest.java
index 42a23d35cb9..e424671fb3a 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/TTLInfoTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/TTLInfoTest.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.ttl.TTLCache;
import org.apache.iotdb.confignode.consensus.request.read.ttl.ShowTTLPlan;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
import org.apache.iotdb.confignode.consensus.response.ttl.ShowTTLResp;
@@ -50,6 +51,7 @@ public class TTLInfoTest {
private final File snapshotDir = new File(BASE_OUTPUT_PATH,
"ttlInfo-snapshot");
private final long ttl = 123435565323L;
private long[] originTTLArr;
+ private int originTTlRuleCapacity;
@Before
public void setup() throws IOException {
@@ -57,6 +59,7 @@ public class TTLInfoTest {
snapshotDir.mkdirs();
}
originTTLArr = CommonDescriptor.getInstance().getConfig().getTierTTLInMs();
+ originTTlRuleCapacity =
CommonDescriptor.getInstance().getConfig().getTTlRuleCapacity();
long[] ttlArr = new long[2];
ttlArr[0] = 10000000L;
ttlArr[1] = ttl;
@@ -70,6 +73,7 @@ public class TTLInfoTest {
FileUtils.deleteDirectory(snapshotDir);
}
CommonDescriptor.getInstance().getConfig().setTierTTLInMs(originTTLArr);
+
CommonDescriptor.getInstance().getConfig().setTTlRuleCapacity(originTTlRuleCapacity);
}
@Test
@@ -208,6 +212,17 @@ public class TTLInfoTest {
Assert.assertEquals(4, ttlInfo.getTTLCount());
}
+ @Test
+ public void testGetTTLReturnsExactPathTTL() throws IllegalPathException {
+ PartialPath path = new PartialPath("root.test.db1.**");
+ ttlInfo.setTTL(new SetTTLPlan(Arrays.asList(path.getNodes()), 121322323L));
+
+ Assert.assertEquals(121322323L, ttlInfo.getTTL(path.getNodes()));
+ Assert.assertEquals(
+ TTLCache.NULL_TTL, ttlInfo.getTTL(new
PartialPath("root.test.db1").getNodes()));
+ Assert.assertEquals(Long.MAX_VALUE, ttlInfo.getTTL(new
PartialPath("root.**").getNodes()));
+ }
+
@Test
public void testUnsetNonExistTTL() throws IllegalPathException {
assertEquals(
@@ -241,10 +256,61 @@ public class TTLInfoTest {
final TSStatus status = ttlInfo.setTTL(setTTLPlan);
assertEquals(TSStatusCode.OVERSIZE_TTL.getStatusCode(), status.code);
assertEquals(
- "The number of TTL rules has reached the limit (1000). Please delete
some existing rules first.",
+ "The number of TTL rules has reached the limit "
+ + "(capacity: 1000, requested total: 1001). Please delete some
existing rules first.",
status.message);
}
+ @Test
+ public void testUpdateExistingTTLWhenCapacityIsReached() {
+ CommonDescriptor.getInstance().getConfig().setTTlRuleCapacity(2);
+
+ SetTTLPlan setTTLPlan =
+ new SetTTLPlan(PathNodesGenerator.splitPathToNodes("root.sg1.d1.**"),
1000);
+ assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
ttlInfo.setTTL(setTTLPlan).code);
+ assertEquals(2, ttlInfo.getTTLCount());
+
+ setTTLPlan = new
SetTTLPlan(PathNodesGenerator.splitPathToNodes("root.sg1.d1.**"), 2000);
+ assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
ttlInfo.setTTL(setTTLPlan).code);
+ assertEquals(2, ttlInfo.getTTLCount());
+ assertEquals(
+ Long.valueOf(2000),
+ ttlInfo.showTTL(new
ShowTTLPlan()).getPathTTLMap().get("root.sg1.d1.**"));
+ }
+
+ @Test
+ public void testUpdateExistingTTLWhenCurrentStateIsAlreadyOversize() {
+ SetTTLPlan setTTLPlan =
+ new SetTTLPlan(PathNodesGenerator.splitPathToNodes("root.sg1.d1.**"),
1000);
+ assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
ttlInfo.setTTL(setTTLPlan).code);
+ assertEquals(2, ttlInfo.getTTLCount());
+
+ CommonDescriptor.getInstance().getConfig().setTTlRuleCapacity(1);
+
+ setTTLPlan = new
SetTTLPlan(PathNodesGenerator.splitPathToNodes("root.sg1.d1.**"), 2000);
+ assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
ttlInfo.setTTL(setTTLPlan).code);
+ assertEquals(2, ttlInfo.getTTLCount());
+ assertEquals(
+ Long.valueOf(2000),
+ ttlInfo.showTTL(new
ShowTTLPlan()).getPathTTLMap().get("root.sg1.d1.**"));
+ }
+
+ @Test
+ public void testDatabaseTTLShouldReserveTwoSlots() {
+ CommonDescriptor.getInstance().getConfig().setTTlRuleCapacity(2);
+
+ SetTTLPlan setTTLPlan = new
SetTTLPlan(PathNodesGenerator.splitPathToNodes("root.sg1"), 1000);
+ setTTLPlan.setDataBase(true);
+
+ final TSStatus status = ttlInfo.setTTL(setTTLPlan);
+ assertEquals(TSStatusCode.OVERSIZE_TTL.getStatusCode(), status.code);
+ assertEquals(1, ttlInfo.getTTLCount());
+ assertEquals(1, ttlInfo.showTTL(new ShowTTLPlan()).getPathTTLMap().size());
+ assertEquals(
+ Long.valueOf(Long.MAX_VALUE),
+ ttlInfo.showTTL(new ShowTTLPlan()).getPathTTLMap().get("root.**"));
+ }
+
@Test
public void testSnapshot() throws TException, IOException,
IllegalPathException {
// set ttl
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java
index 5042eb1dd0f..cb09c23659c 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java
@@ -19,19 +19,42 @@
package org.apache.iotdb.confignode.procedure.impl.schema;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.ttl.TTLCache;
+import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
+import
org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext;
import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.TTLManager;
+import org.apache.iotdb.confignode.manager.node.NodeManager;
+import org.apache.iotdb.confignode.procedure.Procedure;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.state.ProcedureState;
+import org.apache.iotdb.confignode.procedure.state.schema.SetTTLState;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mockito;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
public class SetTTLProcedureTest {
@@ -65,4 +88,308 @@ public class SetTTLProcedureTest {
buffer.clear();
byteArrayOutputStream.reset();
}
+
+ @Test
+ public void serializeDeserializeTestWithCapturedRollbackState() throws
Exception {
+ final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream);
+
+ final SetTTLPlan setTTLPlan =
+ new SetTTLPlan(Arrays.asList(new PartialPath("root.db").getNodes()),
2000L);
+ setTTLPlan.setDataBase(true);
+ final TestingSetTTLProcedure procedure = new
TestingSetTTLProcedure(setTTLPlan);
+
+ final Map<String, Long> ttlMap = new HashMap<>();
+ ttlMap.put("root.**", Long.MAX_VALUE);
+ ttlMap.put("root.db", 500L);
+ ttlMap.put("root.db.**", 600L);
+
+ procedure.executeFromState(mockProcedureEnv(ttlMap),
SetTTLState.CAPTURE_PREVIOUS_TTL);
+
+ procedure.serialize(outputStream);
+ final ByteBuffer buffer =
+ ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ final SetTTLProcedure deserializedProcedure =
+ (SetTTLProcedure) ProcedureFactory.getInstance().create(buffer);
+ assertSerializedProcedure(
+ deserializedProcedure, "root.db", 2000L, true, true, 500L, 600L,
false);
+ }
+
+ @Test
+ public void deserializeOldFormatWithoutRollbackStateTest() throws Exception {
+ final SetTTLPlan setTTLPlan =
+ new SetTTLPlan(Arrays.asList(new PartialPath("root.db").getNodes()),
2000L);
+ setTTLPlan.setDataBase(true);
+
+ final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream);
+ writeOldFormatProcedure(outputStream, setTTLPlan);
+
+ final ByteBuffer buffer =
+ ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ final SetTTLProcedure deserializedProcedure =
+ (SetTTLProcedure) ProcedureFactory.getInstance().create(buffer);
+
+ assertSerializedProcedure(
+ deserializedProcedure,
+ "root.db",
+ 2000L,
+ true,
+ false,
+ Long.MIN_VALUE,
+ Long.MIN_VALUE,
+ false);
+ }
+
+ @Test
+ public void setConfigNodeTTLShouldNotWriteBeforePreviousStateIsCaptured()
throws Exception {
+ final SetTTLPlan setTTLPlan =
+ new SetTTLPlan(Arrays.asList(new PartialPath("root.db").getNodes()),
2000L);
+ setTTLPlan.setDataBase(true);
+ final TestingSetTTLProcedure procedure = new
TestingSetTTLProcedure(setTTLPlan);
+
+ final Map<String, Long> ttlMap = new HashMap<>();
+ ttlMap.put("root.**", Long.MAX_VALUE);
+ ttlMap.put("root.db", 500L);
+ ttlMap.put("root.db.**", 600L);
+
+ procedure.executeFromState(mockProcedureEnv(ttlMap),
SetTTLState.SET_CONFIGNODE_TTL);
+
+ Assert.assertTrue(procedure.getWrittenPlans().isEmpty());
+ assertSerializedProcedure(procedure, "root.db", 2000L, true, true, 500L,
600L, false);
+
+ procedure.executeFromState(mockProcedureEnv(ttlMap),
SetTTLState.SET_CONFIGNODE_TTL);
+
+ Assert.assertEquals(1, procedure.getWrittenPlans().size());
+ assertPlan(procedure.getWrittenPlans().get(0), "root.db", 2000L, true);
+ }
+
+ @Test
+ public void rollbackStateShouldUnsetNewTTLWhenPreviousStateDidNotExist()
throws Exception {
+ final SetTTLPlan setTTLPlan =
+ new SetTTLPlan(Arrays.asList(new
PartialPath("root.test.sg1.**").getNodes()), 1000L);
+ final TestingSetTTLProcedure procedure = new
TestingSetTTLProcedure(setTTLPlan);
+ procedure.failFirstDataNodeUpdateForTest();
+
+ final ConfigNodeProcedureEnv env =
+ mockProcedureEnv(Collections.singletonMap("root.**", Long.MAX_VALUE));
+
+ procedure.executeFromState(env, SetTTLState.CAPTURE_PREVIOUS_TTL);
+ procedure.executeFromState(env, SetTTLState.SET_CONFIGNODE_TTL);
+ procedure.executeFromState(env, SetTTLState.UPDATE_DATANODE_CACHE);
+ Assert.assertTrue(procedure.isFailed());
+
+ procedure.rollbackState(env, SetTTLState.UPDATE_DATANODE_CACHE);
+
+ Assert.assertEquals(2, procedure.getWrittenPlans().size());
+ assertPlan(procedure.getWrittenPlans().get(0), "root.test.sg1.**", 1000L,
false);
+ assertPlan(procedure.getWrittenPlans().get(1), "root.test.sg1.**", -1L,
false);
+
+ Assert.assertEquals(2, procedure.getRequests().size());
+ assertRequest(procedure.getRequests().get(0), "root.test.sg1.**", 1000L,
false);
+ assertRequest(procedure.getRequests().get(1), "root.test.sg1.**", -1L,
false);
+ }
+
+ @Test
+ public void rollbackStateShouldRestoreDatabaseWildcardTTLSeparately() throws
Exception {
+ final SetTTLPlan setTTLPlan =
+ new SetTTLPlan(Arrays.asList(new PartialPath("root.db").getNodes()),
2000L);
+ setTTLPlan.setDataBase(true);
+ final TestingSetTTLProcedure procedure = new
TestingSetTTLProcedure(setTTLPlan);
+ procedure.failFirstDataNodeUpdateForTest();
+
+ final Map<String, Long> ttlMap = new HashMap<>();
+ ttlMap.put("root.**", Long.MAX_VALUE);
+ ttlMap.put("root.db", 500L);
+ ttlMap.put("root.db.**", 600L);
+ final ConfigNodeProcedureEnv env = mockProcedureEnv(ttlMap);
+
+ procedure.executeFromState(env, SetTTLState.CAPTURE_PREVIOUS_TTL);
+ procedure.executeFromState(env, SetTTLState.SET_CONFIGNODE_TTL);
+ procedure.executeFromState(env, SetTTLState.UPDATE_DATANODE_CACHE);
+ Assert.assertTrue(procedure.isFailed());
+
+ procedure.rollbackState(env, SetTTLState.UPDATE_DATANODE_CACHE);
+
+ Assert.assertEquals(3, procedure.getWrittenPlans().size());
+ assertPlan(procedure.getWrittenPlans().get(0), "root.db", 2000L, true);
+ assertPlan(procedure.getWrittenPlans().get(1), "root.db", 500L, false);
+ assertPlan(procedure.getWrittenPlans().get(2), "root.db.**", 600L, false);
+
+ Assert.assertEquals(3, procedure.getRequests().size());
+ assertRequest(procedure.getRequests().get(0), "root.db", 2000L, true);
+ assertRequest(procedure.getRequests().get(1), "root.db", 500L, false);
+ assertRequest(procedure.getRequests().get(2), "root.db.**", 600L, false);
+ }
+
+  /**
+   * Creates a mocked {@link ConfigNodeProcedureEnv} whose ConfigManager exposes a mocked
+   * TTLManager and NodeManager.
+   *
+   * <p>TTL lookups join the requested path nodes with {@code "."} and resolve the result against
+   * {@code ttlMap}, falling back to {@code TTLCache.NULL_TTL} for unknown paths. The NodeManager
+   * reports exactly one registered DataNode with id 1.
+   *
+   * @param ttlMap dot-joined path pattern to the TTL the mocked TTLManager should report
+   * @return the mocked procedure environment
+   */
+  private ConfigNodeProcedureEnv mockProcedureEnv(final Map<String, Long> ttlMap) {
+    // TTL lookups are answered from the supplied map.
+    final TTLManager mockedTTLManager = Mockito.mock(TTLManager.class);
+    Mockito.when(mockedTTLManager.getTTL(Mockito.any(String[].class)))
+        .thenAnswer(
+            invocation -> {
+              final String[] nodes = invocation.getArgument(0);
+              return ttlMap.getOrDefault(String.join(".", nodes), TTLCache.NULL_TTL);
+            });
+
+    // A single registered DataNode (id 1) is all the procedure gets to talk to.
+    final TDataNodeLocation onlyDataNode = new TDataNodeLocation();
+    onlyDataNode.setDataNodeId(1);
+    final NodeManager mockedNodeManager = Mockito.mock(NodeManager.class);
+    Mockito.when(mockedNodeManager.getRegisteredDataNodeLocations())
+        .thenReturn(Collections.singletonMap(1, onlyDataNode));
+
+    // Wire both managers behind the ConfigManager returned by the environment.
+    final ConfigManager mockedConfigManager = Mockito.mock(ConfigManager.class);
+    Mockito.when(mockedConfigManager.getTTLManager()).thenReturn(mockedTTLManager);
+    Mockito.when(mockedConfigManager.getNodeManager()).thenReturn(mockedNodeManager);
+
+    final ConfigNodeProcedureEnv mockedEnv = Mockito.mock(ConfigNodeProcedureEnv.class);
+    Mockito.when(mockedEnv.getConfigManager()).thenReturn(mockedConfigManager);
+    return mockedEnv;
+  }
+
+  /**
+   * Asserts that {@code plan} targets the expected path with the expected TTL and database flag.
+   *
+   * @param plan the plan under inspection
+   * @param path expected path, compared against the plan's path nodes joined with {@code "."}
+   * @param ttl expected TTL value
+   * @param isDataBase expected value of the plan's database flag
+   */
+  private void assertPlan(
+      final SetTTLPlan plan, final String path, final long ttl, final boolean isDataBase) {
+    final String actualPath = String.join(".", plan.getPathPattern());
+    Assert.assertEquals(path, actualPath);
+    Assert.assertEquals(ttl, plan.getTTL());
+    Assert.assertEquals(isDataBase, plan.isDataBase());
+  }
+
+  /**
+   * Asserts that {@code req} carries the expected path, TTL and database flag.
+   *
+   * @param req the request under inspection
+   * @param path expected path; the request's path pattern must be a one-element list holding it
+   * @param ttl expected TTL value
+   * @param isDataBase expected value of the request's {@code isDataBase} field
+   */
+  private void assertRequest(
+      final TSetTTLReq req, final String path, final long ttl, final boolean isDataBase) {
+    final List<String> expectedPattern = Collections.singletonList(path);
+    Assert.assertEquals(expectedPattern, req.getPathPattern());
+    Assert.assertEquals(ttl, req.getTTL());
+    Assert.assertEquals(isDataBase, req.isDataBase);
+  }
+
+  /**
+   * Hand-writes a serialized SET_TTL procedure to {@code stream}: the procedure type code, the
+   * base procedure fields, an empty state list, and finally the serialized {@code plan}. The
+   * write order below is the layout itself and must not be rearranged.
+   *
+   * @param stream destination of the serialized bytes
+   * @param plan the plan appended after the procedure header
+   * @throws IOException if writing to {@code stream} fails
+   */
+  private void writeOldFormatProcedure(final DataOutputStream stream, final SetTTLPlan plan)
+      throws IOException {
+    final long noProcId = Procedure.NO_PROC_ID;
+    final int absent = -1;
+
+    stream.writeShort(ProcedureType.SET_TTL_PROCEDURE.getTypeCode());
+
+    // Procedure fields.
+    stream.writeLong(noProcId);
+    stream.writeInt(ProcedureState.INITIALIZING.ordinal());
+    stream.writeLong(0L);
+    stream.writeLong(0L);
+    stream.writeLong(noProcId);
+    stream.writeLong(Procedure.NO_TIMEOUT);
+    stream.writeInt(absent); // no stack indexes
+    stream.writeByte(0); // no exception
+    stream.writeInt(absent); // no result
+    stream.writeByte(0); // no lock
+
+    // StateMachineProcedure fields.
+    stream.writeInt(0); // no states
+
+    // The plan payload comes last.
+    ReadWriteIOUtils.write(plan.serializeToByteBuffer(), stream);
+  }
+
+  /**
+   * Asserts the internal state of {@code procedure} by reading its fields reflectively: the
+   * wrapped plan (path, TTL, database flag), then {@code previousTTLStateCaptured}, {@code
+   * previousTTL}, {@code previousDatabaseWildcardTTL} and {@code isGeneratedByPipe}, in that
+   * order.
+   *
+   * @param procedure the procedure whose fields are inspected
+   * @param path expected dot-joined path of the wrapped plan
+   * @param ttl expected TTL of the wrapped plan
+   * @param isDataBase expected database flag of the wrapped plan
+   * @param previousTTLStateCaptured expected value of the field of the same name
+   * @param previousTTL expected value of the field of the same name
+   * @param previousDatabaseWildcardTTL expected value of the field of the same name
+   * @param isGeneratedByPipe expected value of the field of the same name
+   * @throws Exception if a field cannot be located or read
+   */
+  private void assertSerializedProcedure(
+      final SetTTLProcedure procedure,
+      final String path,
+      final long ttl,
+      final boolean isDataBase,
+      final boolean previousTTLStateCaptured,
+      final long previousTTL,
+      final long previousDatabaseWildcardTTL,
+      final boolean isGeneratedByPipe)
+      throws Exception {
+    assertPlan((SetTTLPlan) readProcedureFieldValue(procedure, "plan"), path, ttl, isDataBase);
+    Assert.assertEquals(
+        previousTTLStateCaptured, readProcedureFieldValue(procedure, "previousTTLStateCaptured"));
+    Assert.assertEquals(previousTTL, readProcedureFieldValue(procedure, "previousTTL"));
+    Assert.assertEquals(
+        previousDatabaseWildcardTTL,
+        readProcedureFieldValue(procedure, "previousDatabaseWildcardTTL"));
+    Assert.assertEquals(isGeneratedByPipe, readProcedureFieldValue(procedure, "isGeneratedByPipe"));
+  }
+
+  /**
+   * Reads the named field of {@code procedure}, searching from {@link SetTTLProcedure} upwards
+   * through its superclasses and bypassing access checks.
+   */
+  private Object readProcedureFieldValue(final SetTTLProcedure procedure, final String fieldName)
+      throws Exception {
+    final Field field = findField(SetTTLProcedure.class, fieldName);
+    field.setAccessible(true);
+    return field.get(procedure);
+  }
+
+  /**
+   * Looks up a declared field by name on {@code clazz} or, failing that, on the nearest
+   * superclass that declares it.
+   *
+   * @param clazz the class at which the search starts
+   * @param fieldName name of the field to locate
+   * @return the first matching field found while walking up the hierarchy
+   * @throws NoSuchFieldException if no class in the hierarchy declares {@code fieldName}
+   */
+  private Field findField(final Class<?> clazz, final String fieldName)
+      throws NoSuchFieldException {
+    for (Class<?> type = clazz; type != null; type = type.getSuperclass()) {
+      try {
+        return type.getDeclaredField(fieldName);
+      } catch (NoSuchFieldException ignored) {
+        // Not declared at this level; keep climbing the hierarchy.
+      }
+    }
+    throw new NoSuchFieldException(fieldName);
+  }
+
+  /**
+   * Test double for {@link SetTTLProcedure}. It overrides the ConfigNode write and the DataNode
+   * request hooks so that every plan and request is recorded (as a copy) and answered with a
+   * canned status. It can optionally reject the first DataNode request.
+   */
+  private static class TestingSetTTLProcedure extends SetTTLProcedure {
+
+    /** Copies of every request handed to {@link #sendTTLRequest}, in call order. */
+    private final List<TSetTTLReq> requests = new ArrayList<>();
+
+    /** Copies of every plan handed to {@link #writeConfigNodePlan}, in call order. */
+    private final List<SetTTLPlan> writtenPlans = new ArrayList<>();
+
+    /** When set, the first request seen after enabling is answered with an error status. */
+    private boolean failFirstDataNodeUpdate = false;
+
+    /** Number of requests seen while failure injection was enabled. */
+    private int requestCount = 0;
+
+    private TestingSetTTLProcedure(final SetTTLPlan plan) {
+      super(plan, false);
+    }
+
+    /** Enables failure injection for the first DataNode request. */
+    private void failFirstDataNodeUpdateForTest() {
+      failFirstDataNodeUpdate = true;
+    }
+
+    private List<TSetTTLReq> getRequests() {
+      return requests;
+    }
+
+    private List<SetTTLPlan> getWrittenPlans() {
+      return writtenPlans;
+    }
+
+    /** Records a copy of {@code setTTLPlan} and always reports success. */
+    @Override
+    TSStatus writeConfigNodePlan(final ConfigNodeProcedureEnv env, final SetTTLPlan setTTLPlan) {
+      writtenPlans.add(copyPlan(setTTLPlan));
+      return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    }
+
+    /**
+     * Records a copy of {@code req} and fabricates the responses for every target node. On the
+     * injected failure each node gets an error status and stays in the context's node location
+     * map. Otherwise each node gets a success status and is removed from that map.
+     */
+    @Override
+    DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> sendTTLRequest(
+        final Map<Integer, TDataNodeLocation> dataNodeLocationMap, final TSetTTLReq req) {
+      requests.add(copyRequest(req));
+
+      final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> context =
+          new DataNodeAsyncRequestContext<>(
+              CnToDnAsyncRequestType.SET_TTL, copyRequest(req), dataNodeLocationMap);
+      // Snapshot the ids: the success branch removes entries from the underlying map.
+      final List<Integer> pendingIds = new ArrayList<>(context.getNodeLocationMap().keySet());
+
+      // Requests are only counted while failure injection is enabled.
+      boolean rejected = false;
+      if (failFirstDataNodeUpdate) {
+        rejected = requestCount == 0;
+        requestCount++;
+      }
+
+      if (rejected) {
+        for (final Integer nodeId : pendingIds) {
+          context
+              .getResponseMap()
+              .put(nodeId, new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
+        }
+      } else {
+        for (final Integer nodeId : pendingIds) {
+          context
+              .getResponseMap()
+              .put(nodeId, new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+          context.getNodeLocationMap().remove(nodeId);
+        }
+      }
+      return context;
+    }
+
+    /** Builds an independent plan with the same path, TTL and database flag. */
+    private SetTTLPlan copyPlan(final SetTTLPlan plan) {
+      final SetTTLPlan duplicate =
+          new SetTTLPlan(Arrays.asList(plan.getPathPattern()), plan.getTTL());
+      duplicate.setDataBase(plan.isDataBase());
+      return duplicate;
+    }
+
+    /** Builds an independent request with a copied path list and the same TTL and flag. */
+    private TSetTTLReq copyRequest(final TSetTTLReq req) {
+      final List<String> pathCopy = new ArrayList<>(req.getPathPattern());
+      return new TSetTTLReq(pathCopy, req.getTTL(), req.isDataBase);
+    }
+  }
}