This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch 
fix_negative_iot_queue_size_and_delete_empty_file
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to 
refs/heads/fix_negative_iot_queue_size_and_delete_empty_file by this push:
     new 71c575ce8fb Fix missing searchIndex and lost deletion when no TsFile is involved.
71c575ce8fb is described below

commit 71c575ce8fba1c5113e875f6cb78de9bf83a4b91
Author: Tian Jiang <[email protected]>
AuthorDate: Thu Jul 24 16:32:47 2025 +0800

    Fix missing searchIndex and lost deletion when no TsFile is involved.
---
 ...IoTDBRegionOperationReliabilityITFramework.java |  23 ++++-
 .../IoTDBRegionMigrateWithLastEmptyDeletionIT.java | 101 +++++++++++++++++++++
 .../logdispatcher/IoTConsensusMemoryManager.java   |   2 +-
 .../plan/node/pipe/PipeEnrichedInsertNode.java     |   4 +-
 .../planner/plan/node/write/DeleteDataNode.java    |   9 +-
 .../plan/node/write/InsertMultiTabletsNode.java    |   3 +-
 .../planner/plan/node/write/InsertRowsNode.java    |   3 +-
 .../plan/node/write/InsertRowsOfOneDeviceNode.java |   3 +-
 .../plan/node/write/RelationalDeleteDataNode.java  |   9 +-
 .../plan/planner/plan/node/write/SearchNode.java   |   3 +-
 .../db/storageengine/dataregion/DataRegion.java    |  12 +++
 11 files changed, 157 insertions(+), 15 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java
 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java
index 5ef20ddd209..075035dcab9 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.session.Session;
 
 import org.apache.thrift.TException;
 import org.apache.tsfile.read.common.Field;
+import org.apache.tsfile.utils.Pair;
 import org.awaitility.Awaitility;
 import org.awaitility.core.ConditionTimeoutException;
 import org.junit.After;
@@ -307,7 +308,7 @@ public class IoTDBRegionOperationReliabilityITFramework {
     }
   }
 
