This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 41393abf9f6 Only send table deletion to related region groups (#17896)
41393abf9f6 is described below
commit 41393abf9f6b513140f355f13ec589d2990e346a
Author: Jiang Tian <[email protected]>
AuthorDate: Thu Jun 11 15:08:56 2026 +0800
Only send table deletion to related region groups (#17896)
* Only send deletion to related region group
* Fix table pipe tsfile mods IT race
---------
Co-authored-by: Caideyipi <[email protected]>
---
.../IoTDBPipeTsFileDecompositionWithModsIT.java | 24 +++++--
.../db/queryengine/plan/analyze/AnalyzeUtils.java | 34 +++++++---
.../queryengine/plan/analyze/AnalyzeUtilsTest.java | 74 ++++++++++++++++++++++
3 files changed, 118 insertions(+), 14 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeTsFileDecompositionWithModsIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeTsFileDecompositionWithModsIT.java
index 22e042e2853..6dfede5a8ec 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeTsFileDecompositionWithModsIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeTsFileDecompositionWithModsIT.java
@@ -129,17 +129,31 @@ public class IoTDBPipeTsFileDecompositionWithModsIT extends AbstractPipeTableMod
executeNonQueryWithRetry(senderEnv, "FLUSH");
+ HashSet<String> expectedResults = new HashSet<>();
+ expectedResults.add(
+ "t1,t1,t1,t1,1,1.0,1,1970-01-01T00:00:00.001Z,1,1.0,1970-01-01,1,1970-01-01T00:00:00.001Z,");
+
+ TestUtils.assertDataEventuallyOnEnv(
+ senderEnv,
+ TableModelUtils.getQuerySql("table1"),
+ TableModelUtils.generateHeaderResults(),
+ expectedResults,
+ "sg1");
+
+ TestUtils.assertDataEventuallyOnEnv(
+ senderEnv,
+ "SELECT s4 FROM table1 WHERE time >= 2 AND time <= 4",
+ "s4,",
+ Collections.emptySet(),
+ "sg1");
+
executeNonQueryWithRetry(
senderEnv,
String.format(
- "CREATE PIPE test_pipe WITH SOURCE ('mods.enable'='true', 'capture.table'='true') WITH CONNECTOR('ip'='%s', 'port'='%s', 'username'='root', 'format'='tablet')",
+ "CREATE PIPE test_pipe WITH SOURCE ('mods.enable'='true', 'capture.table'='true', 'inclusion'='data.insert,data.delete') WITH CONNECTOR('ip'='%s', 'port'='%s', 'username'='root', 'format'='tablet')",
receiverEnv.getDataNodeWrapperList().get(0).getIp(),
receiverEnv.getDataNodeWrapperList().get(0).getPort()));
- HashSet<String> expectedResults = new HashSet<>();
- expectedResults.add(
- "t1,t1,t1,t1,1,1.0,1,1970-01-01T00:00:00.001Z,1,1.0,1970-01-01,1,1970-01-01T00:00:00.001Z,");
-
TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
TableModelUtils.getQuerySql("table1"),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java
index d00ee54428b..6d619914c09 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.queryengine.plan.analyze;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.exception.IoTDBException;
@@ -39,7 +38,8 @@ import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.StringLitera
import org.apache.iotdb.commons.schema.table.TsTable;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
-import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
+import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeResp;
import org.apache.iotdb.db.i18n.DataNodeQueryMessages;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
@@ -320,18 +320,34 @@ public class AnalyzeUtils {
try (final ConfigNodeClient configNodeClient =
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
- // TODO: may use time and db/table to filter
- final TRegionRouteMapResp latestRegionRouteMap = configNodeClient.getLatestRegionRouteMap();
- final Set<TRegionReplicaSet> replicaSets = new HashSet<>();
- latestRegionRouteMap.getRegionRouteMap().entrySet().stream()
- .filter(e -> e.getKey().getType() == TConsensusGroupType.DataRegion)
- .forEach(e -> replicaSets.add(e.getValue()));
- node.setReplicaSets(replicaSets);
+ node.setReplicaSets(fetchDeleteReplicaSets(configNodeClient, node));
+ } catch (final IoTDBRuntimeException e) {
+ throw e;
} catch (final Exception e) {
throw new IoTDBRuntimeException(e, TSStatusCode.CAN_NOT_CONNECT_CONFIGNODE.getStatusCode());
}
}
+ static Set<TRegionReplicaSet> fetchDeleteReplicaSets(
+ final ConfigNodeClient configNodeClient, final Delete node) throws Exception {
+ final Set<TRegionReplicaSet> replicaSets = new HashSet<>();
+ for (final TableDeletionEntry tableDeletionEntry : node.getTableDeletionEntries()) {
+ final TGetRegionGroupsByTimeResp resp =
+ configNodeClient.getRegionGroupsByTime(
+ new TGetRegionGroupsByTimeReq(
+ node.getDatabaseName(),
+ tableDeletionEntry.getStartTime(),
+ tableDeletionEntry.getEndTime()));
+ if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new IoTDBRuntimeException(resp.getStatus());
+ }
+ if (resp.isSetRegionReplicaSets()) {
+ replicaSets.addAll(resp.getRegionReplicaSets());
+ }
+ }
+ return replicaSets;
+ }
+
@SuppressWarnings("java:S3655") // optional is checked
public static String getDatabaseName(final Delete node, final MPPQueryContext queryContext) {
final String databaseName;
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtilsTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtilsTest.java
index 5d0ccd74f45..bbeaa629d4f 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtilsTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtilsTest.java
@@ -19,20 +19,40 @@
package org.apache.iotdb.db.queryengine.plan.analyze;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.ComparisonExpression;
import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Expression;
import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Identifier;
import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.LongLiteral;
+import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.QualifiedName;
+import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Table;
import org.apache.iotdb.commons.schema.table.TsTable;
import org.apache.iotdb.commons.schema.table.column.TimeColumnSchema;
+import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeResp;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Delete;
+import org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate;
import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.TimeRange;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class AnalyzeUtilsTest {
@@ -52,4 +72,58 @@ public class AnalyzeUtilsTest {
assertEquals(Long.MIN_VALUE, entries.get(0).getStartTime());
assertEquals(100, entries.get(0).getEndTime());
}
+
+ @Test
+ public void testFetchDeleteReplicaSetsOnlyQueriesTargetDatabaseRegions() throws Exception {
+ final Delete delete = new Delete(new Table(QualifiedName.of("table1")));
+ delete.setDatabaseName("root.db1");
+ delete.setTableDeletionEntries(
+ Arrays.asList(
+ new TableDeletionEntry(new DeletionPredicate("table1"), new TimeRange(10, 20)),
+ new TableDeletionEntry(new DeletionPredicate("table1"), new TimeRange(30, 40))));
+
+ final TRegionReplicaSet regionReplicaSet1 = dataRegionReplicaSet(1);
+ final TRegionReplicaSet regionReplicaSet2 = dataRegionReplicaSet(2);
+ final TGetRegionGroupsByTimeResp resp1 =
+ successRegionGroupsResp(Collections.singleton(regionReplicaSet1));
+ final TGetRegionGroupsByTimeResp resp2 =
+ successRegionGroupsResp(new HashSet<>(Arrays.asList(regionReplicaSet1, regionReplicaSet2)));
+ final ConfigNodeClient configNodeClient = Mockito.mock(ConfigNodeClient.class);
+ Mockito.when(
+ configNodeClient.getRegionGroupsByTime(Mockito.any(TGetRegionGroupsByTimeReq.class)))
+ .thenReturn(resp1, resp2);
+
+ final Set<TRegionReplicaSet> result =
+ AnalyzeUtils.fetchDeleteReplicaSets(configNodeClient, delete);
+
+ assertEquals(2, result.size());
+ assertTrue(result.contains(regionReplicaSet1));
+ assertTrue(result.contains(regionReplicaSet2));
+
+ final ArgumentCaptor<TGetRegionGroupsByTimeReq> reqCaptor =
+ ArgumentCaptor.forClass(TGetRegionGroupsByTimeReq.class);
+ Mockito.verify(configNodeClient, Mockito.times(2)).getRegionGroupsByTime(reqCaptor.capture());
+ Mockito.verify(configNodeClient, Mockito.never()).getLatestRegionRouteMap();
+
+ final List<TGetRegionGroupsByTimeReq> requests = reqCaptor.getAllValues();
+ assertEquals("root.db1", requests.get(0).getDatabase());
+ assertEquals(10, requests.get(0).getStartTime());
+ assertEquals(20, requests.get(0).getEndTime());
+ assertEquals("root.db1", requests.get(1).getDatabase());
+ assertEquals(30, requests.get(1).getStartTime());
+ assertEquals(40, requests.get(1).getEndTime());
+ }
+
+ private static TGetRegionGroupsByTimeResp successRegionGroupsResp(
+ final Set<TRegionReplicaSet> replicaSets) {
+ final TGetRegionGroupsByTimeResp resp =
+ new TGetRegionGroupsByTimeResp(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+ resp.setRegionReplicaSets(replicaSets);
+ return resp;
+ }
+
+ private static TRegionReplicaSet dataRegionReplicaSet(final int regionId) {
+ return new TRegionReplicaSet(
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, regionId), Collections.emptyList());
+ }
}