This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch cp_iot_1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8be0ff883764883701e2596de0e3a403ae76c0cc
Author: Jiang Tian <[email protected]>
AuthorDate: Fri Jul 25 11:20:40 2025 +0800

    Fix negative iot queue size & missing search index for deletion & missed 
request when performing empty table deleting (#16022)
    
    * Fix double memory free of iotconsensus queue request during region 
deletion
    
    * Fix missing searchIndex and lost deletion when no TsFile is involved.
---
 ...IoTDBRegionOperationReliabilityITFramework.java |  23 +-
 .../IoTDBRegionMigrateWithLastEmptyDeletionIT.java | 101 +++++++
 .../common/request/IndexedConsensusRequest.java    |  10 +
 .../logdispatcher/IoTConsensusMemoryManager.java   |  21 ++
 .../consensus/iot/logdispatcher/LogDispatcher.java |  22 +-
 .../plan/node/pipe/PipeEnrichedInsertNode.java     |   4 +-
 .../planner/plan/node/write/DeleteDataNode.java    |   9 +-
 .../plan/node/write/InsertMultiTabletsNode.java    |   3 +-
 .../planner/plan/node/write/InsertRowsNode.java    |   3 +-
 .../plan/node/write/InsertRowsOfOneDeviceNode.java |   3 +-
 .../plan/node/write/RelationalDeleteDataNode.java  | 335 +++++++++++++++++++++
 .../plan/planner/plan/node/write/SearchNode.java   |   3 +-
 12 files changed, 521 insertions(+), 16 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java
 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java
index 0dccc669eeb..1d14574a8ee 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.session.Session;
 
 import org.apache.thrift.TException;
 import org.apache.tsfile.read.common.Field;
+import org.apache.tsfile.utils.Pair;
 import org.awaitility.Awaitility;
 import org.awaitility.core.ConditionTimeoutException;
 import org.junit.After;
@@ -307,7 +308,7 @@ public class IoTDBRegionOperationReliabilityITFramework {
     }
   }
 
