This is an automated email from the ASF dual-hosted git repository. jt2594838 pushed a commit to branch fix_deletion_broadcase in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f704f17ab7e7d0394d90ffac25eca00111110786 Author: Tian Jiang <[email protected]> AuthorDate: Wed Jun 10 10:23:26 2026 +0800 Only send deletion to related region group --- .../db/queryengine/plan/analyze/AnalyzeUtils.java | 34 +++++++--- .../queryengine/plan/analyze/AnalyzeUtilsTest.java | 74 ++++++++++++++++++++++ 2 files changed, 99 insertions(+), 9 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java index d00ee54428b..6d619914c09 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.queryengine.plan.analyze; -import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.exception.IoTDBException; @@ -39,7 +38,8 @@ import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.StringLitera import org.apache.iotdb.commons.schema.table.TsTable; import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema; import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics; -import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp; +import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeReq; +import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeResp; import org.apache.iotdb.db.i18n.DataNodeQueryMessages; import org.apache.iotdb.db.protocol.client.ConfigNodeClient; import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; @@ -320,18 +320,34 @@ public class AnalyzeUtils { try (final ConfigNodeClient configNodeClient = ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { - // TODO: may use time and db/table to filter - final TRegionRouteMapResp latestRegionRouteMap = configNodeClient.getLatestRegionRouteMap(); - final Set<TRegionReplicaSet> replicaSets = new HashSet<>(); - latestRegionRouteMap.getRegionRouteMap().entrySet().stream() - .filter(e -> e.getKey().getType() == TConsensusGroupType.DataRegion) - .forEach(e -> replicaSets.add(e.getValue())); - node.setReplicaSets(replicaSets); + node.setReplicaSets(fetchDeleteReplicaSets(configNodeClient, node)); + } catch (final IoTDBRuntimeException e) { + throw e; } catch (final Exception e) { throw new IoTDBRuntimeException(e, TSStatusCode.CAN_NOT_CONNECT_CONFIGNODE.getStatusCode()); } } + static Set<TRegionReplicaSet> fetchDeleteReplicaSets( + final ConfigNodeClient configNodeClient, final Delete node) throws Exception { + final Set<TRegionReplicaSet> replicaSets = new HashSet<>(); + for (final TableDeletionEntry tableDeletionEntry : node.getTableDeletionEntries()) { + final TGetRegionGroupsByTimeResp resp = + configNodeClient.getRegionGroupsByTime( + new TGetRegionGroupsByTimeReq( + node.getDatabaseName(), + tableDeletionEntry.getStartTime(), + tableDeletionEntry.getEndTime())); + if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new IoTDBRuntimeException(resp.getStatus()); + } + if (resp.isSetRegionReplicaSets()) { + replicaSets.addAll(resp.getRegionReplicaSets()); + } + } + return replicaSets; + } + @SuppressWarnings("java:S3655") // optional is checked public static String getDatabaseName(final Delete node, final MPPQueryContext queryContext) { final String databaseName; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtilsTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtilsTest.java index 5d0ccd74f45..bbeaa629d4f 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtilsTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtilsTest.java @@ -19,20 +19,40 @@ package org.apache.iotdb.db.queryengine.plan.analyze; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.ComparisonExpression; import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Expression; import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Identifier; import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.LongLiteral; +import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.QualifiedName; +import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Table; import org.apache.iotdb.commons.schema.table.TsTable; import org.apache.iotdb.commons.schema.table.column.TimeColumnSchema; +import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeReq; +import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeResp; +import org.apache.iotdb.db.protocol.client.ConfigNodeClient; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Delete; +import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate; import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.TimeRange; import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class AnalyzeUtilsTest { @@ -52,4 +72,58 @@ public class AnalyzeUtilsTest { assertEquals(Long.MIN_VALUE, entries.get(0).getStartTime()); assertEquals(100, entries.get(0).getEndTime()); } + + @Test + public void testFetchDeleteReplicaSetsOnlyQueriesTargetDatabaseRegions() throws Exception { + final Delete delete = new Delete(new Table(QualifiedName.of("table1"))); + delete.setDatabaseName("root.db1"); + delete.setTableDeletionEntries( + Arrays.asList( + new TableDeletionEntry(new DeletionPredicate("table1"), new TimeRange(10, 20)), + new TableDeletionEntry(new DeletionPredicate("table1"), new TimeRange(30, 40)))); + + final TRegionReplicaSet regionReplicaSet1 = dataRegionReplicaSet(1); + final TRegionReplicaSet regionReplicaSet2 = dataRegionReplicaSet(2); + final TGetRegionGroupsByTimeResp resp1 = + successRegionGroupsResp(Collections.singleton(regionReplicaSet1)); + final TGetRegionGroupsByTimeResp resp2 = + successRegionGroupsResp(new HashSet<>(Arrays.asList(regionReplicaSet1, regionReplicaSet2))); + final ConfigNodeClient configNodeClient = Mockito.mock(ConfigNodeClient.class); + Mockito.when( + configNodeClient.getRegionGroupsByTime(Mockito.any(TGetRegionGroupsByTimeReq.class))) + .thenReturn(resp1, resp2); + + final Set<TRegionReplicaSet> result = + AnalyzeUtils.fetchDeleteReplicaSets(configNodeClient, delete); + + assertEquals(2, result.size()); + assertTrue(result.contains(regionReplicaSet1)); + assertTrue(result.contains(regionReplicaSet2)); + + final ArgumentCaptor<TGetRegionGroupsByTimeReq> reqCaptor = + ArgumentCaptor.forClass(TGetRegionGroupsByTimeReq.class); + Mockito.verify(configNodeClient, Mockito.times(2)).getRegionGroupsByTime(reqCaptor.capture()); + Mockito.verify(configNodeClient, Mockito.never()).getLatestRegionRouteMap(); + + final List<TGetRegionGroupsByTimeReq> requests = reqCaptor.getAllValues(); + assertEquals("root.db1", requests.get(0).getDatabase()); + assertEquals(10, requests.get(0).getStartTime()); + assertEquals(20, requests.get(0).getEndTime()); + assertEquals("root.db1", requests.get(1).getDatabase()); + assertEquals(30, requests.get(1).getStartTime()); + assertEquals(40, requests.get(1).getEndTime()); + } + + private static TGetRegionGroupsByTimeResp successRegionGroupsResp( + final Set<TRegionReplicaSet> replicaSets) { + final TGetRegionGroupsByTimeResp resp = + new TGetRegionGroupsByTimeResp(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); + resp.setRegionReplicaSets(replicaSets); + return resp; + } + + private static TRegionReplicaSet dataRegionReplicaSet(final int regionId) { + return new TRegionReplicaSet( + new TConsensusGroupId(TConsensusGroupType.DataRegion, regionId), Collections.emptyList()); + } }
