This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch ttl-fix
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9e4dd5d9c9bba7a5736710d094a386da01bbd64d
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 21 11:41:28 2026 +0800

    fix
---
 .../iotdb/confignode/persistence/TTLInfo.java      |  17 +-
 .../procedure/impl/schema/SetTTLProcedure.java     | 223 +++++++++++++++++----
 .../iotdb/confignode/persistence/TTLInfoTest.java  |  53 +++++
 .../procedure/impl/schema/SetTTLProcedureTest.java | 199 ++++++++++++++++++
 4 files changed, 455 insertions(+), 37 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TTLInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TTLInfo.java
index 7b98ebba50b..d40301b0c56 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TTLInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/TTLInfo.java
@@ -70,7 +70,8 @@ public class TTLInfo implements SnapshotProcessor {
     try {
       // check ttl rule capacity
       final int tTlRuleCapacity = 
CommonDescriptor.getInstance().getConfig().getTTlRuleCapacity();
-      if (getTTLCount() >= tTlRuleCapacity) {
+      final int newTTLRuleCount = calculateNewTTLRuleCount(plan);
+      if (newTTLRuleCount > 0 && ttlCache.getTtlCount() + newTTLRuleCount > 
tTlRuleCapacity) {
         TSStatus errorStatus = new 
TSStatus(TSStatusCode.OVERSIZE_TTL.getStatusCode());
         errorStatus.setMessage(
             String.format(
@@ -92,6 +93,20 @@ public class TTLInfo implements SnapshotProcessor {
     return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS);
   }
 
+  private int calculateNewTTLRuleCount(SetTTLPlan plan) {
+    int newTTLRuleCount = getNewTTLRuleCount(plan.getPathPattern());
+    if (plan.isDataBase()) {
+      String[] pathNodes = Arrays.copyOf(plan.getPathPattern(), 
plan.getPathPattern().length + 1);
+      pathNodes[pathNodes.length - 1] = 
IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+      newTTLRuleCount += getNewTTLRuleCount(pathNodes);
+    }
+    return newTTLRuleCount;
+  }
+
+  private int getNewTTLRuleCount(String[] pathNodes) {
+    return ttlCache.getLastNodeTTL(pathNodes) == TTLCache.NULL_TTL ? 1 : 0;
+  }
+
   /** Only used for upgrading from database level ttl to device level ttl. */
   public void setTTL(Map<String, Long> databaseTTLMap) throws 
IllegalPathException {
     lock.writeLock().lock();
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java
index b90f2df87d5..b4031ec1338 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedure.java
@@ -22,8 +22,10 @@ package org.apache.iotdb.confignode.procedure.impl.schema;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.schema.ttl.TTLCache;
 import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
 import 
org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager;
 import 
org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext;
@@ -47,14 +49,19 @@ import org.slf4j.LoggerFactory;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 
 public class SetTTLProcedure extends 
StateMachineProcedure<ConfigNodeProcedureEnv, SetTTLState> {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SetTTLProcedure.class);
+  private static final long TTL_NOT_EXIST = Long.MIN_VALUE;
 
   private SetTTLPlan plan;
+  private long previousTTL = TTL_NOT_EXIST;
+  private long previousDatabaseWildcardTTL = TTL_NOT_EXIST;
+  private boolean previousTTLStateCaptured = false;
 
   public SetTTLProcedure(final boolean isGeneratedByPipe) {
     super(isGeneratedByPipe);
@@ -86,18 +93,9 @@ public class SetTTLProcedure extends 
StateMachineProcedure<ConfigNodeProcedureEn
     }
   }
 
-  private void setConfigNodeTTL(ConfigNodeProcedureEnv env) {
-    TSStatus res;
-    try {
-      res =
-          env.getConfigManager()
-              .getConsensusManager()
-              .write(isGeneratedByPipe ? new PipeEnrichedPlan(this.plan) : 
this.plan);
-    } catch (ConsensusException e) {
-      
LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE,
 e);
-      res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
-      res.setMessage(e.getMessage());
-    }
+  protected void setConfigNodeTTL(final ConfigNodeProcedureEnv env) {
+    capturePreviousTTLState(env);
+    final TSStatus res = writeConfigNodePlan(env, plan);
     if (res.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       LOGGER.info(ProcedureMessages.FAILED_TO_EXECUTE_PLAN_BECAUSE, plan, 
res.message);
       setFailure(new ProcedureException(new IoTDBException(res)));
@@ -106,35 +104,171 @@ public class SetTTLProcedure extends 
StateMachineProcedure<ConfigNodeProcedureEn
     }
   }
 
-  private void updateDataNodeTTL(ConfigNodeProcedureEnv env) {
-    Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+  protected void updateDataNodeTTL(final ConfigNodeProcedureEnv env) {
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
         
env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations();
-    DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
-        new DataNodeAsyncRequestContext<>(
-            CnToDnAsyncRequestType.SET_TTL,
-            new TSetTTLReq(
-                Collections.singletonList(String.join(".", 
plan.getPathPattern())),
-                plan.getTTL(),
-                plan.isDataBase()),
-            dataNodeLocationMap);
+    final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
+        sendTTLRequest(
+            dataNodeLocationMap,
+            buildSetTTLReq(plan.getPathPattern(), plan.getTTL(), 
plan.isDataBase()));
+    if (hasFailedDataNode(clientHandler)) {
+      LOGGER.error("Failed to update ttl cache of dataNode.");
+      setFailure(
+          new ProcedureException(
+              new 
MetadataException(ProcedureMessages.UPDATE_DATANODE_TTL_CACHE_FAILED)));
+    }
+  }
+
+  private void capturePreviousTTLState(final ConfigNodeProcedureEnv env) {
+    if (previousTTLStateCaptured) {
+      return;
+    }
+    final Map<String, Long> ttlMap = 
env.getConfigManager().getTTLManager().getAllTTL();
+    previousTTL = getTTLOrDefault(ttlMap, plan.getPathPattern());
+    if (plan.isDataBase()) {
+      previousDatabaseWildcardTTL =
+          getTTLOrDefault(ttlMap, 
getDatabaseWildcardPathPattern(plan.getPathPattern()));
+    }
+    previousTTLStateCaptured = true;
+  }
+
+  protected TSStatus writeConfigNodePlan(
+      final ConfigNodeProcedureEnv env, final SetTTLPlan setTTLPlan) {
+    try {
+      return env.getConfigManager()
+          .getConsensusManager()
+          .write(isGeneratedByPipe ? new PipeEnrichedPlan(setTTLPlan) : 
setTTLPlan);
+    } catch (ConsensusException e) {
+      
LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE,
 e);
+      final TSStatus res = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
+      res.setMessage(e.getMessage());
+      return res;
+    }
+  }
+
+  protected DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> sendTTLRequest(
+      final Map<Integer, TDataNodeLocation> dataNodeLocationMap, final 
TSetTTLReq req) {
+    final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
+        new DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.SET_TTL, req, 
dataNodeLocationMap);
     
CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler);
-    Map<Integer, TSStatus> statusMap = clientHandler.getResponseMap();
-    for (TSStatus status : statusMap.values()) {
-      // all dataNodes must clear the related schemaengine cache
+    return clientHandler;
+  }
+
+  private TSetTTLReq buildSetTTLReq(
+      final String[] pathPattern, final long ttl, final boolean isDataBase) {
+    return new TSetTTLReq(
+        Collections.singletonList(String.join(".", pathPattern)), ttl, 
isDataBase);
+  }
+
+  private boolean hasFailedDataNode(
+      final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler) {
+    if (!clientHandler.getRequestIndices().isEmpty()) {
+      return true;
+    }
+    for (TSStatus status : clientHandler.getResponseMap().values()) {
       if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        LOGGER.error(ProcedureMessages.FAILED_TO_UPDATE_TTL_CACHE_OF_DATANODE);
-        setFailure(
-            new ProcedureException(
-                new 
MetadataException(ProcedureMessages.UPDATE_DATANODE_TTL_CACHE_FAILED)));
-        return;
+        return true;
       }
     }
+    return false;
+  }
+
+  private long getTTLOrDefault(final Map<String, Long> ttlMap, final String[] 
pathPattern) {
+    return ttlMap.getOrDefault(String.join(".", pathPattern), TTL_NOT_EXIST);
+  }
+
+  private String[] getDatabaseWildcardPathPattern(final String[] pathPattern) {
+    final String[] pathNodes = Arrays.copyOf(pathPattern, pathPattern.length + 
1);
+    pathNodes[pathNodes.length - 1] = IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+    return pathNodes;
+  }
+
+  private void rollbackConfigNodeTTL(final ConfigNodeProcedureEnv env) throws 
ProcedureException {
+    restoreTTLOnConfigNode(env, plan.getPathPattern(), previousTTL);
+    if (plan.isDataBase()) {
+      restoreTTLOnConfigNode(
+          env, getDatabaseWildcardPathPattern(plan.getPathPattern()), 
previousDatabaseWildcardTTL);
+    }
+  }
+
+  private void restoreTTLOnConfigNode(
+      final ConfigNodeProcedureEnv env, final String[] pathPattern, final long 
ttl)
+      throws ProcedureException {
+    final SetTTLPlan rollbackPlan =
+        new SetTTLPlan(pathPattern, ttl == TTL_NOT_EXIST ? TTLCache.NULL_TTL : 
ttl);
+    rollbackPlan.setDataBase(false);
+    final TSStatus status = writeConfigNodePlan(env, rollbackPlan);
+    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new ProcedureException(
+          new MetadataException(
+              "Rollback ConfigNode ttl failed for "
+                  + String.join(".", pathPattern)
+                  + ": "
+                  + status.getMessage()));
+    }
+  }
+
+  private void rollbackDataNodeTTL(final ConfigNodeProcedureEnv env) throws 
ProcedureException {
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        
env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations();
+    restoreTTLOnDataNodes(dataNodeLocationMap, plan.getPathPattern(), 
previousTTL);
+    if (plan.isDataBase()) {
+      restoreTTLOnDataNodes(
+          dataNodeLocationMap,
+          getDatabaseWildcardPathPattern(plan.getPathPattern()),
+          previousDatabaseWildcardTTL);
+    }
+  }
+
+  private void restoreTTLOnDataNodes(
+      final Map<Integer, TDataNodeLocation> dataNodeLocationMap,
+      final String[] pathPattern,
+      final long ttl)
+      throws ProcedureException {
+    if (dataNodeLocationMap.isEmpty()) {
+      return;
+    }
+    final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
+        sendTTLRequest(
+            dataNodeLocationMap,
+            buildSetTTLReq(pathPattern, ttl == TTL_NOT_EXIST ? 
TTLCache.NULL_TTL : ttl, false));
+    if (hasFailedDataNode(clientHandler)) {
+      throw new ProcedureException(
+          new MetadataException(
+              "Rollback dataNode ttl cache failed for " + String.join(".", 
pathPattern)));
+    }
   }
 
   @Override
-  protected void rollbackState(
-      ConfigNodeProcedureEnv configNodeProcedureEnv, SetTTLState setTTLState)
-      throws IOException, InterruptedException, ProcedureException {}
+  protected void rollbackState(final ConfigNodeProcedureEnv env, final 
SetTTLState setTTLState)
+      throws IOException, InterruptedException, ProcedureException {
+    if (setTTLState != SetTTLState.UPDATE_DATANODE_CACHE || 
!previousTTLStateCaptured) {
+      return;
+    }
+    ProcedureException rollbackFailure = null;
+    try {
+      rollbackConfigNodeTTL(env);
+    } catch (ProcedureException e) {
+      LOGGER.error("Failed to rollback ConfigNode ttl state.", e);
+      rollbackFailure = e;
+    }
+    try {
+      rollbackDataNodeTTL(env);
+    } catch (ProcedureException e) {
+      LOGGER.error("Failed to rollback DataNode ttl cache.", e);
+      if (rollbackFailure == null) {
+        rollbackFailure = e;
+      }
+    }
+    if (rollbackFailure != null) {
+      throw rollbackFailure;
+    }
+  }
+
+  @Override
+  protected boolean isRollbackSupported(final SetTTLState state) {
+    return state == SetTTLState.UPDATE_DATANODE_CACHE;
+  }
 
   @Override
   protected SetTTLState getState(int stateId) {
@@ -159,6 +293,9 @@ public class SetTTLProcedure extends 
StateMachineProcedure<ConfigNodeProcedureEn
             : ProcedureType.SET_TTL_PROCEDURE.getTypeCode());
     super.serialize(stream);
     ReadWriteIOUtils.write(plan.serializeToByteBuffer(), stream);
+    stream.writeBoolean(previousTTLStateCaptured);
+    stream.writeLong(previousTTL);
+    stream.writeLong(previousDatabaseWildcardTTL);
   }
 
   @Override
@@ -167,6 +304,11 @@ public class SetTTLProcedure extends 
StateMachineProcedure<ConfigNodeProcedureEn
     try {
       ReadWriteIOUtils.readInt(byteBuffer);
       this.plan = (SetTTLPlan) ConfigPhysicalPlan.Factory.create(byteBuffer);
+      if (byteBuffer.remaining() >= 17) {
+        this.previousTTLStateCaptured = byteBuffer.get() != 0;
+        this.previousTTL = byteBuffer.getLong();
+        this.previousDatabaseWildcardTTL = byteBuffer.getLong();
+      }
     } catch (IOException e) {
       LOGGER.error(ProcedureMessages.IO_ERROR_WHEN_DESERIALIZE_SETTTL_PLAN, e);
     }
@@ -180,12 +322,21 @@ public class SetTTLProcedure extends 
StateMachineProcedure<ConfigNodeProcedureEn
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-    return this.plan.equals(((SetTTLProcedure) o).plan)
-        && this.isGeneratedByPipe == (((SetTTLProcedure) o).isGeneratedByPipe);
+    final SetTTLProcedure that = (SetTTLProcedure) o;
+    return this.isGeneratedByPipe == that.isGeneratedByPipe
+        && this.previousTTL == that.previousTTL
+        && this.previousDatabaseWildcardTTL == that.previousDatabaseWildcardTTL
+        && this.previousTTLStateCaptured == that.previousTTLStateCaptured
+        && this.plan.equals(that.plan);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(plan, isGeneratedByPipe);
+    return Objects.hash(
+        plan,
+        isGeneratedByPipe,
+        previousTTL,
+        previousDatabaseWildcardTTL,
+        previousTTLStateCaptured);
   }
 }
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/TTLInfoTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/TTLInfoTest.java
index 42a23d35cb9..bdb02a2d5a5 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/TTLInfoTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/TTLInfoTest.java
@@ -50,6 +50,7 @@ public class TTLInfoTest {
   private final File snapshotDir = new File(BASE_OUTPUT_PATH, 
"ttlInfo-snapshot");
   private final long ttl = 123435565323L;
   private long[] originTTLArr;
+  private int originTTlRuleCapacity;
 
   @Before
   public void setup() throws IOException {
@@ -57,6 +58,7 @@ public class TTLInfoTest {
       snapshotDir.mkdirs();
     }
     originTTLArr = CommonDescriptor.getInstance().getConfig().getTierTTLInMs();
+    originTTlRuleCapacity = 
CommonDescriptor.getInstance().getConfig().getTTlRuleCapacity();
     long[] ttlArr = new long[2];
     ttlArr[0] = 10000000L;
     ttlArr[1] = ttl;
@@ -70,6 +72,7 @@ public class TTLInfoTest {
       FileUtils.deleteDirectory(snapshotDir);
     }
     CommonDescriptor.getInstance().getConfig().setTierTTLInMs(originTTLArr);
+    
CommonDescriptor.getInstance().getConfig().setTTlRuleCapacity(originTTlRuleCapacity);
   }
 
   @Test
@@ -245,6 +248,56 @@ public class TTLInfoTest {
         status.message);
   }
 
