This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch cp_iot_1.3 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8be0ff883764883701e2596de0e3a403ae76c0cc Author: Jiang Tian <[email protected]> AuthorDate: Fri Jul 25 11:20:40 2025 +0800 Fix negative iot queue size & missing search index for deletion & missed request when performing empty table deleting (#16022) * Fix double memory free of iotconsensus queue request during region deletion * Fix missing searchIndex and lost deletion when no TsFile is involved. --- ...IoTDBRegionOperationReliabilityITFramework.java | 23 +- .../IoTDBRegionMigrateWithLastEmptyDeletionIT.java | 101 +++++++ .../common/request/IndexedConsensusRequest.java | 10 + .../logdispatcher/IoTConsensusMemoryManager.java | 21 ++ .../consensus/iot/logdispatcher/LogDispatcher.java | 22 +- .../plan/node/pipe/PipeEnrichedInsertNode.java | 4 +- .../planner/plan/node/write/DeleteDataNode.java | 9 +- .../plan/node/write/InsertMultiTabletsNode.java | 3 +- .../planner/plan/node/write/InsertRowsNode.java | 3 +- .../plan/node/write/InsertRowsOfOneDeviceNode.java | 3 +- .../plan/node/write/RelationalDeleteDataNode.java | 335 +++++++++++++++++++++ .../plan/planner/plan/node/write/SearchNode.java | 3 +- 12 files changed, 521 insertions(+), 16 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java index 0dccc669eeb..1d14574a8ee 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java @@ -44,6 +44,7 @@ import org.apache.iotdb.session.Session; import org.apache.thrift.TException; import org.apache.tsfile.read.common.Field; +import org.apache.tsfile.utils.Pair; import org.awaitility.Awaitility; import org.awaitility.core.ConditionTimeoutException; import org.junit.After; @@ -307,7 +308,7 @@ public class IoTDBRegionOperationReliabilityITFramework { } } - protected Set<Integer> getAllDataNodes(Statement statement) throws Exception { + public static Set<Integer> getAllDataNodes(Statement statement) throws Exception { ResultSet result = statement.executeQuery(SHOW_DATANODES); Set<Integer> allDataNodeId = new HashSet<>(); while (result.next()) { @@ -444,6 +445,26 @@ public class IoTDBRegionOperationReliabilityITFramework { return regionMap; } + public static Map<Integer, Pair<Integer, Set<Integer>>> getDataRegionMapWithLeader( + Statement statement) throws Exception { + ResultSet showRegionsResult = statement.executeQuery(SHOW_REGIONS); + Map<Integer, Pair<Integer, Set<Integer>>> regionMap = new HashMap<>(); + while (showRegionsResult.next()) { + if (String.valueOf(TConsensusGroupType.DataRegion) + .equals(showRegionsResult.getString(ColumnHeaderConstant.TYPE))) { + int regionId = showRegionsResult.getInt(ColumnHeaderConstant.REGION_ID); + int dataNodeId = showRegionsResult.getInt(ColumnHeaderConstant.DATA_NODE_ID); + Pair<Integer, Set<Integer>> leaderNodesPair = + regionMap.computeIfAbsent(regionId, id -> new Pair<>(-1, new HashSet<>())); + leaderNodesPair.getRight().add(dataNodeId); + if (showRegionsResult.getString(ColumnHeaderConstant.ROLE).equals("Leader")) { + leaderNodesPair.setLeft(dataNodeId); + } + } + } + return regionMap; + } + public static Map<Integer, Set<Integer>> getAllRegionMap(Statement statement) throws Exception { ResultSet showRegionsResult = statement.executeQuery(SHOW_REGIONS); Map<Integer, Set<Integer>> regionMap = new HashMap<>(); diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/IoTDBRegionMigrateWithLastEmptyDeletionIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/IoTDBRegionMigrateWithLastEmptyDeletionIT.java new file mode 100644 index 00000000000..18cfcdee6c5 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/IoTDBRegionMigrateWithLastEmptyDeletionIT.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.it.regionmigration.pass.daily.iotv1; + +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.it.env.EnvFactory; + +import org.apache.tsfile.utils.Pair; +import org.awaitility.Awaitility; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.Statement; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getAllDataNodes; +import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getDataRegionMapWithLeader; + +public class IoTDBRegionMigrateWithLastEmptyDeletionIT { + @Before + public void setUp() throws Exception { + EnvFactory.getEnv() + .getConfig() + .getCommonConfig() + .setDataReplicationFactor(2) + .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS); + EnvFactory.getEnv().initClusterEnvironment(1, 3); + } + + @After + public void tearDown() throws Exception { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void testWithLastEmptyDeletion() throws Exception { + try (Connection connection = EnvFactory.getEnv().getTableConnection(); + Statement statement = connection.createStatement()) { + statement.execute("CREATE DATABASE test"); + statement.execute("USE test"); + statement.execute("CREATE TABLE t1 (s1 INT64 FIELD)"); + statement.execute("INSERT INTO t1 (time, s1) VALUES (100, 100)"); + statement.execute("FLUSH"); + // the deletion does not involve any file + statement.execute("DELETE FROM t1 WHERE time < 100"); + + Map<Integer, Pair<Integer, Set<Integer>>> dataRegionMapWithLeader = + getDataRegionMapWithLeader(statement); + int dataRegionIdForTest = + dataRegionMapWithLeader.keySet().stream().max(Integer::compare).get(); + Pair<Integer, Set<Integer>> leaderAndNodes = dataRegionMapWithLeader.get(dataRegionIdForTest); + Set<Integer> allDataNodes = getAllDataNodes(statement); + int leaderId = leaderAndNodes.getLeft(); + int followerId = + leaderAndNodes.getRight().stream().filter(i -> i != leaderId).findAny().get(); + int newLeaderId = + allDataNodes.stream().filter(i -> i != leaderId && i != followerId).findAny().get(); + + System.out.printf( + "Old leader: %d, follower: %d, new leader: %d%n", leaderId, followerId, newLeaderId); + + statement.execute( + String.format( + "migrate region %d from %d to %d", dataRegionIdForTest, leaderId, newLeaderId)); + + Awaitility.await() + .atMost(10, TimeUnit.MINUTES) + .pollDelay(1, TimeUnit.SECONDS) + .until( + () -> { + Map<Integer, Pair<Integer, Set<Integer>>> regionMapWithLeader = + getDataRegionMapWithLeader(statement); + Pair<Integer, Set<Integer>> newLeaderAndNodes = + regionMapWithLeader.get(dataRegionIdForTest); + Set<Integer> nodes = newLeaderAndNodes.right; + return nodes.size() == 2 && nodes.contains(newLeaderId); + }); + } + } +} diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java index 1147abc049e..2bf01d4ef86 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; /** only used for iot consensus. */ public class IndexedConsensusRequest implements IConsensusRequest { @@ -34,6 +35,7 @@ public class IndexedConsensusRequest implements IConsensusRequest { private final List<IConsensusRequest> requests; private final List<ByteBuffer> serializedRequests; private long memorySize = 0; + private AtomicLong referenceCnt = new AtomicLong(); public IndexedConsensusRequest(long searchIndex, List<IConsensusRequest> requests) { this.searchIndex = searchIndex; @@ -100,4 +102,12 @@ public class IndexedConsensusRequest implements IConsensusRequest { public int hashCode() { return Objects.hash(searchIndex, requests); } + + public long incRef() { + return referenceCnt.getAndIncrement(); + } + + public long decRef() { + return referenceCnt.getAndDecrement(); + } } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java index df54918c60e..3e7893b4c44 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java @@ -20,6 +20,7 @@ package org.apache.iotdb.consensus.iot.logdispatcher; import org.apache.iotdb.commons.service.metric.MetricService; +import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +40,17 @@ public class IoTConsensusMemoryManager { MetricService.getInstance().addMetricSet(new IoTConsensusMemoryManagerMetrics(this)); } + public boolean reserve(IndexedConsensusRequest request, boolean fromQueue) { + synchronized (request) { + long prevRef = request.incRef(); + if (prevRef == 0) { + return reserve(request.getMemorySize(), fromQueue); + } else { + return true; + } + } + } + public boolean reserve(long size, boolean fromQueue) { AtomicBoolean result = new AtomicBoolean(false); memorySizeInByte.updateAndGet( @@ -73,6 +85,15 @@ public class IoTConsensusMemoryManager { return result.get(); } + public void free(IndexedConsensusRequest request, boolean fromQueue) { + synchronized (request) { + long prevRef = request.decRef(); + if (prevRef == 0) { + free(request.getMemorySize(), fromQueue); + } + } + } + public void free(long size, boolean fromQueue) { long currentUsedMemory = memorySizeInByte.addAndGet(-size); if (fromQueue) { diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java index a363e7dd9aa..7f1f91b1fa0 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java @@ -283,7 +283,7 @@ public class LogDispatcher { /** try to offer a request into queue with memory control. */ public boolean offer(IndexedConsensusRequest indexedConsensusRequest) { - if (!iotConsensusMemoryManager.reserve(indexedConsensusRequest.getMemorySize(), true)) { + if (!iotConsensusMemoryManager.reserve(indexedConsensusRequest, true)) { return false; } boolean success; @@ -291,19 +291,19 @@ public class LogDispatcher { success = pendingEntries.offer(indexedConsensusRequest); } catch (Throwable t) { // If exception occurs during request offer, the reserved memory should be released - iotConsensusMemoryManager.free(indexedConsensusRequest.getMemorySize(), true); + iotConsensusMemoryManager.free(indexedConsensusRequest, true); throw t; } if (!success) { // If offer failed, the reserved memory should be released - iotConsensusMemoryManager.free(indexedConsensusRequest.getMemorySize(), true); + iotConsensusMemoryManager.free(indexedConsensusRequest, true); } return success; } /** try to remove a request from queue with memory control. */ private void releaseReservedMemory(IndexedConsensusRequest indexedConsensusRequest) { - iotConsensusMemoryManager.free(indexedConsensusRequest.getMemorySize(), true); + iotConsensusMemoryManager.free(indexedConsensusRequest, true); } public void stop() { @@ -325,13 +325,23 @@ public class LogDispatcher { } long requestSize = 0; for (IndexedConsensusRequest indexedConsensusRequest : pendingEntries) { - requestSize += indexedConsensusRequest.getMemorySize(); + synchronized (indexedConsensusRequest) { + long prevRef = indexedConsensusRequest.decRef(); + if (prevRef == 1) { + requestSize += indexedConsensusRequest.getMemorySize(); + } + } } pendingEntries.clear(); iotConsensusMemoryManager.free(requestSize, true); requestSize = 0; for (IndexedConsensusRequest indexedConsensusRequest : bufferedEntries) { - requestSize += indexedConsensusRequest.getMemorySize(); + synchronized (indexedConsensusRequest) { + long prevRef = indexedConsensusRequest.decRef(); + if (prevRef == 1) { + requestSize += indexedConsensusRequest.getMemorySize(); + } + } } iotConsensusMemoryManager.free(requestSize, true); syncStatus.free(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java index e152ed09bda..3f5bdf205b2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java @@ -31,6 +31,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.SearchNode; import org.apache.iotdb.db.trigger.executor.TriggerFireVisitor; import org.apache.tsfile.enums.TSDataType; @@ -225,8 +226,9 @@ public class PipeEnrichedInsertNode extends InsertNode { } @Override - public void setSearchIndex(long searchIndex) { + public SearchNode setSearchIndex(final long searchIndex) { insertNode.setSearchIndex(searchIndex); + return this; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java index 0e7eae3f9d7..c4c4159f66d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java @@ -354,9 +354,10 @@ public class DeleteDataNode extends SearchNode implements WALEntryValue { .distinct() .collect(Collectors.toList()); return new DeleteDataNode( - firstOne.getPlanNodeId(), - pathList, - firstOne.getDeleteStartTime(), - firstOne.getDeleteEndTime()); + firstOne.getPlanNodeId(), + pathList, + firstOne.getDeleteStartTime(), + firstOne.getDeleteEndTime()) + .setSearchIndex(firstOne.searchIndex); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java index 6e401b4d54b..773881e398e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java @@ -134,9 +134,10 @@ public class InsertMultiTabletsNode extends InsertNode { } @Override - public void setSearchIndex(long index) { + public SearchNode setSearchIndex(long index) { searchIndex = index; insertTabletNodeList.forEach(plan -> plan.setSearchIndex(index)); + return this; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java index caf86e67bab..7ec81d626df 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java @@ -128,9 +128,10 @@ public class InsertRowsNode extends InsertNode implements WALEntryValue { } @Override - public void setSearchIndex(long index) { + public SearchNode setSearchIndex(long index) { searchIndex = index; insertRowNodeList.forEach(plan -> plan.setSearchIndex(index)); + return this; } public Map<Integer, TSStatus> getResults() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java index 021e51b69af..c7b193841d4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java @@ -98,9 +98,10 @@ public class InsertRowsOfOneDeviceNode extends InsertNode { } @Override - public void setSearchIndex(long index) { + public SearchNode setSearchIndex(long index) { searchIndex = index; insertRowNodeList.forEach(plan -> plan.setSearchIndex(index)); + return this; } public TSStatus[] getFailingStatus() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNode.java new file mode 100644 index 00000000000..632d7c9ee1e --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNode.java @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write; + +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.commons.consensus.index.ProgressIndex; +import org.apache.iotdb.commons.consensus.index.ProgressIndexType; +import org.apache.iotdb.commons.exception.runtime.SerializationRunTimeException; +import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Delete; +import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; +import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; +import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView; + +import org.apache.tsfile.utils.PublicBAOS; +import org.apache.tsfile.utils.ReadWriteForEncodingUtils; +import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +@SuppressWarnings({"java:S1854", "unused"}) +public class RelationalDeleteDataNode extends AbstractDeleteDataNode { + private static final Logger LOGGER = LoggerFactory.getLogger(RelationalDeleteDataNode.class); + + /** byte: type */ + private static final int FIXED_SERIALIZED_SIZE = Short.BYTES; + + private final List<TableDeletionEntry> modEntries; + + private Collection<TRegionReplicaSet> replicaSets; + + private final String databaseName; + + public RelationalDeleteDataNode(final PlanNodeId id, final Delete delete) { + super(id); + this.modEntries = delete.getTableDeletionEntries(); + this.replicaSets = delete.getReplicaSets(); + this.databaseName = delete.getDatabaseName(); + } + + public RelationalDeleteDataNode( + final PlanNodeId id, final TableDeletionEntry entry, final String databaseName) { + super(id); + this.modEntries = Collections.singletonList(entry); + this.databaseName = databaseName; + } + + public RelationalDeleteDataNode( + final PlanNodeId id, final List<TableDeletionEntry> entries, final String databaseName) { + super(id); + this.modEntries = entries; + this.databaseName = databaseName; + } + + public RelationalDeleteDataNode( + final PlanNodeId id, final Delete delete, final ProgressIndex progressIndex) { + this(id, delete); + this.progressIndex = progressIndex; + } + + public RelationalDeleteDataNode( + final PlanNodeId id, final Delete delete, final TRegionReplicaSet regionReplicaSet) { + this(id, delete); + this.regionReplicaSet = regionReplicaSet; + } + + public RelationalDeleteDataNode( + final PlanNodeId id, + final TableDeletionEntry delete, + final TRegionReplicaSet regionReplicaSet, + final String databaseName) { + this(id, delete, databaseName); + this.regionReplicaSet = regionReplicaSet; + } + + public RelationalDeleteDataNode( + PlanNodeId id, + List<TableDeletionEntry> deletes, + TRegionReplicaSet regionReplicaSet, + String databaseName) { + this(id, deletes, databaseName); + this.regionReplicaSet = regionReplicaSet; + } + + public static RelationalDeleteDataNode deserializeFromWAL(DataInputStream stream) + throws IOException { + long searchIndex = stream.readLong(); + + int entryNum = ReadWriteForEncodingUtils.readVarInt(stream); + List<TableDeletionEntry> modEntries = new ArrayList<>(entryNum); + for (int i = 0; i < entryNum; i++) { + modEntries.add((TableDeletionEntry) ModEntry.createFrom(stream)); + } + String databaseName = ReadWriteIOUtils.readVarIntString(stream); + + RelationalDeleteDataNode deleteDataNode = + new RelationalDeleteDataNode(new PlanNodeId(""), modEntries, databaseName); + deleteDataNode.setSearchIndex(searchIndex); + return deleteDataNode; + } + + public static RelationalDeleteDataNode deserializeFromWAL(ByteBuffer buffer) { + long searchIndex = buffer.getLong(); + int entryNum = ReadWriteForEncodingUtils.readVarInt(buffer); + List<TableDeletionEntry> modEntries = new ArrayList<>(entryNum); + for (int i = 0; i < entryNum; i++) { + modEntries.add((TableDeletionEntry) ModEntry.createFrom(buffer)); + } + String databaseName = ReadWriteIOUtils.readVarIntString(buffer); + + RelationalDeleteDataNode deleteDataNode = + new RelationalDeleteDataNode(new PlanNodeId(""), modEntries, databaseName); + deleteDataNode.setSearchIndex(searchIndex); + return deleteDataNode; + } + + public static RelationalDeleteDataNode deserialize(ByteBuffer byteBuffer) { + int entryNum = ReadWriteForEncodingUtils.readVarInt(byteBuffer); + List<TableDeletionEntry> modEntries = new ArrayList<>(entryNum); + for (int i = 0; i < entryNum; i++) { + modEntries.add((TableDeletionEntry) ModEntry.createFrom(byteBuffer)); + } + String databaseName = ReadWriteIOUtils.readVarIntString(byteBuffer); + + PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); + + // DeleteDataNode has no child + int ignoredChildrenSize = ReadWriteIOUtils.readInt(byteBuffer); + RelationalDeleteDataNode relationalDeleteDataNode = + new RelationalDeleteDataNode(planNodeId, modEntries, databaseName); + return relationalDeleteDataNode; + } + + public static RelationalDeleteDataNode deserializeFromDAL(ByteBuffer byteBuffer) { + // notice that the type is deserialized here, may move it outside + short nodeType = byteBuffer.getShort(); + int entryNum = ReadWriteForEncodingUtils.readVarInt(byteBuffer); + List<TableDeletionEntry> modEntries = new ArrayList<>(entryNum); + for (int i = 0; i < entryNum; i++) { + modEntries.add((TableDeletionEntry) ModEntry.createFrom(byteBuffer)); + } + String databaseName = ReadWriteIOUtils.readVarIntString(byteBuffer); + + ProgressIndex deserializedIndex = ProgressIndexType.deserializeFrom(byteBuffer); + PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); + + // DeleteDataNode has no child + int ignoredChildrenSize = ReadWriteIOUtils.readInt(byteBuffer); + RelationalDeleteDataNode relationalDeleteDataNode = + new RelationalDeleteDataNode(planNodeId, modEntries, databaseName); + relationalDeleteDataNode.setProgressIndex(deserializedIndex); + return relationalDeleteDataNode; + } + + @Override + public ByteBuffer serializeToDAL() { + try (PublicBAOS byteArrayOutputStream = new PublicBAOS(); + DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + DeleteNodeType.RELATIONAL_DELETE_NODE.serialize(outputStream); + serializeAttributes(outputStream); + progressIndex.serialize(outputStream); + id.serialize(outputStream); + // write children nodes size + ReadWriteIOUtils.write(0, outputStream); + return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + } catch (IOException e) { + throw new SerializationRunTimeException(e); + } + } + + @Override + public PlanNodeType getType() { + return PlanNodeType.RELATIONAL_DELETE_DATA; + } + + @SuppressWarnings({"java:S2975", "java:S1182"}) + @Override + public PlanNode clone() { + return new RelationalDeleteDataNode(getPlanNodeId(), modEntries, databaseName); + } + + @Override + public int serializedSize() { + int size = FIXED_SERIALIZED_SIZE + ReadWriteForEncodingUtils.varIntSize(modEntries.size()); + for (TableDeletionEntry modEntry : modEntries) { + size += modEntry.serializedSize(); + } + return size; + } + + @Override + public void serializeToWAL(IWALByteBufferView buffer) { + buffer.putShort(PlanNodeType.RELATIONAL_DELETE_DATA.getNodeType()); + buffer.putLong(searchIndex); + try { + ReadWriteForEncodingUtils.writeVarInt(modEntries.size(), buffer); + for (TableDeletionEntry modEntry : modEntries) { + modEntry.serialize(buffer); + } + } catch (IOException e) { + LOGGER.error("Failed to serialize modEntry to WAL", e); + } + } + + @Override + protected void serializeAttributes(ByteBuffer byteBuffer) { + PlanNodeType.RELATIONAL_DELETE_DATA.serialize(byteBuffer); + ReadWriteForEncodingUtils.writeVarInt(modEntries.size(), byteBuffer); + modEntries.forEach(entry -> entry.serialize(byteBuffer)); + ReadWriteIOUtils.writeVar(databaseName, byteBuffer); + } + + @Override + protected void serializeAttributes(DataOutputStream stream) throws IOException { + PlanNodeType.RELATIONAL_DELETE_DATA.serialize(stream); + ReadWriteForEncodingUtils.writeVarInt(modEntries.size(), stream); + for (TableDeletionEntry modEntry : modEntries) { + modEntry.serialize(stream); + } + ReadWriteIOUtils.writeVar(databaseName, stream); + } + + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitDeleteData(this, context); + } + + @Override + public TRegionReplicaSet getRegionReplicaSet() { + return regionReplicaSet; + } + + public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) { + this.regionReplicaSet = regionReplicaSet; + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + final RelationalDeleteDataNode that = (RelationalDeleteDataNode) obj; + return this.getPlanNodeId().equals(that.getPlanNodeId()) + && Objects.equals(this.modEntries, that.modEntries); + } + + @Override + public int hashCode() { + return Objects.hash(getPlanNodeId(), modEntries, progressIndex); + } + + public String toString() { + return String.format( + "RelationalDeleteDataNode-%s[ Deletion: %s, Region: %s, ProgressIndex: %s, SearchIndex: %d]", + getPlanNodeId(), + modEntries, + regionReplicaSet == null ? "Not Assigned" : regionReplicaSet.getRegionId(), + progressIndex == null ? "Not Assigned" : progressIndex, + searchIndex); + } + + @Override + public List<WritePlanNode> splitByPartition(IAnalysis analysis) { + return replicaSets.stream() + .map(r -> new RelationalDeleteDataNode(getPlanNodeId(), modEntries, r, databaseName)) + .collect(Collectors.toList()); + } + + public List<TableDeletionEntry> getModEntries() { + return modEntries; + } + + public String getDatabaseName() { + return databaseName; + } + + @Override + public SearchNode merge(List<SearchNode> searchNodes) { + List<RelationalDeleteDataNode> relationalDeleteDataNodeList = + searchNodes.stream() + .map(searchNode -> (RelationalDeleteDataNode) searchNode) + .collect(Collectors.toList()); + if (relationalDeleteDataNodeList.stream() + .anyMatch( + relationalDeleteDataNode -> + this.getDatabaseName() != null + && !this.getDatabaseName() + .equals(relationalDeleteDataNode.getDatabaseName()))) { + throw new IllegalArgumentException("All database name need to be same"); + } + List<TableDeletionEntry> allTableDeletionEntries = + relationalDeleteDataNodeList.stream() + .map(RelationalDeleteDataNode::getModEntries) + .flatMap(Collection::stream) + .collect(Collectors.toList()); + return new RelationalDeleteDataNode(this.getPlanNodeId(), allTableDeletionEntries, databaseName) + .setSearchIndex(getSearchIndex()); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java index ecae2cd7197..e9c614e7a98 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java @@ -45,8 +45,9 @@ public abstract class SearchNode extends WritePlanNode { } /** Search index should start from 1 */ - public void setSearchIndex(long searchIndex) { + public SearchNode setSearchIndex(long searchIndex) { this.searchIndex = searchIndex; + return this; } public abstract SearchNode merge(List<SearchNode> searchNodes);
