This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch
fix_negative_iot_queue_size_and_delete_empty_file
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to
refs/heads/fix_negative_iot_queue_size_and_delete_empty_file by this push:
new 71c575ce8fb Fix missing searchIndex and lost deletion when no TsFile
is involved.
71c575ce8fb is described below
commit 71c575ce8fba1c5113e875f6cb78de9bf83a4b91
Author: Tian Jiang <[email protected]>
AuthorDate: Thu Jul 24 16:32:47 2025 +0800
Fix missing searchIndex and lost deletion when no TsFile is involved.
---
...IoTDBRegionOperationReliabilityITFramework.java | 23 ++++-
.../IoTDBRegionMigrateWithLastEmptyDeletionIT.java | 101 +++++++++++++++++++++
.../logdispatcher/IoTConsensusMemoryManager.java | 2 +-
.../plan/node/pipe/PipeEnrichedInsertNode.java | 4 +-
.../planner/plan/node/write/DeleteDataNode.java | 9 +-
.../plan/node/write/InsertMultiTabletsNode.java | 3 +-
.../planner/plan/node/write/InsertRowsNode.java | 3 +-
.../plan/node/write/InsertRowsOfOneDeviceNode.java | 3 +-
.../plan/node/write/RelationalDeleteDataNode.java | 9 +-
.../plan/planner/plan/node/write/SearchNode.java | 3 +-
.../db/storageengine/dataregion/DataRegion.java | 12 +++
11 files changed, 157 insertions(+), 15 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java
index 5ef20ddd209..075035dcab9 100644
---
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.session.Session;
import org.apache.thrift.TException;
import org.apache.tsfile.read.common.Field;
+import org.apache.tsfile.utils.Pair;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.After;
@@ -307,7 +308,7 @@ public class IoTDBRegionOperationReliabilityITFramework {
}
}
- protected Set<Integer> getAllDataNodes(Statement statement) throws Exception
{
+ public static Set<Integer> getAllDataNodes(Statement statement) throws
Exception {
ResultSet result = statement.executeQuery(SHOW_DATANODES);
Set<Integer> allDataNodeId = new HashSet<>();
while (result.next()) {
@@ -444,6 +445,26 @@ public class IoTDBRegionOperationReliabilityITFramework {
return regionMap;
}
+ public static Map<Integer, Pair<Integer, Set<Integer>>>
getDataRegionMapWithLeader(
+ Statement statement) throws Exception {
+ ResultSet showRegionsResult = statement.executeQuery(SHOW_REGIONS);
+ Map<Integer, Pair<Integer, Set<Integer>>> regionMap = new HashMap<>();
+ while (showRegionsResult.next()) {
+ if (String.valueOf(TConsensusGroupType.DataRegion)
+ .equals(showRegionsResult.getString(ColumnHeaderConstant.TYPE))) {
+ int regionId =
showRegionsResult.getInt(ColumnHeaderConstant.REGION_ID);
+ int dataNodeId =
showRegionsResult.getInt(ColumnHeaderConstant.DATA_NODE_ID);
+ Pair<Integer, Set<Integer>> leaderNodesPair =
+ regionMap.computeIfAbsent(regionId, id -> new Pair<>(-1, new
HashSet<>()));
+ leaderNodesPair.getRight().add(dataNodeId);
+ if
(showRegionsResult.getString(ColumnHeaderConstant.ROLE).equals("Leader")) {
+ leaderNodesPair.setLeft(dataNodeId);
+ }
+ }
+ }
+ return regionMap;
+ }
+
public static Map<Integer, Set<Integer>> getAllRegionMap(Statement
statement) throws Exception {
ResultSet showRegionsResult = statement.executeQuery(SHOW_REGIONS);
Map<Integer, Set<Integer>> regionMap = new HashMap<>();
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithLastEmptyDeletionIT.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithLastEmptyDeletionIT.java
new file mode 100644
index 00000000000..18cfcdee6c5
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv1/IoTDBRegionMigrateWithLastEmptyDeletionIT.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1;
+
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+
+import org.apache.tsfile.utils.Pair;
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getAllDataNodes;
+import static
org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getDataRegionMapWithLeader;
+
+public class IoTDBRegionMigrateWithLastEmptyDeletionIT {
+ @Before
+ public void setUp() throws Exception {
+ EnvFactory.getEnv()
+ .getConfig()
+ .getCommonConfig()
+ .setDataReplicationFactor(2)
+ .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
+ EnvFactory.getEnv().initClusterEnvironment(1, 3);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ @Test
+ public void testWithLastEmptyDeletion() throws Exception {
+ try (Connection connection = EnvFactory.getEnv().getTableConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("CREATE DATABASE test");
+ statement.execute("USE test");
+ statement.execute("CREATE TABLE t1 (s1 INT64 FIELD)");
+ statement.execute("INSERT INTO t1 (time, s1) VALUES (100, 100)");
+ statement.execute("FLUSH");
+ // the deletion does not involve any file
+ statement.execute("DELETE FROM t1 WHERE time < 100");
+
+ Map<Integer, Pair<Integer, Set<Integer>>> dataRegionMapWithLeader =
+ getDataRegionMapWithLeader(statement);
+ int dataRegionIdForTest =
+
dataRegionMapWithLeader.keySet().stream().max(Integer::compare).get();
+ Pair<Integer, Set<Integer>> leaderAndNodes =
dataRegionMapWithLeader.get(dataRegionIdForTest);
+ Set<Integer> allDataNodes = getAllDataNodes(statement);
+ int leaderId = leaderAndNodes.getLeft();
+ int followerId =
+ leaderAndNodes.getRight().stream().filter(i -> i !=
leaderId).findAny().get();
+ int newLeaderId =
+ allDataNodes.stream().filter(i -> i != leaderId && i !=
followerId).findAny().get();
+
+ System.out.printf(
+ "Old leader: %d, follower: %d, new leader: %d%n", leaderId,
followerId, newLeaderId);
+
+ statement.execute(
+ String.format(
+ "migrate region %d from %d to %d", dataRegionIdForTest,
leaderId, newLeaderId));
+
+ Awaitility.await()
+ .atMost(10, TimeUnit.MINUTES)
+ .pollDelay(1, TimeUnit.SECONDS)
+ .until(
+ () -> {
+ Map<Integer, Pair<Integer, Set<Integer>>> regionMapWithLeader =
+ getDataRegionMapWithLeader(statement);
+ Pair<Integer, Set<Integer>> newLeaderAndNodes =
+ regionMapWithLeader.get(dataRegionIdForTest);
+ Set<Integer> nodes = newLeaderAndNodes.right;
+ return nodes.size() == 2 && nodes.contains(newLeaderId);
+ });
+ }
+ }
+}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
index 03ef5f3a5f1..f26b1516b07 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
@@ -22,8 +22,8 @@ package org.apache.iotdb.consensus.iot.logdispatcher;
import org.apache.iotdb.commons.memory.AtomicLongMemoryBlock;
import org.apache.iotdb.commons.memory.IMemoryBlock;
import org.apache.iotdb.commons.service.metric.MetricService;
-
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
index 102222c77ea..461822a8ab1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
@@ -31,6 +31,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.SearchNode;
import org.apache.iotdb.db.trigger.executor.TriggerFireVisitor;
import org.apache.tsfile.enums.TSDataType;
@@ -225,8 +226,9 @@ public class PipeEnrichedInsertNode extends InsertNode {
}
@Override
- public void setSearchIndex(final long searchIndex) {
+ public SearchNode setSearchIndex(final long searchIndex) {
insertNode.setSearchIndex(searchIndex);
+ return this;
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
index 1f3792d102b..cfba72d66db 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
@@ -394,9 +394,10 @@ public class DeleteDataNode extends AbstractDeleteDataNode
{
.distinct()
.collect(Collectors.toList());
return new DeleteDataNode(
- firstOne.getPlanNodeId(),
- pathList,
- firstOne.getDeleteStartTime(),
- firstOne.getDeleteEndTime());
+ firstOne.getPlanNodeId(),
+ pathList,
+ firstOne.getDeleteStartTime(),
+ firstOne.getDeleteEndTime())
+ .setSearchIndex(firstOne.searchIndex);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
index eaf0a0ef2c1..5fdc7ed7dd7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -134,9 +134,10 @@ public class InsertMultiTabletsNode extends InsertNode {
}
@Override
- public void setSearchIndex(long index) {
+ public SearchNode setSearchIndex(long index) {
searchIndex = index;
insertTabletNodeList.forEach(plan -> plan.setSearchIndex(index));
+ return this;
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index d34be33dbaf..ea773217766 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -128,9 +128,10 @@ public class InsertRowsNode extends InsertNode implements
WALEntryValue {
}
@Override
- public void setSearchIndex(long index) {
+ public SearchNode setSearchIndex(long index) {
searchIndex = index;
insertRowNodeList.forEach(plan -> plan.setSearchIndex(index));
+ return this;
}
public Map<Integer, TSStatus> getResults() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index 14c28b4de5e..fc9f66221dd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -98,9 +98,10 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
}
@Override
- public void setSearchIndex(long index) {
+ public SearchNode setSearchIndex(long index) {
searchIndex = index;
insertRowNodeList.forEach(plan -> plan.setSearchIndex(index));
+ return this;
}
public TSStatus[] getFailingStatus() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNode.java
index 091d78b5d5b..632d7c9ee1e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNode.java
@@ -287,11 +287,12 @@ public class RelationalDeleteDataNode extends
AbstractDeleteDataNode {
public String toString() {
return String.format(
- "RelationalDeleteDataNode-%s[ Deletion: %s, Region: %s, ProgressIndex:
%s]",
+ "RelationalDeleteDataNode-%s[ Deletion: %s, Region: %s, ProgressIndex:
%s, SearchIndex: %d]",
getPlanNodeId(),
modEntries,
regionReplicaSet == null ? "Not Assigned" :
regionReplicaSet.getRegionId(),
- progressIndex == null ? "Not Assigned" : progressIndex);
+ progressIndex == null ? "Not Assigned" : progressIndex,
+ searchIndex);
}
@Override
@@ -328,7 +329,7 @@ public class RelationalDeleteDataNode extends
AbstractDeleteDataNode {
.map(RelationalDeleteDataNode::getModEntries)
.flatMap(Collection::stream)
.collect(Collectors.toList());
- return new RelationalDeleteDataNode(
- this.getPlanNodeId(), allTableDeletionEntries, databaseName);
+ return new RelationalDeleteDataNode(this.getPlanNodeId(),
allTableDeletionEntries, databaseName)
+ .setSearchIndex(getSearchIndex());
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
index b021025bf6a..d506d1414e1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
@@ -46,8 +46,9 @@ public abstract class SearchNode extends WritePlanNode
implements ComparableCons
}
/** Search index should start from 1 */
- public void setSearchIndex(long searchIndex) {
+ public SearchNode setSearchIndex(long searchIndex) {
this.searchIndex = searchIndex;
+ return this;
}
public abstract SearchNode merge(List<SearchNode> searchNodes);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index f4f4b9c3988..b25f0d8005b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -2511,6 +2511,18 @@ public class DataRegion implements IDataRegionForQuery {
walFlushListeners.add(walFlushListener);
}
+ // Sometimes the deletion operation doesn't have any related tsfile
processor or memtable,
+ // but it's still necessary to write to the WAL, so that iot consensus can
synchronize the
+ // delete
+ // operation to other nodes.
+ if (walFlushListeners.isEmpty()) {
+ logger.info("Writing no-file-related deletion to WAL {}",
deleteDataNode);
+ getWALNode()
+ .ifPresent(
+ walNode ->
+ walFlushListeners.add(
+ walNode.log(TsFileProcessor.MEMTABLE_NOT_EXIST,
deleteDataNode)));
+ }
return walFlushListeners;
}