+  @Test
+  public void testUpdateExistingTTLWhenCapacityIsReached() {
+    CommonDescriptor.getInstance().getConfig().setTTlRuleCapacity(2);
+
+    SetTTLPlan setTTLPlan =
+        new SetTTLPlan(PathNodesGenerator.splitPathToNodes("root.sg1.d1.**"), 
1000);
+    assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
ttlInfo.setTTL(setTTLPlan).code);
+    assertEquals(2, ttlInfo.getTTLCount());
+
+    setTTLPlan = new 
SetTTLPlan(PathNodesGenerator.splitPathToNodes("root.sg1.d1.**"), 2000);
+    assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
ttlInfo.setTTL(setTTLPlan).code);
+    assertEquals(2, ttlInfo.getTTLCount());
+    assertEquals(
+        Long.valueOf(2000),
+        ttlInfo.showTTL(new 
ShowTTLPlan()).getPathTTLMap().get("root.sg1.d1.**"));
+  }
+
+  @Test
+  public void testUpdateExistingTTLWhenCurrentStateIsAlreadyOversize() {
+    SetTTLPlan setTTLPlan =
+        new SetTTLPlan(PathNodesGenerator.splitPathToNodes("root.sg1.d1.**"), 
1000);
+    assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
ttlInfo.setTTL(setTTLPlan).code);
+    assertEquals(2, ttlInfo.getTTLCount());
+
+    CommonDescriptor.getInstance().getConfig().setTTlRuleCapacity(1);
+
+    setTTLPlan = new 
SetTTLPlan(PathNodesGenerator.splitPathToNodes("root.sg1.d1.**"), 2000);
+    assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
ttlInfo.setTTL(setTTLPlan).code);
+    assertEquals(2, ttlInfo.getTTLCount());
+    assertEquals(
+        Long.valueOf(2000),
+        ttlInfo.showTTL(new 
ShowTTLPlan()).getPathTTLMap().get("root.sg1.d1.**"));
+  }
+
+  @Test
+  public void testDatabaseTTLShouldReserveTwoSlots() {
+    CommonDescriptor.getInstance().getConfig().setTTlRuleCapacity(2);
+
+    SetTTLPlan setTTLPlan = new 
SetTTLPlan(PathNodesGenerator.splitPathToNodes("root.sg1"), 1000);
+    setTTLPlan.setDataBase(true);
+
+    final TSStatus status = ttlInfo.setTTL(setTTLPlan);
+    assertEquals(TSStatusCode.OVERSIZE_TTL.getStatusCode(), status.code);
+    assertEquals(1, ttlInfo.getTTLCount());
+    assertEquals(1, ttlInfo.showTTL(new ShowTTLPlan()).getPathTTLMap().size());
+    assertEquals(
+        Long.valueOf(Long.MAX_VALUE),
+        ttlInfo.showTTL(new ShowTTLPlan()).getPathTTLMap().get("root.**"));
+  }
+
   @Test
   public void testSnapshot() throws TException, IOException, 