-  protected Set<Integer> getAllDataNodes(Statement statement) throws Exception 
{
+  public static Set<Integer> getAllDataNodes(Statement statement) throws 
Exception {
     ResultSet result = statement.executeQuery(SHOW_DATANODES);
     Set<Integer> allDataNodeId = new HashSet<>();
     while (result.next()) {
@@ -444,6 +445,26 @@ public class IoTDBRegionOperationReliabilityITFramework {
     return regionMap;
   }
 
+  public static Map<Integer, Pair<Integer, Set<Integer>>> 
getDataRegionMapWithLeader(
+      Statement statement) throws Exception {
+    ResultSet showRegionsResult = statement.executeQuery(SHOW_REGIONS);
+    Map<Integer, Pair<Integer, Set<Integer>>> regionMap = new HashMap<>();
+    while (showRegionsResult.next()) {
+      if (String.valueOf(TConsensusGroupType.DataRegion)
+          .equals(showRegionsResult.getString(ColumnHeaderConstant.TYPE))) {
+        int regionId = 
showRegionsResult.getInt(ColumnHeaderConstant.REGION_ID);
+        int dataNodeId = 
showRegionsResult.getInt(ColumnHeaderConstant.DATA_NODE_ID);
+        Pair<Integer, Set<Integer>> leaderNodesPair =
+            regionMap.computeIfAbsent(regionId, id -> new Pair<>(-1, new 
HashSet<>()));
+        leaderNodesPair.getRight().add(dataNodeId);
+        if 
(showRegionsResult.getString(ColumnHeaderConstant.ROLE).equals("Leader")) {
+          leaderNodesPair.setLeft(dataNodeId);
+        }
+      }
+    }
+    return regionMap;
+  }
+
   public static Map<Integer, Set<Integer>> getAllRegionMap(Statement 
statement) throws Exception {
     ResultSet showRegionsResult = statement.executeQuery(SHOW_REGIONS);
     Map<Integer, Set<Integer>> regionMap = new HashMap<>();
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/IoTDBRegionMigrateWithLastEmptyDeletionIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/IoTDBRegionMigrateWithLastEmptyDeletionIT.java
new file mode 100644
index 00000000000..18cfcdee6c5
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/IoTDBRegionMigrateWithLastEmptyDeletionIT.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1;
+
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.it.env.EnvFactory;
+
+import org.apache.tsfile.utils.Pair;
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getAllDataNodes;
+import static 
org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getDataRegionMapWithLeader;
+
+public class IoTDBRegionMigrateWithLastEmptyDeletionIT {
+  @Before
+  public void setUp() throws Exception {
+    EnvFactory.getEnv()
+        .getConfig()
+        .getCommonConfig()
+        .setDataReplicationFactor(2)
+        .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS);
+    EnvFactory.getEnv().initClusterEnvironment(1, 3);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  @Test
+  public void testWithLastEmptyDeletion() throws Exception {
+    try (Connection connection = EnvFactory.getEnv().getTableConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("CREATE DATABASE test");
+      statement.execute("USE test");
+      statement.execute("CREATE TABLE t1 (s1 INT64 FIELD)");
+      statement.execute("INSERT INTO t1 (time, s1) VALUES (100, 100)");
+      statement.execute("FLUSH");
+      // the deletion does not involve any file
+      statement.execute("DELETE FROM t1 WHERE time < 100");
+
+      Map<Integer, Pair<Integer, Set<Integer>>> dataRegionMapWithLeader =
+          getDataRegionMapWithLeader(statement);
+      int dataRegionIdForTest =
+          
dataRegionMapWithLeader.keySet().stream().max(Integer::compare).get();
+      Pair<Integer, Set<Integer>> leaderAndNodes = 
dataRegionMapWithLeader.get(dataRegionIdForTest);
+      Set<Integer> allDataNodes = getAllDataNodes(statement);
+      int leaderId = leaderAndNodes.getLeft();
+      int followerId =
+          leaderAndNodes.getRight().stream().filter(i -> i != 
leaderId).findAny().get();
+      int newLeaderId =
+          allDataNodes.stream().filter(i -> i != leaderId && i != 
followerId).findAny().get();
+
+      System.out.printf(
+          "Old leader: %d, follower: %d, new leader: %d%n", leaderId, 
followerId, newLeaderId);
+
+      statement.execute(
+          String.format(
+              "migrate region %d from %d to %d", dataRegionIdForTest, 
leaderId, newLeaderId));
+
+      Awaitility.await()
+          .atMost(10, TimeUnit.MINUTES)
+          .pollDelay(1, TimeUnit.SECONDS)
+          .until(
+              () -> {
+                Map<Integer, Pair<Integer, Set<Integer>>> regionMapWithLeader =
+                    getDataRegionMapWithLeader(statement);
+                Pair<Integer, Set<Integer>> newLeaderAndNodes =
+                    regionMapWithLeader.get(dataRegionIdForTest);
+                Set<Integer> nodes = newLeaderAndNodes.right;
+                return nodes.size() == 2 && nodes.contains(newLeaderId);
+              });
+    }
+  }
+}
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
index 1147abc049e..2bf01d4ef86 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
 
 /** only used for iot consensus. */
 public class IndexedConsensusRequest implements IConsensusRequest {
@@ -34,6 +35,7 @@ public class IndexedConsensusRequest implements 
IConsensusRequest {
   private final List<IConsensusRequest> requests;
   private final List<ByteBuffer> serializedRequests;
   private long memorySize = 0;
+  private AtomicLong referenceCnt = new AtomicLong();
 
   public IndexedConsensusRequest(long searchIndex, List<IConsensusRequest> 
requests) {
     this.searchIndex = searchIndex;
@@ -100,4 +102,12 @@ public class IndexedConsensusRequest implements 
IConsensusRequest {
   public int hashCode() {
     return Objects.hash(searchIndex, requests);
   }
+
+  public long incRef() {
+    return referenceCnt.getAndIncrement();
+  }
+
+  public long decRef() {
+    return referenceCnt.getAndDecrement();
+  }
 }
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
index df54918c60e..3e7893b4c44 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.consensus.iot.logdispatcher;
 
 import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,6 +40,17 @@ public class IoTConsensusMemoryManager {
     MetricService.getInstance().addMetricSet(new 
IoTConsensusMemoryManagerMetrics(this));
   }
 
+  public boolean reserve(IndexedConsensusRequest request, boolean fromQueue) {
+    synchronized (request) {
+      long prevRef = request.incRef();
+      if (prevRef == 0) {
+        return reserve(request.getMemorySize(), fromQueue);
+      } else {
+        return true;
+      }
+    }
+  }
+
   public boolean reserve(long size, boolean fromQueue) {
     AtomicBoolean result = new AtomicBoolean(false);
     memorySizeInByte.updateAndGet(
@@ -73,6 +85,15 @@ public class IoTConsensusMemoryManager {
     return result.get();
   }
 
+  public void free(IndexedConsensusRequest request, boolean fromQueue) {
+    synchronized (request) {
+      long prevRef = request.decRef();
+      if (prevRef == 0) {
+        free(request.getMemorySize(), fromQueue);
+      }
+    }
+  }
+
   public void free(long size, boolean fromQueue) {
     long currentUsedMemory = memorySizeInByte.addAndGet(-size);
     if (fromQueue) {
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index a363e7dd9aa..7f1f91b1fa0 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -283,7 +283,7 @@ public class LogDispatcher {
 
     /** try to offer a request into queue with memory control. */
     public boolean offer(IndexedConsensusRequest indexedConsensusRequest) {
-      if 
(!iotConsensusMemoryManager.reserve(indexedConsensusRequest.getMemorySize(), 
true)) {
+      if (!iotConsensusMemoryManager.reserve(indexedConsensusRequest, true)) {
         return false;
       }
       boolean success;
@@ -291,19 +291,19 @@ public class LogDispatcher {
         success = pendingEntries.offer(indexedConsensusRequest);
       } catch (Throwable t) {
         // If exception occurs during request offer, the reserved memory 
should be released
-        
iotConsensusMemoryManager.free(indexedConsensusRequest.getMemorySize(), true);
+        iotConsensusMemoryManager.free(indexedConsensusRequest, true);
         throw t;
       }
       if (!success) {
         // If offer failed, the reserved memory should be released
-        
iotConsensusMemoryManager.free(indexedConsensusRequest.getMemorySize(), true);
+        iotConsensusMemoryManager.free(indexedConsensusRequest, true);
       }
       return success;
     }
 
     /** try to remove a request from queue with memory control. */
     private void releaseReservedMemory(IndexedConsensusRequest 
indexedConsensusRequest) {
-      iotConsensusMemoryManager.free(indexedConsensusRequest.getMemorySize(), 
true);
+      iotConsensusMemoryManager.free(indexedConsensusRequest, true);
     }
 
     public void stop() {
@@ -325,13 +325,23 @@ public class LogDispatcher {
       }
       long requestSize = 0;
       for (IndexedConsensusRequest indexedConsensusRequest : pendingEntries) {
-        requestSize += indexedConsensusRequest.getMemorySize();
+        synchronized (indexedConsensusRequest) {
+          long prevRef = indexedConsensusRequest.decRef();
+          if (prevRef == 1) {
+            requestSize += indexedConsensusRequest.getMemorySize();
+          }
+        }
       }
       pendingEntries.clear();
       iotConsensusMemoryManager.free(requestSize, true);
       requestSize = 0;
       for (IndexedConsensusRequest indexedConsensusRequest : bufferedEntries) {
-        requestSize += indexedConsensusRequest.getMemorySize();
+        synchronized (indexedConsensusRequest) {
+          long prevRef = indexedConsensusRequest.decRef();
+          if (prevRef == 1) {
+            requestSize += indexedConsensusRequest.getMemorySize();
+          }
+        }
       }
       iotConsensusMemoryManager.free(requestSize, true);
       syncStatus.free();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
index e152ed09bda..3f5bdf205b2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
@@ -31,6 +31,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.SearchNode;
 import org.apache.iotdb.db.trigger.executor.TriggerFireVisitor;
 
 import org.apache.tsfile.enums.TSDataType;
@@ -225,8 +226,9 @@ public class PipeEnrichedInsertNode extends InsertNode {
   }
 
   @Override
-  public void setSearchIndex(long searchIndex) {
+  public SearchNode setSearchIndex(final long searchIndex) {
     insertNode.setSearchIndex(searchIndex);
+    return this;
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
index 0e7eae3f9d7..c4c4159f66d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
@@ -354,9 +354,10 @@ public class DeleteDataNode extends SearchNode implements 
WALEntryValue {
             .distinct()
             .collect(Collectors.toList());
     return new DeleteDataNode(
-        firstOne.getPlanNodeId(),
-        pathList,
-        firstOne.getDeleteStartTime(),
-        firstOne.getDeleteEndTime());
+            firstOne.getPlanNodeId(),
+            pathList,
+            firstOne.getDeleteStartTime(),
+            firstOne.getDeleteEndTime())
+        .setSearchIndex(firstOne.searchIndex);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
index 6e401b4d54b..773881e398e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -134,9 +134,10 @@ public class InsertMultiTabletsNode extends InsertNode {
   }
 
   @Override
-  public void setSearchIndex(long index) {
+  public SearchNode setSearchIndex(long index) {
     searchIndex = index;
     insertTabletNodeList.forEach(plan -> plan.setSearchIndex(index));
+    return this;
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index caf86e67bab..7ec81d626df 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -128,9 +128,10 @@ public class InsertRowsNode extends InsertNode implements 
WALEntryValue {
   }
 
   @Override
-  public void setSearchIndex(long index) {
+  public SearchNode setSearchIndex(long index) {
     searchIndex = index;
     insertRowNodeList.forEach(plan -> plan.setSearchIndex(index));
+    return this;
   }
 
   public Map<Integer, TSStatus> getResults() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index 021e51b69af..c7b193841d4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -98,9 +98,10 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
   }
 
   @Override
-  public void setSearchIndex(long index) {
+  public SearchNode setSearchIndex(long index) {
     searchIndex = index;
     insertRowNodeList.forEach(plan -> plan.setSearchIndex(index));
+    return this;
   }
 
   public TSStatus[] getFailingStatus() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNode.java
new file mode 100644
index 00000000000..632d7c9ee1e
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNode.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
+import 
org.apache.iotdb.commons.exception.runtime.SerializationRunTimeException;
+import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Delete;
+import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
+import 
org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry;
+import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
+
+import org.apache.tsfile.utils.PublicBAOS;
+import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+@SuppressWarnings({"java:S1854", "unused"})
+public class RelationalDeleteDataNode extends AbstractDeleteDataNode {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RelationalDeleteDataNode.class);
+
+  /** byte: type */
+  private static final int FIXED_SERIALIZED_SIZE = Short.BYTES;
+
+  private final List<TableDeletionEntry> modEntries;
+
+  private Collection<TRegionReplicaSet> replicaSets;
+
+  private final String databaseName;
+
+  public RelationalDeleteDataNode(final PlanNodeId id, final Delete delete) {
+    super(id);
+    this.modEntries = delete.getTableDeletionEntries();
+    this.replicaSets = delete.getReplicaSets();
+    this.databaseName = delete.getDatabaseName();
+  }
+
+  public RelationalDeleteDataNode(
+      final PlanNodeId id, final TableDeletionEntry entry, final String 
databaseName) {
+    super(id);
+    this.modEntries = Collections.singletonList(entry);
+    this.databaseName = databaseName;
+  }
+
+  public RelationalDeleteDataNode(
+      final PlanNodeId id, final List<TableDeletionEntry> entries, final 
String databaseName) {
+    super(id);
+    this.modEntries = entries;
+    this.databaseName = databaseName;
+  }
+
+  public RelationalDeleteDataNode(
+      final PlanNodeId id, final Delete delete, final ProgressIndex 
progressIndex) {
+    this(id, delete);
+    this.progressIndex = progressIndex;
+  }
+
+  public RelationalDeleteDataNode(
+      final PlanNodeId id, final Delete delete, final TRegionReplicaSet 
regionReplicaSet) {
+    this(id, delete);
+    this.regionReplicaSet = regionReplicaSet;
+  }
+
+  public RelationalDeleteDataNode(
+      final PlanNodeId id,
+      final TableDeletionEntry delete,
+      final TRegionReplicaSet regionReplicaSet,
+      final String databaseName) {
+    this(id, delete, databaseName);
+    this.regionReplicaSet = regionReplicaSet;
+  }
+
+  public RelationalDeleteDataNode(
+      PlanNodeId id,
+      List<TableDeletionEntry> deletes,
+      TRegionReplicaSet regionReplicaSet,
+      String databaseName) {
+    this(id, deletes, databaseName);
+    this.regionReplicaSet = regionReplicaSet;
+  }
+
+  public static RelationalDeleteDataNode deserializeFromWAL(DataInputStream 
stream)
+      throws IOException {
+    long searchIndex = stream.readLong();
+
+    int entryNum = ReadWriteForEncodingUtils.readVarInt(stream);
+    List<TableDeletionEntry> modEntries = new ArrayList<>(entryNum);
+    for (int i = 0; i < entryNum; i++) {
+      modEntries.add((TableDeletionEntry) ModEntry.createFrom(stream));
+    }
+    String databaseName = ReadWriteIOUtils.readVarIntString(stream);
+
+    RelationalDeleteDataNode deleteDataNode =
+        new RelationalDeleteDataNode(new PlanNodeId(""), modEntries, 
databaseName);
+    deleteDataNode.setSearchIndex(searchIndex);
+    return deleteDataNode;
+  }
+
+  public static RelationalDeleteDataNode deserializeFromWAL(ByteBuffer buffer) 
{
+    long searchIndex = buffer.getLong();
+    int entryNum = ReadWriteForEncodingUtils.readVarInt(buffer);
+    List<TableDeletionEntry> modEntries = new ArrayList<>(entryNum);
+    for (int i = 0; i < entryNum; i++) {
+      modEntries.add((TableDeletionEntry) ModEntry.createFrom(buffer));
+    }
+    String databaseName = ReadWriteIOUtils.readVarIntString(buffer);
+
+    RelationalDeleteDataNode deleteDataNode =
+        new RelationalDeleteDataNode(new PlanNodeId(""), modEntries, 
databaseName);
+    deleteDataNode.setSearchIndex(searchIndex);
+    return deleteDataNode;
+  }
+
+  public static RelationalDeleteDataNode deserialize(ByteBuffer byteBuffer) {
+    int entryNum = ReadWriteForEncodingUtils.readVarInt(byteBuffer);
+    List<TableDeletionEntry> modEntries = new ArrayList<>(entryNum);
+    for (int i = 0; i < entryNum; i++) {
+      modEntries.add((TableDeletionEntry) ModEntry.createFrom(byteBuffer));
+    }
+    String databaseName = ReadWriteIOUtils.readVarIntString(byteBuffer);
+
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+
+    // DeleteDataNode has no child
+    int ignoredChildrenSize = ReadWriteIOUtils.readInt(byteBuffer);
+    RelationalDeleteDataNode relationalDeleteDataNode =
+        new RelationalDeleteDataNode(planNodeId, modEntries, databaseName);
+    return relationalDeleteDataNode;
+  }
+
+  public static RelationalDeleteDataNode deserializeFromDAL(ByteBuffer 
byteBuffer) {
+    // notice that the type is deserialized here, may move it outside
+    short nodeType = byteBuffer.getShort();
+    int entryNum = ReadWriteForEncodingUtils.readVarInt(byteBuffer);
+    List<TableDeletionEntry> modEntries = new ArrayList<>(entryNum);
+    for (int i = 0; i < entryNum; i++) {
+      modEntries.add((TableDeletionEntry) ModEntry.createFrom(byteBuffer));
+    }
+    String databaseName = ReadWriteIOUtils.readVarIntString(byteBuffer);
+
+    ProgressIndex deserializedIndex = 
ProgressIndexType.deserializeFrom(byteBuffer);
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+
+    // DeleteDataNode has no child
+    int ignoredChildrenSize = ReadWriteIOUtils.readInt(byteBuffer);
+    RelationalDeleteDataNode relationalDeleteDataNode =
+        new RelationalDeleteDataNode(planNodeId, modEntries, databaseName);
+    relationalDeleteDataNode.setProgressIndex(deserializedIndex);
+    return relationalDeleteDataNode;
+  }
+
+  @Override
+  public ByteBuffer serializeToDAL() {
+    try (PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      DeleteNodeType.RELATIONAL_DELETE_NODE.serialize(outputStream);
+      serializeAttributes(outputStream);
+      progressIndex.serialize(outputStream);
+      id.serialize(outputStream);
+      // write children nodes size
+      ReadWriteIOUtils.write(0, outputStream);
+      return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+    } catch (IOException e) {
+      throw new SerializationRunTimeException(e);
+    }
+  }
+
+  @Override
+  public PlanNodeType getType() {
+    return PlanNodeType.RELATIONAL_DELETE_DATA;
+  }
+
+  @SuppressWarnings({"java:S2975", "java:S1182"})
+  @Override
+  public PlanNode clone() {
+    return new RelationalDeleteDataNode(getPlanNodeId(), modEntries, 
databaseName);
+  }
+
+  @Override
+  public int serializedSize() {
+    int size = FIXED_SERIALIZED_SIZE + 
ReadWriteForEncodingUtils.varIntSize(modEntries.size());
+    for (TableDeletionEntry modEntry : modEntries) {
+      size += modEntry.serializedSize();
+    }
+    return size;
+  }
+
+  @Override
+  public void serializeToWAL(IWALByteBufferView buffer) {
+    buffer.putShort(PlanNodeType.RELATIONAL_DELETE_DATA.getNodeType());
+    buffer.putLong(searchIndex);
+    try {
+      ReadWriteForEncodingUtils.writeVarInt(modEntries.size(), buffer);
+      for (TableDeletionEntry modEntry : modEntries) {
+        modEntry.serialize(buffer);
+      }
+    } catch (IOException e) {
+      LOGGER.error("Failed to serialize modEntry to WAL", e);
+    }
+  }
+
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.RELATIONAL_DELETE_DATA.serialize(byteBuffer);
+    ReadWriteForEncodingUtils.writeVarInt(modEntries.size(), byteBuffer);
+    modEntries.forEach(entry -> entry.serialize(byteBuffer));
+    ReadWriteIOUtils.writeVar(databaseName, byteBuffer);
+  }
+
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws 
IOException {
+    PlanNodeType.RELATIONAL_DELETE_DATA.serialize(stream);
+    ReadWriteForEncodingUtils.writeVarInt(modEntries.size(), stream);
+    for (TableDeletionEntry modEntry : modEntries) {
+      modEntry.serialize(stream);
+    }
+    ReadWriteIOUtils.writeVar(databaseName, stream);
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitDeleteData(this, context);
+  }
+
+  @Override
+  public TRegionReplicaSet getRegionReplicaSet() {
+    return regionReplicaSet;
+  }
+
+  public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) {
+    this.regionReplicaSet = regionReplicaSet;
+  }
+
+  @Override
+  public boolean equals(final Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    final RelationalDeleteDataNode that = (RelationalDeleteDataNode) obj;
+    return this.getPlanNodeId().equals(that.getPlanNodeId())
+        && Objects.equals(this.modEntries, that.modEntries);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(getPlanNodeId(), modEntries, progressIndex);
+  }
+
+  public String toString() {
+    return String.format(
+        "RelationalDeleteDataNode-%s[ Deletion: %s, Region: %s, ProgressIndex: 
%s, SearchIndex: %d]",
+        getPlanNodeId(),
+        modEntries,
+        regionReplicaSet == null ? "Not Assigned" : 
regionReplicaSet.getRegionId(),
+        progressIndex == null ? "Not Assigned" : progressIndex,
+        searchIndex);
+  }
+
+  @Override
+  public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+    return replicaSets.stream()
+        .map(r -> new RelationalDeleteDataNode(getPlanNodeId(), modEntries, r, 
databaseName))
+        .collect(Collectors.toList());
+  }
+
+  public List<TableDeletionEntry> getModEntries() {
+    return modEntries;
+  }
+
+  public String getDatabaseName() {
+    return databaseName;
+  }
+
+  @Override
+  public SearchNode merge(List<SearchNode> searchNodes) {
+    List<RelationalDeleteDataNode> relationalDeleteDataNodeList =
+        searchNodes.stream()
+            .map(searchNode -> (RelationalDeleteDataNode) searchNode)
+            .collect(Collectors.toList());
+    if (relationalDeleteDataNodeList.stream()
+        .anyMatch(
+            relationalDeleteDataNode ->
+                this.getDatabaseName() != null
+                    && !this.getDatabaseName()
+                        .equals(relationalDeleteDataNode.getDatabaseName()))) {
+      throw new IllegalArgumentException("All database name need to be same");
+    }
+    List<TableDeletionEntry> allTableDeletionEntries =
+        relationalDeleteDataNodeList.stream()
+            .map(RelationalDeleteDataNode::getModEntries)
+            .flatMap(Collection::stream)
+            .collect(Collectors.toList());
+    return new RelationalDeleteDataNode(this.getPlanNodeId(), 
allTableDeletionEntries, databaseName)
+        .setSearchIndex(getSearchIndex());
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
index ecae2cd7197..e9c614e7a98 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
@@ -45,8 +45,9 @@ public abstract class SearchNode extends WritePlanNode {
   }
 
   /** Search index should start from 1 */
-  public void setSearchIndex(long searchIndex) {
+  public SearchNode setSearchIndex(long searchIndex) {
     this.searchIndex = searchIndex;
+    return this;
   }
 
   public abstract SearchNode merge(List<SearchNode> searchNodes);

Reply via email to