This is an automated email from the ASF dual-hosted git repository. CRZbulabula pushed a commit to branch support-multi-region-migrate in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 029a272e3a7dcd0a345af564b60858308d7129de Author: Yongzao <[email protected]> AuthorDate: Sun Jun 28 13:42:46 2026 +0800 Support migrating multiple regions in one MIGRATE REGION statement MIGRATE REGION previously accepted a single region id. Extend it to accept a comma-separated list of region ids while keeping a single source (FROM) and destination (TO) DataNode: MIGRATE REGION 1, 2, 3 FROM 4 TO 5 The list of regions is migrated from the source DataNode to the destination DataNode and the per-region results are aggregated into the response sub-status, mirroring the existing EXTEND/REMOVE REGION reporting. Both the tree model and table model grammars are updated. Changes span grammar (IoTDBSqlParser.g4, RelationalSql.g4), the thrift TMigrateRegionReq (regionId -> list<i32> regionIds), the parsers, the MigrateRegion AST node / MigrateRegionStatement, and ProcedureManager, which now resolves the fixed source/destination DataNodes once and loops over the deduped region ids submitting one RegionMigrateProcedure each. Adds tree- and table-model parser unit tests and a 1C5D integration test that migrates multiple regions in a single statement. --- .../commit/IoTDBMigrateMultiRegionForIoTV1IT.java | 195 +++++++++++++++++++++ .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 2 +- .../iotdb/confignode/manager/ProcedureManager.java | 187 +++++++++++++------- .../config/executor/ClusterConfigTaskExecutor.java | 2 +- .../config/metadata/region/MigrateRegionTask.java | 2 +- .../db/queryengine/plan/parser/ASTVisitor.java | 6 +- .../plan/relational/sql/ast/MigrateRegion.java | 22 +-- .../plan/relational/sql/parser/AstBuilder.java | 6 +- .../metadata/region/MigrateRegionStatement.java | 14 +- .../parser/MigrateRegionMultiRegionParseTest.java | 63 +++++++ .../MigrateRegionMultiRegionStatementTest.java | 75 ++++++++ .../db/relational/grammar/sql/RelationalSql.g4 | 2 +- .../src/main/thrift/confignode.thrift | 2 +- 13 files changed, 484 insertions(+), 94 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/commit/IoTDBMigrateMultiRegionForIoTV1IT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/commit/IoTDBMigrateMultiRegionForIoTV1IT.java new file mode 100644 index 00000000000..7e9be5ccf99 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/commit/IoTDBMigrateMultiRegionForIoTV1IT.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.it.regionmigration.pass.commit; + +import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework; +import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp; +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; + +import org.awaitility.Awaitility; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.Statement; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import static org.apache.iotdb.util.MagicUtils.makeItCloseQuietly; + +@Category({ClusterIT.class}) +@RunWith(IoTDBTestRunner.class) +public class IoTDBMigrateMultiRegionForIoTV1IT extends IoTDBRegionOperationReliabilityITFramework { + private static final String MULTI_REGION_MIGRATE_FORMAT = "migrate region %s from %d to %d"; + + private static final Logger LOGGER = + LoggerFactory.getLogger(IoTDBMigrateMultiRegionForIoTV1IT.class); + + /** + * Migrate multiple regions from one source DataNode to one destination DataNode in a single + * statement: {@code migrate region r1,r2 from src to dest}. Both regions must leave the source + * and land on the destination. + */ + @Test + public void multiRegionMigrateTest() throws Exception { + EnvFactory.getEnv() + .getConfig() + .getCommonConfig() + .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS) + .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setDataReplicationFactor(1) + .setSchemaReplicationFactor(1); + + EnvFactory.getEnv().initClusterEnvironment(1, 5); + + try (final Connection connection = makeItCloseQuietly(EnvFactory.getEnv().getConnection()); + final Statement statement = makeItCloseQuietly(connection.createStatement()); + SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { + // prepare data + statement.execute(INSERTION1); + statement.execute(FLUSH_COMMAND); + + Map<Integer, Set<Integer>> regionMap = getAllRegionMap(statement); + Set<Integer> allDataNodeId = getAllDataNodes(statement); + + // With replication factor 1 every region lives on exactly one DataNode. Pick a source + // DataNode that hosts at least two regions, then migrate all its regions to a fresh + // destination DataNode. + int sourceDataNode = selectDataNodeHostingMultipleRegions(regionMap); + List<Integer> selectedRegions = regionsOnDataNode(regionMap, sourceDataNode); + int destDataNode = + findDataNodeNotContainsAnyRegion(allDataNodeId, regionMap, selectedRegions); + + LOGGER.info( + "Multi-region migrate: regions {} from DataNode {} to DataNode {}", + selectedRegions, + sourceDataNode, + destDataNode); + + String command = + String.format( + MULTI_REGION_MIGRATE_FORMAT, + selectedRegions.stream().map(String::valueOf).collect(Collectors.joining(",")), + sourceDataNode, + destDataNode); + + Awaitility.await() + .atMost(30, TimeUnit.SECONDS) + .pollInterval(2, TimeUnit.SECONDS) + .until( + () -> { + try { + statement.execute(command); + return true; + } catch (Exception e) { + String errorMessage = e.getMessage(); + if (errorMessage != null + && errorMessage.contains("successfully submitted") + && errorMessage.contains("failed to submit")) { + LOGGER.warn("Multi-region migrate partially succeeded: {}", errorMessage); + return true; + } + LOGGER.warn("Multi-region migrate failed, retrying: {}", errorMessage); + return false; + } + }); + + Predicate<TShowRegionResp> migratePredicate = + tShowRegionResp -> { + Map<Integer, Set<Integer>> newRegionMap = + getRunningRegionMap(tShowRegionResp.getRegionInfoList()); + return selectedRegions.stream() + .allMatch( + regionId -> { + Set<Integer> dataNodes = newRegionMap.get(regionId); + return dataNodes != null + && dataNodes.contains(destDataNode) + && !dataNodes.contains(sourceDataNode); + }); + }; + + awaitUntilSuccess( + client, + selectedRegions.get(0), + migratePredicate, + Optional.of(destDataNode), + Optional.of(sourceDataNode)); + + regionMap = getAllRegionMap(statement); + for (int regionId : selectedRegions) { + Assert.assertTrue( + "Region " + regionId + " should be on destination DataNode " + destDataNode, + regionMap.get(regionId).contains(destDataNode)); + Assert.assertFalse( + "Region " + regionId + " should have left source DataNode " + sourceDataNode, + regionMap.get(regionId).contains(sourceDataNode)); + } + LOGGER.info("Multi-region migrate test passed"); + } + } + + private int selectDataNodeHostingMultipleRegions(Map<Integer, Set<Integer>> regionMap) { + Map<Integer, Long> regionCountPerDataNode = + regionMap.values().stream() + .flatMap(Set::stream) + .collect(Collectors.groupingBy(dataNodeId -> dataNodeId, Collectors.counting())); + return regionCountPerDataNode.entrySet().stream() + .filter(entry -> entry.getValue() >= 2) + .map(Map.Entry::getKey) + .findFirst() + .orElseThrow( + () -> new RuntimeException("Cannot find a DataNode hosting at least two regions")); + } + + private List<Integer> regionsOnDataNode(Map<Integer, Set<Integer>> regionMap, int dataNodeId) { + return regionMap.entrySet().stream() + .filter(entry -> entry.getValue().contains(dataNodeId)) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + } + + private int findDataNodeNotContainsAnyRegion( + Set<Integer> allDataNodeId, Map<Integer, Set<Integer>> regionMap, List<Integer> regionIds) { + return allDataNodeId.stream() + .filter( + dataNodeId -> + regionIds.stream() + .noneMatch(regionId -> regionMap.get(regionId).contains(dataNodeId))) + .findFirst() + .orElseThrow( + () -> + new RuntimeException( + "Cannot find DataNode that doesn't contain any of the regions")); + } +} diff --git a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 index 2f91b573aef..3acd0ae3713 100644 --- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 +++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 @@ -577,7 +577,7 @@ getSeriesSlotList // ---- Migrate Region migrateRegion - : MIGRATE REGION regionId=INTEGER_LITERAL FROM fromId=INTEGER_LITERAL TO toId=INTEGER_LITERAL + : MIGRATE REGION regionIds+=INTEGER_LITERAL (COMMA regionIds+=INTEGER_LITERAL)* FROM fromId=INTEGER_LITERAL TO toId=INTEGER_LITERAL ; reconstructRegion diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java index 2beeea6b4af..6b47128eab9 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java @@ -179,6 +179,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -823,6 +824,7 @@ public class ProcedureManager { */ private TSStatus checkMigrateRegion( TMigrateRegionReq migrateRegionReq, + int regionId, TConsensusGroupId regionGroupId, TDataNodeLocation originalDataNode, TDataNodeLocation destDataNode, @@ -847,7 +849,7 @@ public class ProcedureManager { failMessage = String.format( "Submit RegionMigrateProcedure failed, because the original DataNode %s doesn't contain Region %s", - migrateRegionReq.getFromId(), migrateRegionReq.getRegionId()); + migrateRegionReq.getFromId(), regionId); } else if (configManager .getPartitionManager() .getAllReplicaSets(destDataNode.getDataNodeId()) @@ -856,7 +858,7 @@ public class ProcedureManager { failMessage = String.format( "Submit RegionMigrateProcedure failed, because the target DataNode %s already contains Region %s", - migrateRegionReq.getToId(), migrateRegionReq.getRegionId()); + migrateRegionReq.getToId(), regionId); } if (failMessage != null) { @@ -1121,76 +1123,129 @@ public class ProcedureManager { public TSStatus migrateRegion(TMigrateRegionReq migrateRegionReq) { try (AutoCloseableLock ignoredLock = AutoCloseableLock.acquire(env.getSubmitRegionMigrateLock())) { - TConsensusGroupId regionGroupId; - Optional<TConsensusGroupId> optional = - configManager - .getPartitionManager() - .generateTConsensusGroupIdByRegionId(migrateRegionReq.getRegionId()); - if (optional.isPresent()) { - regionGroupId = optional.get(); - } else { - LOGGER.error(ManagerMessages.GET_REGION_GROUP_ID_FAIL); + // The source and destination DataNodes are fixed for the whole statement, so resolve them + // once and reuse them for every region. + final TDataNodeConfiguration originalDataNodeConfiguration = + configManager.getNodeManager().getRegisteredDataNode(migrateRegionReq.getFromId()); + final TDataNodeConfiguration destDataNodeConfiguration = + configManager.getNodeManager().getRegisteredDataNode(migrateRegionReq.getToId()); + if (originalDataNodeConfiguration == null) { return new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode()) - .setMessage(ManagerMessages.GET_REGION_GROUP_ID_FAIL); + .setMessage( + String.format( + "Source DataNode %s does not exist in the cluster", + migrateRegionReq.getFromId())); } - - // find original dn and dest dn - final TDataNodeLocation originalDataNode = - configManager - .getNodeManager() - .getRegisteredDataNode(migrateRegionReq.getFromId()) - .getLocation(); - final TDataNodeLocation destDataNode = - configManager - .getNodeManager() - .getRegisteredDataNode(migrateRegionReq.getToId()) - .getLocation(); - // select coordinator for adding peer - RegionMaintainHandler handler = env.getRegionMaintainHandler(); - // TODO: choose the DataNode which has lowest load - final TDataNodeLocation coordinatorForAddPeer = - handler - .filterDataNodeWithOtherRegionReplica( - regionGroupId, - destDataNode, - NodeStatus.Running, - NodeStatus.Removing, - NodeStatus.ReadOnly) - .orElse(null); - // Select coordinator for removing peer - // For now, destDataNode temporarily acts as the coordinatorForRemovePeer - final TDataNodeLocation coordinatorForRemovePeer = destDataNode; - - TSStatus status = - checkMigrateRegion( - migrateRegionReq, - regionGroupId, - originalDataNode, - destDataNode, - coordinatorForAddPeer); - if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - return status; + if (destDataNodeConfiguration == null) { + return new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode()) + .setMessage( + String.format( + "Target DataNode %s does not exist in the cluster", + migrateRegionReq.getToId())); + } + final TDataNodeLocation originalDataNode = originalDataNodeConfiguration.getLocation(); + final TDataNodeLocation destDataNode = destDataNodeConfiguration.getLocation(); + final RegionMaintainHandler handler = env.getRegionMaintainHandler(); + + TSStatus resp = new TSStatus(); + StringBuilder messageBuilder = new StringBuilder(); + int total = 0, success = 0; + // dedup region ids while preserving the user-specified order + for (int theRegionId : new LinkedHashSet<>(migrateRegionReq.getRegionIds())) { + total++; + TSStatus subStatus = + migrateOneRegion( + migrateRegionReq, theRegionId, originalDataNode, destDataNode, handler); + if (subStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + messageBuilder.append("region ").append(theRegionId).append(": Successfully submitted\n"); + success++; + } else { + messageBuilder + .append("region ") + .append(theRegionId) + .append(": ") + .append(subStatus.getMessage()) + .append('\n'); + } + resp.addToSubStatus(subStatus); } - // finally, submit procedure - this.executor.submitProcedure( - new RegionMigrateProcedure( - regionGroupId, - originalDataNode, - destDataNode, - coordinatorForAddPeer, - coordinatorForRemovePeer)); - LOGGER.info( - ManagerMessages - .MIGRATEREGION_SUBMIT_REGIONMIGRATEPROCEDURE_SUCCESSFULLY_REGION_ORIGIN_DATANODE, - regionGroupId, - originalDataNode, - destDataNode, - coordinatorForAddPeer, - coordinatorForRemovePeer); + messageBuilder.insert( + 0, + String.format( + "Total regions: %d, successfully submitted: %d, failed to submit: %d\n", + total, success, total - success)); + resp.setCode( + total == success + ? TSStatusCode.SUCCESS_STATUS.getStatusCode() + : TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode()); + resp.setMessage(messageBuilder.toString()); + return resp; + } + } - return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + private TSStatus migrateOneRegion( + TMigrateRegionReq migrateRegionReq, + int theRegionId, + TDataNodeLocation originalDataNode, + TDataNodeLocation destDataNode, + RegionMaintainHandler handler) { + TConsensusGroupId regionGroupId; + Optional<TConsensusGroupId> optional = + configManager.getPartitionManager().generateTConsensusGroupIdByRegionId(theRegionId); + if (optional.isPresent()) { + regionGroupId = optional.get(); + } else { + LOGGER.error(ManagerMessages.GET_REGION_GROUP_ID_FAIL); + return new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode()) + .setMessage(ManagerMessages.GET_REGION_GROUP_ID_FAIL); + } + + // select coordinator for adding peer + // TODO: choose the DataNode which has lowest load + final TDataNodeLocation coordinatorForAddPeer = + handler + .filterDataNodeWithOtherRegionReplica( + regionGroupId, + destDataNode, + NodeStatus.Running, + NodeStatus.Removing, + NodeStatus.ReadOnly) + .orElse(null); + // Select coordinator for removing peer + // For now, destDataNode temporarily acts as the coordinatorForRemovePeer + final TDataNodeLocation coordinatorForRemovePeer = destDataNode; + + TSStatus status = + checkMigrateRegion( + migrateRegionReq, + theRegionId, + regionGroupId, + originalDataNode, + destDataNode, + coordinatorForAddPeer); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + return status; } + + // finally, submit procedure + this.executor.submitProcedure( + new RegionMigrateProcedure( + regionGroupId, + originalDataNode, + destDataNode, + coordinatorForAddPeer, + coordinatorForRemovePeer)); + LOGGER.info( + ManagerMessages + .MIGRATEREGION_SUBMIT_REGIONMIGRATEPROCEDURE_SUCCESSFULLY_REGION_ORIGIN_DATANODE, + regionGroupId, + originalDataNode, + destDataNode, + coordinatorForAddPeer, + coordinatorForRemovePeer); + + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } public TSStatus reconstructRegion(TReconstructRegionReq req) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index 420c3c8285f..f2cdafff298 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -3635,7 +3635,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { final TMigrateRegionReq tMigrateRegionReq = new TMigrateRegionReq( - migrateRegionTask.getStatement().getRegionId(), + migrateRegionTask.getStatement().getRegionIds(), migrateRegionTask.getStatement().getFromId(), migrateRegionTask.getStatement().getToId(), migrateRegionTask.getModel()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/region/MigrateRegionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/region/MigrateRegionTask.java index 8c6f2bfe36b..d99f9a36c16 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/region/MigrateRegionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/region/MigrateRegionTask.java @@ -41,7 +41,7 @@ public class MigrateRegionTask implements IConfigTask { public MigrateRegionTask(MigrateRegion migrateRegion) { this.statement = new MigrateRegionStatement( - migrateRegion.getRegionId(), migrateRegion.getFromId(), migrateRegion.getToId()); + migrateRegion.getRegionIds(), migrateRegion.getFromId(), migrateRegion.getToId()); this.model = Model.TABLE; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java index b9e2fe21c7a..a64885754fd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java @@ -4632,10 +4632,10 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> { @Override public Statement visitMigrateRegion(IoTDBSqlParser.MigrateRegionContext ctx) { + List<Integer> regionIds = + ctx.regionIds.stream().map(token -> Integer.parseInt(token.getText())).collect(toList()); return new MigrateRegionStatement( - Integer.parseInt(ctx.regionId.getText()), - Integer.parseInt(ctx.fromId.getText()), - Integer.parseInt(ctx.toId.getText())); + regionIds, Integer.parseInt(ctx.fromId.getText()), Integer.parseInt(ctx.toId.getText())); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/MigrateRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/MigrateRegion.java index 3a305c96136..a24f99b4076 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/MigrateRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/MigrateRegion.java @@ -36,19 +36,20 @@ import java.util.Objects; public class MigrateRegion extends Statement { private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(MigrateRegion.class); - private final int regionId; + private final List<Integer> regionIds; private final int fromId; private final int toId; - public MigrateRegion(int regionId, int fromId, int toId) { - this(null, regionId, fromId, toId); + public MigrateRegion(List<Integer> regionIds, int fromId, int toId) { + this(null, regionIds, fromId, toId); } - public MigrateRegion(@Nullable NodeLocation location, int regionId, int fromId, int toId) { + public MigrateRegion( + @Nullable NodeLocation location, List<Integer> regionIds, int fromId, int toId) { super(location); - this.regionId = regionId; + this.regionIds = regionIds; this.fromId = fromId; this.toId = toId; } @@ -60,7 +61,7 @@ public class MigrateRegion extends Statement { @Override public int hashCode() { - return Objects.hash(MigrateRegion.class, regionId, fromId, toId); + return Objects.hash(MigrateRegion.class, regionIds, fromId, toId); } @Override @@ -72,12 +73,12 @@ public class MigrateRegion extends Statement { return false; } MigrateRegion another = (MigrateRegion) obj; - return regionId == another.regionId && fromId == another.fromId && toId == another.toId; + return regionIds.equals(another.regionIds) && fromId == another.fromId && toId == another.toId; } @Override public String toString() { - return String.format("migrate region %d from %d to %d", regionId, fromId, toId); + return String.format("migrate region %s from %d to %d", regionIds, fromId, toId); } @Override @@ -85,8 +86,8 @@ public class MigrateRegion extends Statement { return ((AstVisitor<R, C>) visitor).visitMigrateRegion(this, context); } - public int getRegionId() { - return regionId; + public List<Integer> getRegionIds() { + return regionIds; } public int getFromId() { @@ -101,6 +102,7 @@ public class MigrateRegion extends Statement { public long ramBytesUsed() { long size = INSTANCE_SIZE; size += AstMemoryEstimationHelper.getEstimatedSizeOfNodeLocation(getLocationInternal()); + size += AstMemoryEstimationHelper.getEstimatedSizeOfIntegerList(regionIds); return size; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java index bd56b82042d..bdcb77ac107 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/AstBuilder.java @@ -1571,10 +1571,10 @@ public class AstBuilder extends RelationalSqlBaseVisitor<Node> { @Override public Node visitMigrateRegionStatement(RelationalSqlParser.MigrateRegionStatementContext ctx) { + List<Integer> regionIds = + ctx.regionIds.stream().map(token -> Integer.parseInt(token.getText())).collect(toList()); return new MigrateRegion( - Integer.parseInt(ctx.regionId.getText()), - Integer.parseInt(ctx.fromId.getText()), - Integer.parseInt(ctx.toId.getText())); + regionIds, Integer.parseInt(ctx.fromId.getText()), Integer.parseInt(ctx.toId.getText())); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/region/MigrateRegionStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/region/MigrateRegionStatement.java index f13323542bb..169665645ff 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/region/MigrateRegionStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/region/MigrateRegionStatement.java @@ -33,26 +33,26 @@ import java.util.List; * * <p>Here is the syntax definition: * - * <p>MIGRATE REGION regionid=INTEGER_LITERAL FROM fromid=INTEGER_LITERAL TO toid=INTEGERLITERAL + * <p>MIGRATE REGION regionIds+=INTEGER_LITERAL (COMMA regionIds+=INTEGER_LITERAL)* FROM + * fromid=INTEGER_LITERAL TO toid=INTEGER_LITERAL */ -// TODO: Whether to support more complex migration, such as, migrate all region from 1, 2 to 5, 6 public class MigrateRegionStatement extends Statement implements IConfigStatement { - private final int regionId; + private final List<Integer> regionIds; private final int fromId; private final int toId; - public MigrateRegionStatement(int regionId, int fromId, int toId) { + public MigrateRegionStatement(List<Integer> regionIds, int fromId, int toId) { super(); - this.regionId = regionId; + this.regionIds = regionIds; this.fromId = fromId; this.toId = toId; } - public int getRegionId() { - return regionId; + public List<Integer> getRegionIds() { + return regionIds; } public int getFromId() { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/MigrateRegionMultiRegionParseTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/MigrateRegionMultiRegionParseTest.java new file mode 100644 index 00000000000..77a573355db --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/parser/MigrateRegionMultiRegionParseTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.parser; + +import org.apache.iotdb.db.queryengine.plan.statement.Statement; +import org.apache.iotdb.db.queryengine.plan.statement.metadata.region.MigrateRegionStatement; + +import org.junit.Test; + +import java.time.ZoneId; +import java.util.Arrays; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Parsing tests for the tree-model SQL that lets MIGRATE REGION move multiple regions from a single + * source DataNode to a single destination DataNode in one statement. + */ +public class MigrateRegionMultiRegionParseTest { + + private static Statement parse(String sql) { + return StatementGenerator.createStatement(sql, ZoneId.systemDefault()); + } + + @Test + public void testMigrateSingleRegion() { + Statement statement = parse("migrate region 1 from 2 to 3"); + assertTrue(statement instanceof MigrateRegionStatement); + MigrateRegionStatement migrateRegionStatement = (MigrateRegionStatement) statement; + assertEquals(Collections.singletonList(1), migrateRegionStatement.getRegionIds()); + assertEquals(2, migrateRegionStatement.getFromId()); + assertEquals(3, migrateRegionStatement.getToId()); + } + + @Test + public void testMigrateMultipleRegions() { + Statement statement = parse("migrate region 1, 2, 3 from 4 to 5"); + assertTrue(statement instanceof MigrateRegionStatement); + MigrateRegionStatement migrateRegionStatement = (MigrateRegionStatement) statement; + assertEquals(Arrays.asList(1, 2, 3), migrateRegionStatement.getRegionIds()); + assertEquals(4, migrateRegionStatement.getFromId()); + assertEquals(5, migrateRegionStatement.getToId()); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/MigrateRegionMultiRegionStatementTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/MigrateRegionMultiRegionStatementTest.java new file mode 100644 index 00000000000..72ff20f1392 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/sql/parser/MigrateRegionMultiRegionStatementTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.relational.sql.parser; + +import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Statement; +import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.db.protocol.session.InternalClientSession; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.MigrateRegion; + +import org.junit.Before; +import org.junit.Test; + +import java.time.ZoneId; +import java.util.Arrays; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Parsing tests for the table-model SQL that lets MIGRATE REGION move multiple regions from a + * single source DataNode to a single destination DataNode in one statement. + */ +public class MigrateRegionMultiRegionStatementTest { + + private SqlParser sqlParser; + private IClientSession clientSession; + + @Before + public void setUp() { + sqlParser = new SqlParser(); + clientSession = new InternalClientSession("testClient"); + } + + private Statement parse(String sql) { + return sqlParser.createStatement(sql, ZoneId.systemDefault(), clientSession); + } + + @Test + public void testMigrateSingleRegion() { + Statement statement = parse("migrate region 1 from 2 to 3"); + assertTrue(statement instanceof MigrateRegion); + MigrateRegion migrateRegion = (MigrateRegion) statement; + assertEquals(Collections.singletonList(1), migrateRegion.getRegionIds()); + assertEquals(2, migrateRegion.getFromId()); + assertEquals(3, migrateRegion.getToId()); + } + + @Test + public void testMigrateMultipleRegions() { + Statement statement = parse("migrate region 1, 2, 3 from 4 to 5"); + assertTrue(statement instanceof MigrateRegion); + MigrateRegion migrateRegion = (MigrateRegion) statement; + assertEquals(Arrays.asList(1, 2, 3), migrateRegion.getRegionIds()); + assertEquals(4, migrateRegion.getFromId()); + assertEquals(5, migrateRegion.getToId()); + } +} diff --git a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 index 5ae0c5d39e1..e8e3c1937a7 100644 --- a/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 +++ b/iotdb-core/relational-grammar/src/main/antlr4/org/apache/iotdb/db/relational/grammar/sql/RelationalSql.g4 @@ -644,7 +644,7 @@ showSeriesSlotListStatement ; migrateRegionStatement - : MIGRATE REGION regionId=INTEGER_VALUE FROM fromId=INTEGER_VALUE TO toId=INTEGER_VALUE + : MIGRATE REGION regionIds+=INTEGER_VALUE (',' regionIds+=INTEGER_VALUE)* FROM fromId=INTEGER_VALUE TO toId=INTEGER_VALUE ; reconstructRegionStatement diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index 852460fb60e..d5ad8928802 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -341,7 +341,7 @@ struct TGetSeriesSlotListResp { } struct TMigrateRegionReq { - 1: required i32 regionId + 1: required list<i32> regionIds 2: required i32 fromId 3: required i32 toId 4: required common.Model model