-  protected Set<Integer> getAllDataNodes(Statement statement) throws Exception 
{
+  public static Set<Integer> getAllDataNodes(Statement statement) throws 
Exception {
     ResultSet result = statement.executeQuery(SHOW_DATANODES);
     Set<Integer> allDataNodeId = new HashSet<>();
     while (result.next()) {
@@ -444,6 +445,26 @@ public class IoTDBRegionOperationReliabilityITFramework {
     return regionMap;
   }
 
+  public static Map<Integer, Pair<Integer, Set<Integer>>> 
getDataRegionMapWithLeader(
+      Statement statement) throws Exception {
+    ResultSet showRegionsResult = statement.executeQuery(SHOW_REGIONS);
+    Map<Integer, Pair<Integer, Set<Integer>>> regionMap = new HashMap<>();
+    while (showRegionsResult.next()) {
+      if (String.valueOf(TConsensusGroupType.DataRegion)
+          .equals(showRegionsResult.getString(ColumnHeaderConstant.TYPE))) {
+        int regionId = 
showRegionsResult.getInt(ColumnHeaderConstant.REGION_ID);
+        int dataNodeId = 
showRegionsResult.getInt(ColumnHeaderConstant.DATA_NODE_ID);
+        Pair<Integer, Set<Integer>> leaderNodesPair =
+            regionMap.computeIfAbsent(regionId, id -> new Pair<>(-1, new 
HashSet<>()));
+        leaderNodesPair.getRight().add(dataNodeId);
+        if 
(showRegionsResult.getString(ColumnHeaderConstant.ROLE).equals("Leader")) {
+          leaderNodesPair.setLeft(dataNodeId);
+        }
+      }
+    }
+    return regionMap;
+  }
+
   public static Map<Integer, Set<Integer>> getAllRegionMap(Statement 
statement) throws Exception {
     ResultSet showRegionsResult = statement.executeQuery(SHOW_REGIONS);
     Map<Integer, Set<Integer>> regionMap = new HashMap<>();
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithLastEmptyDeletionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithLastEmptyDeletionIT.java
new file mode 100644
index 00000000000..18cfcdee6c5
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithLastEmptyDeletionIT.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1;
+
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+
+import org.apache.tsfile.utils.Pair;
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getAllDataNodes;
+import static 
org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getDataRegionMapWithLeader;
+
+public class IoTDBRegionMigrateWithLastEmptyDeletionIT {
+  @Before
+  public void setUp() throws Exception {
+    EnvFactory.getEnv()
+        .getConfig()
+        .getCommonConfig()
+        .setDataReplicationFactor(2)
+        .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
+    EnvFactory.getEnv().initClusterEnvironment(1, 3);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  @Test
+  public void testWithLastEmptyDeletion() throws Exception {
+    try (Connection connection = EnvFactory.getEnv().getTableConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("CREATE DATABASE test");
+      statement.execute("USE test");
+      statement.execute("CREATE TABLE t1 (s1 INT64 FIELD)");
+      statement.execute("INSERT INTO t1 (time, s1) VALUES (100, 100)");
+      statement.execute("FLUSH");
+      // the deletion does not involve any file
+      statement.execute("DELETE FROM t1 WHERE time < 100");
+
+      Map<Integer, Pair<Integer, Set<Integer>>> dataRegionMapWithLeader =
+          getDataRegionMapWithLeader(statement);
+      int dataRegionIdForTest =
+          
dataRegionMapWithLeader.keySet().stream().max(Integer::compare).get();
+      Pair<Integer, Set<Integer>> leaderAndNodes = 
dataRegionMapWithLeader.get(dataRegionIdForTest);
+      Set<Integer> allDataNodes = getAllDataNodes(statement);
+      int leaderId = leaderAndNodes.getLeft();
+      int followerId =
+          leaderAndNodes.getRight().stream().filter(i -> i != 
leaderId).findAny().get();
+      int newLeaderId =
+          allDataNodes.stream().filter(i -> i != leaderId && i != 
followerId).findAny().get();
+
+      System.out.printf(
+          "Old leader: %d, follower: %d, new leader: %d%n", leaderId, 
followerId, newLeaderId);
+
+      statement.execute(
+          String.format(
+              "migrate region %d from %d to %d", dataRegionIdForTest, 
leaderId, newLeaderId));
+
+      Awaitility.await()
+          .atMost(10, TimeUnit.MINUTES)
+          .pollDelay(1, TimeUnit.SECONDS)
+          .until(
+              () -> {
+                Map<Integer, Pair<Integer, Set<Integer>>> regionMapWithLeader =
+                    getDataRegionMapWithLeader(statement);
+                Pair<Integer, Set<Integer>> newLeaderAndNodes =
+                    regionMapWithLeader.get(dataRegionIdForTest);
+                Set<Integer> nodes = newLeaderAndNodes.right;
+                return nodes.size() == 2 && nodes.contains(newLeaderId);
+              });
+    }
+  }
+}
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
index 03ef5f3a5f1..f26b1516b07 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
@@ -22,8 +22,8 @@ package org.apache.iotdb.consensus.iot.logdispatcher;
 import org.apache.iotdb.commons.memory.AtomicLongMemoryBlock;
 import org.apache.iotdb.commons.memory.IMemoryBlock;
 import org.apache.iotdb.commons.service.metric.MetricService;
-
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
index 102222c77ea..461822a8ab1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
@@ -31,6 +31,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.SearchNode;
 import org.apache.iotdb.db.trigger.executor.TriggerFireVisitor;
 
 import org.apache.tsfile.enums.TSDataType;
@@ -225,8 +226,9 @@ public class PipeEnrichedInsertNode extends InsertNode {
   }
 
   @Override
-  public void setSearchIndex(final long searchIndex) {
+  public SearchNode setSearchIndex(final long searchIndex) {
     insertNode.setSearchIndex(searchIndex);
+    return this;
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
index 1f3792d102b..cfba72d66db 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
@@ -394,9 +394,10 @@ public class DeleteDataNode extends AbstractDeleteDataNode 
{
             .distinct()
             .collect(Collectors.toList());
     return new DeleteDataNode(
-        firstOne.getPlanNodeId(),
-        pathList,
-        firstOne.getDeleteStartTime(),
-        firstOne.getDeleteEndTime());
+            firstOne.getPlanNodeId(),
+            pathList,
+            firstOne.getDeleteStartTime(),
+            firstOne.getDeleteEndTime())
+        .setSearchIndex(firstOne.searchIndex);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
index eaf0a0ef2c1..5fdc7ed7dd7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -134,9 +134,10 @@ public class InsertMultiTabletsNode extends InsertNode {
   }
 
   @Override
-  public void setSearchIndex(long index) {
+  public SearchNode setSearchIndex(long index) {
     searchIndex = index;
     insertTabletNodeList.forEach(plan -> plan.setSearchIndex(index));
+    return this;
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index d34be33dbaf..ea773217766 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -128,9 +128,10 @@ public class InsertRowsNode extends InsertNode implements 
WALEntryValue {
   }
 
   @Override
-  public void setSearchIndex(long index) {
+  public SearchNode setSearchIndex(long index) {
     searchIndex = index;
     insertRowNodeList.forEach(plan -> plan.setSearchIndex(index));
+    return this;
   }
 
   public Map<Integer, TSStatus> getResults() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index 14c28b4de5e..fc9f66221dd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -98,9 +98,10 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
   }
 
   @Override
-  public void setSearchIndex(long index) {
+  public SearchNode setSearchIndex(long index) {
     searchIndex = index;
     insertRowNodeList.forEach(plan -> plan.setSearchIndex(index));
+    return this;
   }
 
   public TSStatus[] getFailingStatus() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNode.java
index 091d78b5d5b..632d7c9ee1e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNode.java
@@ -287,11 +287,12 @@ public class RelationalDeleteDataNode extends 
AbstractDeleteDataNode {
 
   public String toString() {
     return String.format(
-        "RelationalDeleteDataNode-%s[ Deletion: %s, Region: %s, ProgressIndex: 
%s]",
+        "RelationalDeleteDataNode-%s[ Deletion: %s, Region: %s, ProgressIndex: 
%s, SearchIndex: %d]",
         getPlanNodeId(),
         modEntries,
         regionReplicaSet == null ? "Not Assigned" : 
regionReplicaSet.getRegionId(),
-        progressIndex == null ? "Not Assigned" : progressIndex);
+        progressIndex == null ? "Not Assigned" : progressIndex,
+        searchIndex);
   }
 
   @Override
@@ -328,7 +329,7 @@ public class RelationalDeleteDataNode extends 
AbstractDeleteDataNode {
             .map(RelationalDeleteDataNode::getModEntries)
             .flatMap(Collection::stream)
             .collect(Collectors.toList());
-    return new RelationalDeleteDataNode(
-        this.getPlanNodeId(), allTableDeletionEntries, databaseName);
+    return new RelationalDeleteDataNode(this.getPlanNodeId(), 
allTableDeletionEntries, databaseName)
+        .setSearchIndex(getSearchIndex());
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
index b021025bf6a..d506d1414e1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
@@ -46,8 +46,9 @@ public abstract class SearchNode extends WritePlanNode 
implements ComparableCons
   }
 
   /** Search index should start from 1 */
-  public void setSearchIndex(long searchIndex) {
+  public SearchNode setSearchIndex(long searchIndex) {
     this.searchIndex = searchIndex;
+    return this;
   }
 
   public abstract SearchNode merge(List<SearchNode> searchNodes);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index f4f4b9c3988..b25f0d8005b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -2511,6 +2511,18 @@ public class DataRegion implements IDataRegionForQuery {
       walFlushListeners.add(walFlushListener);
     }
 
+    // Sometimes the deletion operation doesn't have any related tsfile 
processor or memtable,
+    // but it's still necessary to write to the WAL, so that iot consensus can 
synchronize the
+    // delete
+    // operation to other nodes.
+    if (walFlushListeners.isEmpty()) {
+      logger.info("Writing no-file-related deletion to WAL {}", 
deleteDataNode);
+      getWALNode()
+          .ifPresent(
+              walNode ->
+                  walFlushListeners.add(
+                      walNode.log(TsFileProcessor.MEMTABLE_NOT_EXIST, 
deleteDataNode)));
+    }
     return walFlushListeners;
   }
 

Reply via email to