IllegalPathException {
     // set ttl
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java
index 5042eb1dd0f..22c6e160554 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/schema/SetTTLProcedureTest.java
@@ -19,19 +19,36 @@
 
 package org.apache.iotdb.confignode.procedure.impl.schema;
 
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
+import 
org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext;
 import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.manager.TTLManager;
+import org.apache.iotdb.confignode.manager.node.NodeManager;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.state.schema.SetTTLState;
 import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.tsfile.utils.PublicBAOS;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 public class SetTTLProcedureTest {
 
@@ -65,4 +82,186 @@ public class SetTTLProcedureTest {
     buffer.clear();
     byteArrayOutputStream.reset();
   }
+
+  @Test
+  public void serializeDeserializeTestWithCapturedRollbackState() throws 
Exception {
+    final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+    final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream);
+
+    final SetTTLPlan setTTLPlan =
+        new SetTTLPlan(Arrays.asList(new PartialPath("root.db").getNodes()), 
2000L);
+    setTTLPlan.setDataBase(true);
+    final TestingSetTTLProcedure procedure = new 
TestingSetTTLProcedure(setTTLPlan);
+
+    final Map<String, Long> ttlMap = new HashMap<>();
+    ttlMap.put("root.**", Long.MAX_VALUE);
+    ttlMap.put("root.db", 500L);
+    ttlMap.put("root.db.**", 600L);
+
+    procedure.executeFromState(mockProcedureEnv(ttlMap), 
SetTTLState.SET_CONFIGNODE_TTL);
+
+    procedure.serialize(outputStream);
+    final ByteBuffer buffer =
+        ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+    final SetTTLProcedure deserializedProcedure =
+        (SetTTLProcedure) ProcedureFactory.getInstance().create(buffer);
+    Assert.assertTrue(procedure.equals(deserializedProcedure));
+  }
+
+  @Test
+  public void rollbackStateShouldUnsetNewTTLWhenPreviousStateDidNotExist() 
throws Exception {
+    final SetTTLPlan setTTLPlan =
+        new SetTTLPlan(Arrays.asList(new 
PartialPath("root.test.sg1.**").getNodes()), 1000L);
+    final TestingSetTTLProcedure procedure = new 
TestingSetTTLProcedure(setTTLPlan);
+    procedure.failFirstDataNodeUpdateForTest();
+
+    final ConfigNodeProcedureEnv env =
+        mockProcedureEnv(Collections.singletonMap("root.**", Long.MAX_VALUE));
+
+    procedure.executeFromState(env, SetTTLState.SET_CONFIGNODE_TTL);
+    procedure.executeFromState(env, SetTTLState.UPDATE_DATANODE_CACHE);
+    Assert.assertTrue(procedure.isFailed());
+
+    procedure.rollbackState(env, SetTTLState.UPDATE_DATANODE_CACHE);
+
+    Assert.assertEquals(2, procedure.getWrittenPlans().size());
+    assertPlan(procedure.getWrittenPlans().get(0), "root.test.sg1.**", 1000L, 
false);
+    assertPlan(procedure.getWrittenPlans().get(1), "root.test.sg1.**", -1L, 
false);
+
+    Assert.assertEquals(2, procedure.getRequests().size());
+    assertRequest(procedure.getRequests().get(0), "root.test.sg1.**", 1000L, 
false);
+    assertRequest(procedure.getRequests().get(1), "root.test.sg1.**", -1L, 
false);
+  }
+
+  @Test
+  public void rollbackStateShouldRestoreDatabaseWildcardTTLSeparately() throws 
Exception {
+    final SetTTLPlan setTTLPlan =
+        new SetTTLPlan(Arrays.asList(new PartialPath("root.db").getNodes()), 
2000L);
+    setTTLPlan.setDataBase(true);
+    final TestingSetTTLProcedure procedure = new 
TestingSetTTLProcedure(setTTLPlan);
+    procedure.failFirstDataNodeUpdateForTest();
+
+    final Map<String, Long> ttlMap = new HashMap<>();
+    ttlMap.put("root.**", Long.MAX_VALUE);
+    ttlMap.put("root.db", 500L);
+    ttlMap.put("root.db.**", 600L);
+    final ConfigNodeProcedureEnv env = mockProcedureEnv(ttlMap);
+
+    procedure.executeFromState(env, SetTTLState.SET_CONFIGNODE_TTL);
+    procedure.executeFromState(env, SetTTLState.UPDATE_DATANODE_CACHE);
+    Assert.assertTrue(procedure.isFailed());
+
+    procedure.rollbackState(env, SetTTLState.UPDATE_DATANODE_CACHE);
+
+    Assert.assertEquals(3, procedure.getWrittenPlans().size());
+    assertPlan(procedure.getWrittenPlans().get(0), "root.db", 2000L, true);
+    assertPlan(procedure.getWrittenPlans().get(1), "root.db", 500L, false);
+    assertPlan(procedure.getWrittenPlans().get(2), "root.db.**", 600L, false);
+
+    Assert.assertEquals(3, procedure.getRequests().size());
+    assertRequest(procedure.getRequests().get(0), "root.db", 2000L, true);
+    assertRequest(procedure.getRequests().get(1), "root.db", 500L, false);
+    assertRequest(procedure.getRequests().get(2), "root.db.**", 600L, false);
+  }
+
+  private ConfigNodeProcedureEnv mockProcedureEnv(final Map<String, Long> 
ttlMap) {
+    final ConfigNodeProcedureEnv env = 
Mockito.mock(ConfigNodeProcedureEnv.class);
+    final ConfigManager configManager = Mockito.mock(ConfigManager.class);
+    final TTLManager ttlManager = Mockito.mock(TTLManager.class);
+    final NodeManager nodeManager = Mockito.mock(NodeManager.class);
+
+    final TDataNodeLocation dataNodeLocation = new TDataNodeLocation();
+    dataNodeLocation.setDataNodeId(1);
+
+    Mockito.when(env.getConfigManager()).thenReturn(configManager);
+    Mockito.when(configManager.getTTLManager()).thenReturn(ttlManager);
+    Mockito.when(ttlManager.getAllTTL()).thenReturn(ttlMap);
+    Mockito.when(configManager.getNodeManager()).thenReturn(nodeManager);
+    Mockito.when(nodeManager.getRegisteredDataNodeLocations())
+        .thenReturn(Collections.singletonMap(1, dataNodeLocation));
+    return env;
+  }
+
+  private void assertPlan(
+      final SetTTLPlan plan, final String path, final long ttl, final boolean 
isDataBase) {
+    Assert.assertEquals(path, String.join(".", plan.getPathPattern()));
+    Assert.assertEquals(ttl, plan.getTTL());
+    Assert.assertEquals(isDataBase, plan.isDataBase());
+  }
+
+  private void assertRequest(
+      final TSetTTLReq req, final String path, final long ttl, final boolean 
isDataBase) {
+    Assert.assertEquals(Collections.singletonList(path), req.getPathPattern());
+    Assert.assertEquals(ttl, req.getTTL());
+    Assert.assertEquals(isDataBase, req.isDataBase);
+  }
+
+  private static class TestingSetTTLProcedure extends SetTTLProcedure {
+
+    private final List<TSetTTLReq> requests = new ArrayList<>();
+    private final List<SetTTLPlan> writtenPlans = new ArrayList<>();
+    private boolean failFirstDataNodeUpdate = false;
+    private int requestCount = 0;
+
+    private TestingSetTTLProcedure(final SetTTLPlan plan) {
+      super(plan, false);
+    }
+
+    private void failFirstDataNodeUpdateForTest() {
+      failFirstDataNodeUpdate = true;
+    }
+
+    private List<TSetTTLReq> getRequests() {
+      return requests;
+    }
+
+    private List<SetTTLPlan> getWrittenPlans() {
+      return writtenPlans;
+    }
+
+    @Override
+    protected TSStatus writeConfigNodePlan(
+        final ConfigNodeProcedureEnv env, final SetTTLPlan setTTLPlan) {
+      writtenPlans.add(copyPlan(setTTLPlan));
+      return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    }
+
+    @Override
+    protected DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> sendTTLRequest(
+        final Map<Integer, TDataNodeLocation> dataNodeLocationMap, final 
TSetTTLReq req) {
+      requests.add(copyRequest(req));
+
+      final DataNodeAsyncRequestContext<TSetTTLReq, TSStatus> clientHandler =
+          new DataNodeAsyncRequestContext<>(
+              CnToDnAsyncRequestType.SET_TTL, copyRequest(req), 
dataNodeLocationMap);
+      final List<Integer> requestIds = new 
ArrayList<>(clientHandler.getNodeLocationMap().keySet());
+      final boolean shouldFail = failFirstDataNodeUpdate && requestCount++ == 
0;
+
+      for (Integer requestId : requestIds) {
+        clientHandler
+            .getResponseMap()
+            .put(
+                requestId,
+                new TSStatus(
+                    shouldFail
+                        ? TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()
+                        : TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+        if (!shouldFail) {
+          clientHandler.getNodeLocationMap().remove(requestId);
+        }
+      }
+      return clientHandler;
+    }
+
+    private SetTTLPlan copyPlan(final SetTTLPlan plan) {
+      final SetTTLPlan copiedPlan =
+          new SetTTLPlan(Arrays.asList(plan.getPathPattern()), plan.getTTL());
+      copiedPlan.setDataBase(plan.isDataBase());
+      return copiedPlan;
+    }
+
+    private TSetTTLReq copyRequest(final TSetTTLReq req) {
+      return new TSetTTLReq(new ArrayList<>(req.getPathPattern()), 
req.getTTL(), req.isDataBase);
+    }
+  }
 }

Reply via email to