This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch rc/1.1.0 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 851ba72edfe1e11c58c308d86e8fd1ea4f54e581 Author: YongzaoDan <[email protected]> AuthorDate: Wed Mar 15 09:25:31 2023 +0800 [To rel/1.1] [IOTDB-5519] Improve the concurrency control of DatabaseSchema (#9325) --- .../consensus/request/ConfigPhysicalPlan.java | 16 +- .../request/read/database/CountDatabasePlan.java | 2 +- .../read/partition/GetSeriesSlotListPlan.java | 2 +- .../read/partition/GetTimeSlotListPlan.java | 2 +- .../request/read/region/GetRegionIdPlan.java | 2 +- .../AdjustMaxRegionGroupNumPlan.java | 11 +- .../DatabaseSchemaPlan.java | 2 +- .../DeleteDatabasePlan.java | 2 +- .../PreDeleteDatabasePlan.java | 2 +- .../SetDataReplicationFactorPlan.java | 30 ++-- .../SetSchemaReplicationFactorPlan.java | 11 +- .../{storagegroup => database}/SetTTLPlan.java | 13 +- .../SetTimePartitionIntervalPlan.java | 13 +- .../confignode/manager/ClusterSchemaManager.java | 191 ++++++++++++--------- .../iotdb/confignode/manager/ConfigManager.java | 92 ++++------ .../apache/iotdb/confignode/manager/IManager.java | 18 +- .../iotdb/confignode/manager/ProcedureManager.java | 2 +- .../manager/partition/PartitionManager.java | 75 +++++++- .../persistence/executor/ConfigPlanExecutor.java | 16 +- .../partition/DatabasePartitionTable.java | 15 +- .../persistence/partition/PartitionInfo.java | 110 ++++++------ .../persistence/schema/ClusterSchemaInfo.java | 87 ++++------ .../procedure/env/ConfigNodeProcedureEnv.java | 17 +- .../procedure/env/DataNodeRemoveHandler.java | 3 + .../impl/schema/DeleteDatabaseProcedure.java | 90 +++++----- .../state/schema/DeleteStorageGroupState.java | 5 +- .../thrift/ConfigNodeRPCServiceProcessor.java | 25 +-- .../request/ConfigPhysicalPlanSerDeTest.java | 18 +- .../persistence/ClusterSchemaInfoTest.java | 2 +- .../confignode/persistence/PartitionInfoTest.java | 2 +- .../java/org/apache/iotdb/rpc/TSStatusCode.java | 2 +- .../src/main/thrift/confignode.thrift | 4 +- 32 files changed, 488 insertions(+), 394 deletions(-) diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java index 23504a6851..62d83e48f3 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java @@ -53,6 +53,14 @@ import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.ShowCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.UpdateCQLastExecTimePlan; +import org.apache.iotdb.confignode.consensus.request.write.database.AdjustMaxRegionGroupNumPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.DeleteDatabasePlan; +import org.apache.iotdb.confignode.consensus.request.write.database.PreDeleteDatabasePlan; +import org.apache.iotdb.confignode.consensus.request.write.database.SetDataReplicationFactorPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan; import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan; import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan; import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan; @@ -71,14 +79,6 @@ import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGr import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan; import 
org.apache.iotdb.confignode.consensus.request.write.region.PollRegionMaintainTaskPlan; import org.apache.iotdb.confignode.consensus.request.write.region.PollSpecificRegionMaintainTaskPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.AdjustMaxRegionGroupNumPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.DatabaseSchemaPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.DeleteDatabasePlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.PreDeleteDatabasePlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetDataReplicationFactorPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetSchemaReplicationFactorPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetTTLPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetTimePartitionIntervalPlan; import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan; import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipePlan; import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/database/CountDatabasePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/database/CountDatabasePlan.java index d247ae20db..0973490be0 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/database/CountDatabasePlan.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/database/CountDatabasePlan.java @@ -51,7 +51,7 @@ public class CountDatabasePlan extends ConfigPhysicalPlan { this.storageGroupPattern = storageGroupPattern.toArray(new String[0]); } - public String[] getStorageGroupPattern() { + public String[] getDatabasePattern() { return storageGroupPattern; } diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/partition/GetSeriesSlotListPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/partition/GetSeriesSlotListPlan.java index 55fec8f439..f916b5b357 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/partition/GetSeriesSlotListPlan.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/partition/GetSeriesSlotListPlan.java @@ -45,7 +45,7 @@ public class GetSeriesSlotListPlan extends ConfigPhysicalPlan { this.partitionType = partitionType; } - public String getStorageGroup() { + public String getDatabase() { return storageGroup; } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/partition/GetTimeSlotListPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/partition/GetTimeSlotListPlan.java index 5b040ba89f..dc36c50151 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/partition/GetTimeSlotListPlan.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/partition/GetTimeSlotListPlan.java @@ -53,7 +53,7 @@ public class GetTimeSlotListPlan extends ConfigPhysicalPlan { this.endTime = endTime; } - public String getStorageGroup() { + public String getDatabase() { return storageGroup; } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/region/GetRegionIdPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/region/GetRegionIdPlan.java index 13b57c9e93..4fee4b98a1 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/region/GetRegionIdPlan.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/region/GetRegionIdPlan.java @@ -58,7 +58,7 @@ public class GetRegionIdPlan extends ConfigPhysicalPlan { this.seriesSlotId = 
seriesSlotId; } - public String getStorageGroup() { + public String getDatabase() { return storageGroup; } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/storagegroup/AdjustMaxRegionGroupNumPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/database/AdjustMaxRegionGroupNumPlan.java similarity index 94% rename from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/storagegroup/AdjustMaxRegionGroupNumPlan.java rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/database/AdjustMaxRegionGroupNumPlan.java index ad8911b39e..a19b2767c4 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/storagegroup/AdjustMaxRegionGroupNumPlan.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/database/AdjustMaxRegionGroupNumPlan.java @@ -16,7 +16,8 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.iotdb.confignode.consensus.request.write.storagegroup; + +package org.apache.iotdb.confignode.consensus.request.write.database; import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType; @@ -76,8 +77,12 @@ public class AdjustMaxRegionGroupNumPlan extends ConfigPhysicalPlan { @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } AdjustMaxRegionGroupNumPlan that = (AdjustMaxRegionGroupNumPlan) o; return maxRegionGroupNumMap.equals(that.maxRegionGroupNumMap); } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/storagegroup/DatabaseSchemaPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/database/DatabaseSchemaPlan.java similarity index 96% rename from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/storagegroup/DatabaseSchemaPlan.java rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/database/DatabaseSchemaPlan.java index 1a0e6d749e..2430b77369 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/storagegroup/DatabaseSchemaPlan.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/database/DatabaseSchemaPlan.java @@ -17,7 +17,7 @@ * under the License. 
*/ -package org.apache.iotdb.confignode.consensus.request.write.storagegroup; +package org.apache.iotdb.confignode.consensus.request.write.database; import org.apache.iotdb.commons.utils.ThriftConfigNodeSerDeUtils; import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/storagegroup/DeleteDatabasePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/database/DeleteDatabasePlan.java similarity index 96% rename from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/storagegroup/DeleteDatabasePlan.java rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/database/DeleteDatabasePlan.java index 5bf31ebd2d..f026275f9f 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/storagegroup/DeleteDatabasePlan.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/database/DeleteDatabasePlan.java @@ -17,7 +17,7 @@ * under the License. 
*/ -package org.apache.iotdb.confignode.consensus.request.write.storagegroup; +package org.apache.iotdb.confignode.consensus.request.write.database; import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil; import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/storagegroup/PreDeleteDatabasePlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/database/PreDeleteDatabasePlan.java similarity index 97% rename from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/storagegroup/PreDeleteDatabasePlan.java rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/database/PreDeleteDatabasePlan.java index 89c9bd5162..bc9313f92f 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/storagegroup/PreDeleteDatabasePlan.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/database/PreDeleteDatabasePlan.java @@ -17,7 +17,7 @@ * under the License. 
*/ -package org.apache.iotdb.confignode.consensus.request.write.storagegroup; +package org.apache.iotdb.confignode.consensus.request.write.database; import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil; import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/storagegroup/SetDataReplicationFactorPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/database/SetDataReplicationFactorPlan.java similarity index 74% rename from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/storagegroup/SetDataReplicationFactorPlan.java rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/database/SetDataReplicationFactorPlan.java index 7525f58cb2..dd154f8eb9 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/storagegroup/SetDataReplicationFactorPlan.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/database/SetDataReplicationFactorPlan.java @@ -16,7 +16,8 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.iotdb.confignode.consensus.request.write.storagegroup; + +package org.apache.iotdb.confignode.consensus.request.write.database; import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil; import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; @@ -29,7 +30,7 @@ import java.util.Objects; public class SetDataReplicationFactorPlan extends ConfigPhysicalPlan { - private String storageGroup; + private String database; private int dataReplicationFactor; @@ -37,14 +38,14 @@ public class SetDataReplicationFactorPlan extends ConfigPhysicalPlan { super(ConfigPhysicalPlanType.SetDataReplicationFactor); } - public SetDataReplicationFactorPlan(String storageGroup, int dataReplicationFactor) { + public SetDataReplicationFactorPlan(String database, int dataReplicationFactor) { this(); - this.storageGroup = storageGroup; + this.database = database; this.dataReplicationFactor = dataReplicationFactor; } - public String getStorageGroup() { - return storageGroup; + public String getDatabase() { + return database; } public int getDataReplicationFactor() { @@ -55,27 +56,30 @@ public class SetDataReplicationFactorPlan extends ConfigPhysicalPlan { protected void serializeImpl(DataOutputStream stream) throws IOException { stream.writeShort(getType().getPlanType()); - BasicStructureSerDeUtil.write(storageGroup, stream); + BasicStructureSerDeUtil.write(database, stream); stream.writeInt(dataReplicationFactor); } @Override protected void deserializeImpl(ByteBuffer buffer) throws IOException { - storageGroup = BasicStructureSerDeUtil.readString(buffer); + database = BasicStructureSerDeUtil.readString(buffer); dataReplicationFactor = buffer.getInt(); } @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } SetDataReplicationFactorPlan that = 
(SetDataReplicationFactorPlan) o; - return dataReplicationFactor == that.dataReplicationFactor - && storageGroup.equals(that.storageGroup); + return dataReplicationFactor == that.dataReplicationFactor && database.equals(that.database); } @Override public int hashCode() { - return Objects.hash(storageGroup, dataReplicationFactor); + return Objects.hash(database, dataReplicationFactor); } } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/storagegroup/SetSchemaReplicationFactorPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/database/SetSchemaReplicationFactorPlan.java similarity index 92% rename from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/storagegroup/SetSchemaReplicationFactorPlan.java rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/database/SetSchemaReplicationFactorPlan.java index f20b2127e2..5a255c56ef 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/storagegroup/SetSchemaReplicationFactorPlan.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/database/SetSchemaReplicationFactorPlan.java @@ -16,7 +16,8 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.iotdb.confignode.consensus.request.write.storagegroup; + +package org.apache.iotdb.confignode.consensus.request.write.database; import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil; import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; @@ -67,8 +68,12 @@ public class SetSchemaReplicationFactorPlan extends ConfigPhysicalPlan { @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } SetSchemaReplicationFactorPlan that = (SetSchemaReplicationFactorPlan) o; return schemaReplicationFactor == that.schemaReplicationFactor && storageGroup.equals(that.storageGroup); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/storagegroup/SetTTLPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/database/SetTTLPlan.java similarity index 90% rename from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/storagegroup/SetTTLPlan.java rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/database/SetTTLPlan.java index 2f9670b278..cd1846019f 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/storagegroup/SetTTLPlan.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/database/SetTTLPlan.java @@ -16,7 +16,8 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.iotdb.confignode.consensus.request.write.storagegroup; + +package org.apache.iotdb.confignode.consensus.request.write.database; import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil; import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; @@ -77,8 +78,12 @@ public class SetTTLPlan extends ConfigPhysicalPlan { @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } SetTTLPlan setTTLPlan = (SetTTLPlan) o; return TTL == setTTLPlan.TTL && Arrays.equals(this.storageGroupPathPattern, setTTLPlan.storageGroupPathPattern); @@ -86,6 +91,6 @@ public class SetTTLPlan extends ConfigPhysicalPlan { @Override public int hashCode() { - return Objects.hash(storageGroupPathPattern, TTL); + return Objects.hash(Arrays.hashCode(storageGroupPathPattern), TTL); } } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/storagegroup/SetTimePartitionIntervalPlan.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/database/SetTimePartitionIntervalPlan.java similarity index 91% rename from confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/storagegroup/SetTimePartitionIntervalPlan.java rename to confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/database/SetTimePartitionIntervalPlan.java index 926d26d1af..69fcd910c6 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/storagegroup/SetTimePartitionIntervalPlan.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/database/SetTimePartitionIntervalPlan.java @@ -16,7 +16,8 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.iotdb.confignode.consensus.request.write.storagegroup; + +package org.apache.iotdb.confignode.consensus.request.write.database; import org.apache.iotdb.commons.utils.BasicStructureSerDeUtil; import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; @@ -43,7 +44,7 @@ public class SetTimePartitionIntervalPlan extends ConfigPhysicalPlan { this.timePartitionInterval = timePartitionInterval; } - public String getStorageGroup() { + public String getDatabase() { return storageGroup; } @@ -67,8 +68,12 @@ public class SetTimePartitionIntervalPlan extends ConfigPhysicalPlan { @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } SetTimePartitionIntervalPlan that = (SetTimePartitionIntervalPlan) o; return timePartitionInterval == that.timePartitionInterval && storageGroup.equals(that.storageGroup); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java index 1d949a621e..8bd2458ed9 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterSchemaManager.java @@ -41,19 +41,20 @@ import org.apache.iotdb.confignode.consensus.request.read.template.GetAllTemplat import org.apache.iotdb.confignode.consensus.request.read.template.GetPathsSetTemplatePlan; import org.apache.iotdb.confignode.consensus.request.read.template.GetSchemaTemplatePlan; import org.apache.iotdb.confignode.consensus.request.read.template.GetTemplateSetInfoPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.AdjustMaxRegionGroupNumPlan; -import 
org.apache.iotdb.confignode.consensus.request.write.storagegroup.DatabaseSchemaPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.DeleteDatabasePlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetDataReplicationFactorPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetSchemaReplicationFactorPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetTTLPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetTimePartitionIntervalPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.AdjustMaxRegionGroupNumPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.DeleteDatabasePlan; +import org.apache.iotdb.confignode.consensus.request.write.database.SetDataReplicationFactorPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan; import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan; import org.apache.iotdb.confignode.consensus.request.write.template.DropSchemaTemplatePlan; import org.apache.iotdb.confignode.consensus.request.write.template.PreUnsetSchemaTemplatePlan; import org.apache.iotdb.confignode.consensus.request.write.template.RollbackPreUnsetSchemaTemplatePlan; import org.apache.iotdb.confignode.consensus.request.write.template.SetSchemaTemplatePlan; import org.apache.iotdb.confignode.consensus.request.write.template.UnsetSchemaTemplatePlan; +import org.apache.iotdb.confignode.consensus.response.database.CountDatabaseResp; import org.apache.iotdb.confignode.consensus.response.database.DatabaseSchemaResp; import 
org.apache.iotdb.confignode.consensus.response.partition.PathInfoResp; import org.apache.iotdb.confignode.consensus.response.template.AllTemplateSetInfoResp; @@ -72,7 +73,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetAllTemplatesResp; import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp; import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp; import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp; -import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.db.metadata.template.Template; import org.apache.iotdb.db.metadata.template.TemplateInternalRPCUpdateType; import org.apache.iotdb.db.metadata.template.TemplateInternalRPCUtil; @@ -88,11 +88,11 @@ import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; import static org.apache.iotdb.commons.conf.IoTDBConstant.MAX_DATABASE_NAME_LENGTH; @@ -130,7 +130,7 @@ public class ClusterSchemaManager { } try { - clusterSchemaInfo.checkContainsStorageGroup(databaseSchemaPlan.getSchema().getName()); + clusterSchemaInfo.isDatabaseNameValid(databaseSchemaPlan.getSchema().getName()); } catch (MetadataException metadataException) { // Reject if StorageGroup already set if (metadataException instanceof IllegalPathException) { @@ -158,53 +158,41 @@ public class ClusterSchemaManager { /** Alter Database */ public TSStatus alterDatabase(DatabaseSchemaPlan databaseSchemaPlan) { TSStatus result; - boolean isDatabaseExisted; - TDatabaseSchema storageGroupSchema = databaseSchemaPlan.getSchema(); + TDatabaseSchema databaseSchema = databaseSchemaPlan.getSchema(); - try { - isDatabaseExisted = clusterSchemaInfo.isDatabaseExisted(storageGroupSchema.getName()); - } catch (IllegalPathException e) { - // Reject if DatabaseName is illegal - result = new 
TSStatus(TSStatusCode.ILLEGAL_PATH.getStatusCode()); - result.setMessage("Failed to alter database. " + e.getMessage()); - return result; - } - - if (!isDatabaseExisted) { + if (!isDatabaseExist(databaseSchema.getName())) { // Reject if Database doesn't exist result = new TSStatus(TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()); result.setMessage( - "Failed to alter database. The Database " - + storageGroupSchema.getName() - + " doesn't exist."); + "Failed to alter database. The Database " + databaseSchema.getName() + " doesn't exist."); return result; } - if (storageGroupSchema.isSetMinSchemaRegionGroupNum()) { + if (databaseSchema.isSetMinSchemaRegionGroupNum()) { // Validate alter SchemaRegionGroupNum int minSchemaRegionGroupNum = - getMinRegionGroupNum(storageGroupSchema.getName(), TConsensusGroupType.SchemaRegion); - if (storageGroupSchema.getMinSchemaRegionGroupNum() <= minSchemaRegionGroupNum) { + getMinRegionGroupNum(databaseSchema.getName(), TConsensusGroupType.SchemaRegion); + if (databaseSchema.getMinSchemaRegionGroupNum() <= minSchemaRegionGroupNum) { result = new TSStatus(TSStatusCode.DATABASE_CONFIG_ERROR.getStatusCode()); result.setMessage( String.format( "Failed to alter database. The SchemaRegionGroupNum could only be increased. 
" + "Current SchemaRegionGroupNum: %d, Alter SchemaRegionGroupNum: %d", - minSchemaRegionGroupNum, storageGroupSchema.getMinSchemaRegionGroupNum())); + minSchemaRegionGroupNum, databaseSchema.getMinSchemaRegionGroupNum())); return result; } } - if (storageGroupSchema.isSetMinDataRegionGroupNum()) { + if (databaseSchema.isSetMinDataRegionGroupNum()) { // Validate alter DataRegionGroupNum int minDataRegionGroupNum = - getMinRegionGroupNum(storageGroupSchema.getName(), TConsensusGroupType.DataRegion); - if (storageGroupSchema.getMinDataRegionGroupNum() <= minDataRegionGroupNum) { + getMinRegionGroupNum(databaseSchema.getName(), TConsensusGroupType.DataRegion); + if (databaseSchema.getMinDataRegionGroupNum() <= minDataRegionGroupNum) { result = new TSStatus(TSStatusCode.DATABASE_CONFIG_ERROR.getStatusCode()); result.setMessage( String.format( "Failed to alter database. The DataRegionGroupNum could only be increased. " + "Current DataRegionGroupNum: %d, Alter DataRegionGroupNum: %d", - minDataRegionGroupNum, storageGroupSchema.getMinDataRegionGroupNum())); + minDataRegionGroupNum, databaseSchema.getMinDataRegionGroupNum())); return result; } } @@ -213,10 +201,9 @@ public class ClusterSchemaManager { return getConsensusManager().write(databaseSchemaPlan).getStatus(); } - /** Delete StorageGroup synchronized to protect the safety of adjustMaxRegionGroupNum */ - public synchronized TSStatus deleteStorageGroup(DeleteDatabasePlan deleteDatabasePlan) { + /** Delete DatabaseSchema. 
*/ + public TSStatus deleteDatabase(DeleteDatabasePlan deleteDatabasePlan) { TSStatus result = getConsensusManager().write(deleteDatabasePlan).getStatus(); - // Adjust the maximum RegionGroup number of each StorageGroup after deleting the storage group if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { adjustMaxRegionGroupNum(); } @@ -224,80 +211,82 @@ public class ClusterSchemaManager { } /** - * Count StorageGroups by specific path pattern + * Count Databases by specified path pattern. Notice: including pre-deleted Database. * - * @return CountStorageGroupResp + * <p>Notice: Only invoke this interface in ConfigManager + * + * @return CountDatabaseResp */ - public DataSet countMatchedStorageGroups(CountDatabasePlan countDatabasePlan) { - return getConsensusManager().read(countDatabasePlan).getDataset(); + public CountDatabaseResp countMatchedDatabases(CountDatabasePlan countDatabasePlan) { + return (CountDatabaseResp) getConsensusManager().read(countDatabasePlan).getDataset(); } /** - * Get StorageGroupSchemas by specific path pattern + * Get DatabaseSchemas by specified path pattern. Notice: including pre-deleted Database + * + * <p>Notice: Only invoke this interface in ConfigManager * - * @return StorageGroupSchemaDataSet + * @return DatabaseSchemaResp */ - public DataSet getMatchedStorageGroupSchema(GetDatabasePlan getStorageGroupPlan) { - return getConsensusManager().read(getStorageGroupPlan).getDataset(); + public DatabaseSchemaResp getMatchedDatabaseSchema(GetDatabasePlan getStorageGroupPlan) { + return (DatabaseSchemaResp) getConsensusManager().read(getStorageGroupPlan).getDataset(); } - /** Only used in cluster tool show StorageGroup */ - public TShowDatabaseResp showStorageGroup(GetDatabasePlan getStorageGroupPlan) { + /** Only used in cluster tool show Databases. 
*/ + public TShowDatabaseResp showDatabase(GetDatabasePlan getStorageGroupPlan) { DatabaseSchemaResp databaseSchemaResp = - (DatabaseSchemaResp) getMatchedStorageGroupSchema(getStorageGroupPlan); + (DatabaseSchemaResp) getConsensusManager().read(getStorageGroupPlan).getDataset(); if (databaseSchemaResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - // Return immediately if some StorageGroups doesn't exist + // Return immediately if some Database doesn't exist return new TShowDatabaseResp().setStatus(databaseSchemaResp.getStatus()); } Map<String, TDatabaseInfo> infoMap = new ConcurrentHashMap<>(); - for (TDatabaseSchema storageGroupSchema : databaseSchemaResp.getSchemaMap().values()) { - String database = storageGroupSchema.getName(); - TDatabaseInfo storageGroupInfo = new TDatabaseInfo(); - storageGroupInfo.setName(database); - storageGroupInfo.setTTL(storageGroupSchema.getTTL()); - storageGroupInfo.setSchemaReplicationFactor(storageGroupSchema.getSchemaReplicationFactor()); - storageGroupInfo.setDataReplicationFactor(storageGroupSchema.getDataReplicationFactor()); - storageGroupInfo.setTimePartitionInterval(storageGroupSchema.getTimePartitionInterval()); + for (TDatabaseSchema databaseSchema : databaseSchemaResp.getSchemaMap().values()) { + String database = databaseSchema.getName(); + TDatabaseInfo databaseInfo = new TDatabaseInfo(); + databaseInfo.setName(database); + databaseInfo.setTTL(databaseSchema.getTTL()); + databaseInfo.setSchemaReplicationFactor(databaseSchema.getSchemaReplicationFactor()); + databaseInfo.setDataReplicationFactor(databaseSchema.getDataReplicationFactor()); + databaseInfo.setTimePartitionInterval(databaseSchema.getTimePartitionInterval()); + databaseInfo.setMinSchemaRegionNum( + getMinRegionGroupNum(database, TConsensusGroupType.SchemaRegion)); + databaseInfo.setMaxSchemaRegionNum( + getMaxRegionGroupNum(database, TConsensusGroupType.SchemaRegion)); + databaseInfo.setMinDataRegionNum( + 
getMinRegionGroupNum(database, TConsensusGroupType.DataRegion)); + databaseInfo.setMaxDataRegionNum( + getMaxRegionGroupNum(database, TConsensusGroupType.DataRegion)); try { - storageGroupInfo.setSchemaRegionNum( + databaseInfo.setSchemaRegionNum( getPartitionManager().getRegionGroupCount(database, TConsensusGroupType.SchemaRegion)); - storageGroupInfo.setDataRegionNum( + databaseInfo.setDataRegionNum( getPartitionManager().getRegionGroupCount(database, TConsensusGroupType.DataRegion)); - storageGroupInfo.setMinSchemaRegionNum( - getMinRegionGroupNum(database, TConsensusGroupType.SchemaRegion)); - storageGroupInfo.setMaxSchemaRegionNum( - getMaxRegionGroupNum(database, TConsensusGroupType.SchemaRegion)); - storageGroupInfo.setMinDataRegionNum( - getMinRegionGroupNum(database, TConsensusGroupType.DataRegion)); - storageGroupInfo.setMaxDataRegionNum( - getMaxRegionGroupNum(database, TConsensusGroupType.DataRegion)); } catch (DatabaseNotExistsException e) { - // Return immediately if some StorageGroups doesn't exist - return new TShowDatabaseResp() - .setStatus( - new TSStatus(TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) - .setMessage(e.getMessage())); + // Skip pre-deleted Database + LOGGER.warn( + "The Database: {} doesn't exist. 
Maybe it has been pre-deleted.", + databaseSchema.getName()); + continue; } - infoMap.put(database, storageGroupInfo); + infoMap.put(database, databaseInfo); } return new TShowDatabaseResp().setDatabaseInfoMap(infoMap).setStatus(StatusUtils.OK); } public Map<String, Long> getAllTTLInfo() { - DatabaseSchemaResp databaseSchemaResp = - (DatabaseSchemaResp) - getMatchedStorageGroupSchema(new GetDatabasePlan(Arrays.asList("root", "**"))); + List<String> databases = getDatabaseNames(); Map<String, Long> infoMap = new ConcurrentHashMap<>(); - if (databaseSchemaResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - // Return immediately if some StorageGroups doesn't exist - return infoMap; - } - for (TDatabaseSchema storageGroupSchema : databaseSchemaResp.getSchemaMap().values()) { - infoMap.put(storageGroupSchema.getName(), storageGroupSchema.getTTL()); + for (String database : databases) { + try { + infoMap.put(database, getTTL(database)); + } catch (DatabaseNotExistsException e) { + LOGGER.warn("Database: {} doesn't exist", databases, e); + } } return infoMap; } @@ -392,7 +381,7 @@ public class ClusterSchemaManager { int databaseNum = databaseSchemaMap.size(); for (TDatabaseSchema databaseSchema : databaseSchemaMap.values()) { - if (!getPartitionManager().isDatabaseExisted(databaseSchema.getName())) { + if (!isDatabaseExist(databaseSchema.getName())) { // filter the pre deleted database databaseNum--; } @@ -481,13 +470,25 @@ public class ClusterSchemaManager { // Leader scheduling interfaces // ====================================================== + /** + * Check if the specified Database exists + * + * @param database The specified Database + * @return True if the DatabaseSchema is exists and the Database is not pre-deleted + */ + public boolean isDatabaseExist(String database) { + return getPartitionManager().isDatabaseExist(database); + } + /** * Only leader use this interface. 
Get all Databases name * * @return List<DatabaseName>, all Databases' name */ public List<String> getDatabaseNames() { - return clusterSchemaInfo.getDatabaseNames(); + return clusterSchemaInfo.getDatabaseNames().stream() + .filter(this::isDatabaseExist) + .collect(Collectors.toList()); } /** @@ -499,6 +500,9 @@ public class ClusterSchemaManager { */ public TDatabaseSchema getDatabaseSchemaByName(String database) throws DatabaseNotExistsException { + if (!isDatabaseExist(database)) { + throw new DatabaseNotExistsException(database); + } return clusterSchemaInfo.getMatchedDatabaseSchemaByName(database); } @@ -509,7 +513,26 @@ public class ClusterSchemaManager { * @return the matched DatabaseSchemas */ public Map<String, TDatabaseSchema> getMatchedDatabaseSchemasByName(List<String> rawPathList) { - return clusterSchemaInfo.getMatchedDatabaseSchemasByName(rawPathList); + Map<String, TDatabaseSchema> result = new ConcurrentHashMap<>(); + clusterSchemaInfo + .getMatchedDatabaseSchemasByName(rawPathList) + .forEach( + (database, databaseSchema) -> { + if (isDatabaseExist(database)) { + result.put(database, databaseSchema); + } + }); + return result; + } + + /** + * Only leader use this interface. 
Get the TTL of specified Database + * + * @param database DatabaseName + * @throws DatabaseNotExistsException When the specified Database doesn't exist + */ + public long getTTL(String database) throws DatabaseNotExistsException { + return getDatabaseSchemaByName(database).getTTL(); } /** diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 1c3023588f..52aa33493f 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -57,13 +57,13 @@ import org.apache.iotdb.confignode.consensus.request.read.partition.GetSeriesSlo import org.apache.iotdb.confignode.consensus.request.read.partition.GetTimeSlotListPlan; import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan; import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan; +import org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.SetDataReplicationFactorPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan; import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan; import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.DatabaseSchemaPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetDataReplicationFactorPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetSchemaReplicationFactorPlan; -import 
org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetTTLPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetTimePartitionIntervalPlan; import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan; import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan; import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan; @@ -507,11 +507,11 @@ public class ConfigManager implements IManager { } @Override - public DataSet countMatchedStorageGroups(CountDatabasePlan countDatabasePlan) { + public DataSet countMatchedDatabases(CountDatabasePlan countDatabasePlan) { TSStatus status = confirmLeader(); CountDatabaseResp result = new CountDatabaseResp(); if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - return clusterSchemaManager.countMatchedStorageGroups(countDatabasePlan); + return clusterSchemaManager.countMatchedDatabases(countDatabasePlan); } else { result.setStatus(status); } @@ -519,10 +519,10 @@ public class ConfigManager implements IManager { } @Override - public DataSet getMatchedStorageGroupSchemas(GetDatabasePlan getStorageGroupReq) { + public DataSet getMatchedDatabaseSchemas(GetDatabasePlan getDatabaseReq) { TSStatus status = confirmLeader(); if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - return clusterSchemaManager.getMatchedStorageGroupSchema(getStorageGroupReq); + return clusterSchemaManager.getMatchedDatabaseSchema(getDatabaseReq); } else { DatabaseSchemaResp dataSet = new DatabaseSchemaResp(); dataSet.setStatus(status); @@ -551,33 +551,32 @@ public class ConfigManager implements IManager { } @Override - public synchronized TSStatus deleteStorageGroups(List<String> deletedPaths) { + public synchronized TSStatus deleteDatabases(List<String> deletedPaths) { TSStatus status = confirmLeader(); if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { // remove wild - Map<String, 
TDatabaseSchema> deleteStorageSchemaMap = + Map<String, TDatabaseSchema> deleteDatabaseSchemaMap = getClusterSchemaManager().getMatchedDatabaseSchemasByName(deletedPaths); - if (deleteStorageSchemaMap.isEmpty()) { + if (deleteDatabaseSchemaMap.isEmpty()) { return RpcUtils.getStatus( TSStatusCode.PATH_NOT_EXIST.getStatusCode(), String.format("Path %s does not exist", Arrays.toString(deletedPaths.toArray()))); } - ArrayList<TDatabaseSchema> parsedDeleteStorageGroups = - new ArrayList<>(deleteStorageSchemaMap.values()); - return procedureManager.deleteStorageGroups(parsedDeleteStorageGroups); + ArrayList<TDatabaseSchema> parsedDeleteDatabases = + new ArrayList<>(deleteDatabaseSchemaMap.values()); + return procedureManager.deleteDatabases(parsedDeleteDatabases); } else { return status; } } - private List<TSeriesPartitionSlot> calculateRelatedSlot( - PartialPath path, PartialPath storageGroup) { + private List<TSeriesPartitionSlot> calculateRelatedSlot(PartialPath path, PartialPath database) { // The path contains `**` if (path.getFullPath().contains(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD)) { return new ArrayList<>(); } // path doesn't contain * so the size of innerPathList should be 1 - PartialPath innerPath = path.alterPrefixPath(storageGroup).get(0); + PartialPath innerPath = path.alterPrefixPath(database).get(0); // The innerPath contains `*` and the only `*` is not in last level if (innerPath.getDevice().contains(IoTDBConstant.ONE_LEVEL_PATH_WILDCARD)) { return new ArrayList<>(); @@ -599,30 +598,28 @@ public class ConfigManager implements IManager { // Build GetSchemaPartitionPlan Map<String, Set<TSeriesPartitionSlot>> partitionSlotsMap = new HashMap<>(); List<PartialPath> relatedPaths = patternTree.getAllPathPatterns(); - List<String> allStorageGroups = getClusterSchemaManager().getDatabaseNames(); - List<PartialPath> allStorageGroupPaths = new ArrayList<>(); - for (String storageGroup : allStorageGroups) { + List<String> allDatabases = 
getClusterSchemaManager().getDatabaseNames(); + List<PartialPath> allDatabasePaths = new ArrayList<>(); + for (String database : allDatabases) { try { - allStorageGroupPaths.add(new PartialPath(storageGroup)); + allDatabasePaths.add(new PartialPath(database)); } catch (IllegalPathException e) { throw new RuntimeException(e); } } Map<String, Boolean> scanAllRegions = new HashMap<>(); for (PartialPath path : relatedPaths) { - for (int i = 0; i < allStorageGroups.size(); i++) { - String storageGroup = allStorageGroups.get(i); - PartialPath storageGroupPath = allStorageGroupPaths.get(i); - if (path.overlapWith(storageGroupPath.concatNode(MULTI_LEVEL_PATH_WILDCARD)) - && !scanAllRegions.containsKey(storageGroup)) { - List<TSeriesPartitionSlot> relatedSlot = calculateRelatedSlot(path, storageGroupPath); + for (int i = 0; i < allDatabases.size(); i++) { + String database = allDatabases.get(i); + PartialPath databasePath = allDatabasePaths.get(i); + if (path.overlapWith(databasePath.concatNode(MULTI_LEVEL_PATH_WILDCARD)) + && !scanAllRegions.containsKey(database)) { + List<TSeriesPartitionSlot> relatedSlot = calculateRelatedSlot(path, databasePath); if (relatedSlot.isEmpty()) { - scanAllRegions.put(storageGroup, true); - partitionSlotsMap.put(storageGroup, new HashSet<>()); + scanAllRegions.put(database, true); + partitionSlotsMap.put(database, new HashSet<>()); } else { - partitionSlotsMap - .computeIfAbsent(storageGroup, k -> new HashSet<>()) - .addAll(relatedSlot); + partitionSlotsMap.computeIfAbsent(database, k -> new HashSet<>()).addAll(relatedSlot); } } } @@ -657,17 +654,17 @@ public class ConfigManager implements IManager { } List<String> devicePaths = patternTree.getAllDevicePatterns(); - List<String> storageGroups = getClusterSchemaManager().getDatabaseNames(); + List<String> databases = getClusterSchemaManager().getDatabaseNames(); // Build GetOrCreateSchemaPartitionPlan Map<String, List<TSeriesPartitionSlot>> partitionSlotsMap = new HashMap<>(); for (String 
devicePath : devicePaths) { if (!devicePath.contains("*")) { // Only check devicePaths that without "*" - for (String storageGroup : storageGroups) { - if (PathUtils.isStartWith(devicePath, storageGroup)) { + for (String database : databases) { + if (PathUtils.isStartWith(devicePath, database)) { partitionSlotsMap - .computeIfAbsent(storageGroup, key -> new ArrayList<>()) + .computeIfAbsent(database, key -> new ArrayList<>()) .add(getPartitionManager().getSeriesPartitionSlot(devicePath)); break; } @@ -1345,10 +1342,10 @@ public class ConfigManager implements IManager { } @Override - public TShowDatabaseResp showStorageGroup(GetDatabasePlan getStorageGroupPlan) { + public TShowDatabaseResp showDatabase(GetDatabasePlan getDatabasePlan) { TSStatus status = confirmLeader(); if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - return getClusterSchemaManager().showStorageGroup(getStorageGroupPlan); + return getClusterSchemaManager().showDatabase(getDatabasePlan); } else { return new TShowDatabaseResp().setStatus(status); } @@ -1369,23 +1366,6 @@ public class ConfigManager implements IManager { return retryFailedTasksThread; } - /** - * @param storageGroups the databases to check - * @return List of PartialPath the databases that not exist - */ - public List<PartialPath> checkStorageGroupExist(List<PartialPath> storageGroups) { - List<PartialPath> noExistSg = new ArrayList<>(); - if (storageGroups == null) { - return noExistSg; - } - for (PartialPath storageGroup : storageGroups) { - if (!clusterSchemaManager.getDatabaseNames().contains(storageGroup.toString())) { - noExistSg.add(storageGroup); - } - } - return noExistSg; - } - @Override public void addMetrics() { MetricService.getInstance().addMetricSet(new NodeMetrics(getNodeManager())); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java index b82577df94..8ace5e2f13 100644 --- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java @@ -34,12 +34,12 @@ import org.apache.iotdb.confignode.consensus.request.read.partition.GetSeriesSlo import org.apache.iotdb.confignode.consensus.request.read.partition.GetTimeSlotListPlan; import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan; import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan; +import org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.SetDataReplicationFactorPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan; import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.DatabaseSchemaPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetDataReplicationFactorPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetSchemaReplicationFactorPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetTTLPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetTimePartitionIntervalPlan; import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan; import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan; import org.apache.iotdb.confignode.manager.consensus.ConsensusManager; @@ -273,14 +273,14 @@ public interface IManager { * * @return The number of matched StorageGroups */ - DataSet countMatchedStorageGroups(CountDatabasePlan countDatabasePlan); + DataSet 
countMatchedDatabases(CountDatabasePlan countDatabasePlan); /** * Get StorageGroupSchemas * * @return StorageGroupSchemaDataSet */ - DataSet getMatchedStorageGroupSchemas(GetDatabasePlan getOrCountStorageGroupPlan); + DataSet getMatchedDatabaseSchemas(GetDatabasePlan getOrCountStorageGroupPlan); /** * Set Database @@ -302,7 +302,7 @@ public interface IManager { * @param deletedPaths List<StringPattern> * @return status */ - TSStatus deleteStorageGroups(List<String> deletedPaths); + TSStatus deleteDatabases(List<String> deletedPaths); /** * Get SchemaPartition @@ -463,7 +463,7 @@ public interface IManager { * @param getStorageGroupPlan GetStorageGroupPlan, including path patterns about StorageGroups * @return TShowStorageGroupResp */ - TShowDatabaseResp showStorageGroup(GetDatabasePlan getStorageGroupPlan); + TShowDatabaseResp showDatabase(GetDatabasePlan getStorageGroupPlan); /** * create schema template diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java index bc53cc14d1..85a6cf3b57 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java @@ -150,7 +150,7 @@ public class ProcedureManager { } } - public TSStatus deleteStorageGroups(ArrayList<TDatabaseSchema> deleteSgSchemaList) { + public TSStatus deleteDatabases(ArrayList<TDatabaseSchema> deleteSgSchemaList) { List<Long> procedureIds = new ArrayList<>(); for (TDatabaseSchema storageGroupSchema : deleteSgSchemaList) { DeleteDatabaseProcedure deleteDatabaseProcedure = diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java index 0ab2017a9f..5e2ad788aa 100644 --- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java @@ -32,6 +32,7 @@ import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.commons.partition.DataPartitionTable; import org.apache.iotdb.commons.partition.SchemaPartitionTable; import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor; +import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.confignode.client.DataNodeRequestType; import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool; import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler; @@ -46,12 +47,12 @@ import org.apache.iotdb.confignode.consensus.request.read.partition.GetSeriesSlo import org.apache.iotdb.confignode.consensus.request.read.partition.GetTimeSlotListPlan; import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionIdPlan; import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.PreDeleteDatabasePlan; import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan; import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan; import org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan; import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan; import org.apache.iotdb.confignode.consensus.request.write.region.PollSpecificRegionMaintainTaskPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.PreDeleteDatabasePlan; import org.apache.iotdb.confignode.consensus.response.partition.DataPartitionResp; import org.apache.iotdb.confignode.consensus.response.partition.GetRegionIdResp; import 
org.apache.iotdb.confignode.consensus.response.partition.GetSeriesSlotListResp; @@ -180,6 +181,20 @@ public class PartitionManager { * STORAGE_GROUP_NOT_EXIST if some StorageGroup don't exist. */ public SchemaPartitionResp getOrCreateSchemaPartition(GetOrCreateSchemaPartitionPlan req) { + // Check if the related Databases exist + for (String database : req.getPartitionSlotsMap().keySet()) { + if (!isDatabaseExist(database)) { + return new SchemaPartitionResp( + new TSStatus(TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) + .setMessage( + String.format( + "Create SchemaPartition failed because the database: %s is not exists", + database)), + false, + null); + } + } + // After all the SchemaPartitions are allocated, // all the read requests about SchemaPartitionTable are parallel. SchemaPartitionResp resp = (SchemaPartitionResp) getSchemaPartition(req); @@ -243,7 +258,16 @@ public class PartitionManager { } } - return (SchemaPartitionResp) getSchemaPartition(req); + resp = (SchemaPartitionResp) getSchemaPartition(req); + if (!resp.isAllPartitionsExist()) { + LOGGER.error( + "Lacked some SchemaPartition allocation result in the response of getOrCreateDataPartition method"); + resp.setStatus( + new TSStatus(TSStatusCode.LACK_PARTITION_ALLOCATION.getStatusCode()) + .setMessage("Lacked some SchemaPartition allocation result in the response")); + return resp; + } + return resp; } /** @@ -256,6 +280,20 @@ public class PartitionManager { * STORAGE_GROUP_NOT_EXIST if some StorageGroup don't exist. 
*/ public DataPartitionResp getOrCreateDataPartition(GetOrCreateDataPartitionPlan req) { + // Check if the related Databases exist + for (String database : req.getPartitionSlotsMap().keySet()) { + if (!isDatabaseExist(database)) { + return new DataPartitionResp( + new TSStatus(TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()) + .setMessage( + String.format( + "Create DataPartition failed because the database: %s is not exists", + database)), + false, + null); + } + } + // After all the DataPartitions are allocated, // all the read requests about DataPartitionTable are parallel. DataPartitionResp resp = (DataPartitionResp) getDataPartition(req); @@ -322,10 +360,10 @@ public class PartitionManager { resp = (DataPartitionResp) getDataPartition(req); if (!resp.isAllPartitionsExist()) { LOGGER.error( - "Lacked some data partition allocation result in the response of getOrCreateDataPartition method"); + "Lacked some DataPartition allocation result in the response of getOrCreateDataPartition method"); resp.setStatus( - new TSStatus(TSStatusCode.LACK_DATA_PARTITION_ALLOCATION.getStatusCode()) - .setMessage("Lacked some data partition allocation result in the response")); + new TSStatus(TSStatusCode.LACK_PARTITION_ALLOCATION.getStatusCode()) + .setMessage("Lacked some DataPartition allocation result in the response")); return resp; } return resp; @@ -589,10 +627,35 @@ public class PartitionManager { return partitionInfo.getRegionGroupCount(database, type); } - public boolean isDatabaseExisted(String database) { + /** + * Check if the specified Database exists. + * + * @param database The specified Database + * @return True if the DatabaseSchema is exists and the Database is not pre-deleted + */ + public boolean isDatabaseExist(String database) { return partitionInfo.isDatabaseExisted(database); } + /** + * Filter the un-exist Databases. 
+ * + * @param databases the Databases to check + * @return List of PartialPath the Databases that not exist + */ + public List<PartialPath> filterUnExistDatabases(List<PartialPath> databases) { + List<PartialPath> unExistDatabases = new ArrayList<>(); + if (databases == null) { + return unExistDatabases; + } + for (PartialPath database : databases) { + if (!isDatabaseExist(database.getFullPath())) { + unExistDatabases.add(database); + } + } + return unExistDatabases; + } + /** * Only leader use this interface. * diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java index aa07955bf0..03600a1846 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java @@ -52,6 +52,14 @@ import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.ShowCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.UpdateCQLastExecTimePlan; +import org.apache.iotdb.confignode.consensus.request.write.database.AdjustMaxRegionGroupNumPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.DeleteDatabasePlan; +import org.apache.iotdb.confignode.consensus.request.write.database.PreDeleteDatabasePlan; +import org.apache.iotdb.confignode.consensus.request.write.database.SetDataReplicationFactorPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan; +import 
org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan; import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan; import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan; import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan; @@ -69,14 +77,6 @@ import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProce import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan; import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan; import org.apache.iotdb.confignode.consensus.request.write.region.PollSpecificRegionMaintainTaskPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.AdjustMaxRegionGroupNumPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.DatabaseSchemaPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.DeleteDatabasePlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.PreDeleteDatabasePlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetDataReplicationFactorPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetSchemaReplicationFactorPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetTTLPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetTimePartitionIntervalPlan; import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan; import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipePlan; import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan; diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java index 
bb0bb153c9..882cee0b14 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java @@ -59,7 +59,8 @@ import java.util.stream.Stream; public class DatabasePartitionTable { private static final Logger LOGGER = LoggerFactory.getLogger(DatabasePartitionTable.class); - private volatile boolean isPredeleted = false; + // Is the Database pre-deleted + private volatile boolean preDeleted = false; // The name of database private String databaseName; @@ -79,12 +80,12 @@ public class DatabasePartitionTable { this.dataPartitionTable = new DataPartitionTable(); } - public boolean isPredeleted() { - return isPredeleted; + public boolean isNotPreDeleted() { + return !preDeleted; } - public void setPredeleted(boolean predeleted) { - isPredeleted = predeleted; + public void setPreDeleted(boolean preDeleted) { + this.preDeleted = preDeleted; } /** @@ -371,7 +372,7 @@ public class DatabasePartitionTable { public void serialize(OutputStream outputStream, TProtocol protocol) throws IOException, TException { - ReadWriteIOUtils.write(isPredeleted, outputStream); + ReadWriteIOUtils.write(preDeleted, outputStream); ReadWriteIOUtils.write(databaseName, outputStream); ReadWriteIOUtils.write(regionGroupMap.size(), outputStream); @@ -386,7 +387,7 @@ public class DatabasePartitionTable { public void deserialize(InputStream inputStream, TProtocol protocol) throws IOException, TException { - isPredeleted = ReadWriteIOUtils.readBool(inputStream); + preDeleted = ReadWriteIOUtils.readBool(inputStream); databaseName = ReadWriteIOUtils.readString(inputStream); int length = ReadWriteIOUtils.readInt(inputStream); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java index ea8ad01653..70b0679e3c 
100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java @@ -35,15 +35,15 @@ import org.apache.iotdb.confignode.consensus.request.read.partition.GetSeriesSlo import org.apache.iotdb.confignode.consensus.request.read.partition.GetTimeSlotListPlan; import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionIdPlan; import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.DeleteDatabasePlan; +import org.apache.iotdb.confignode.consensus.request.write.database.PreDeleteDatabasePlan; import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan; import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan; import org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan; import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan; import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan; import org.apache.iotdb.confignode.consensus.request.write.region.PollSpecificRegionMaintainTaskPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.DatabaseSchemaPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.DeleteDatabasePlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.PreDeleteDatabasePlan; import org.apache.iotdb.confignode.consensus.response.partition.DataPartitionResp; import org.apache.iotdb.confignode.consensus.response.partition.GetRegionIdResp; import org.apache.iotdb.confignode.consensus.response.partition.GetSeriesSlotListResp; @@ -99,7 +99,7 @@ public class PartitionInfo implements 
SnapshotProcessor { /** For Cluster Partition */ // For allocating Regions private final AtomicInteger nextRegionGroupId; - // Map<StorageGroupName, StorageGroupPartitionInfo> + // Map<DatabaseName, DatabasePartitionInfo> private final Map<String, DatabasePartitionTable> databasePartitionTables; /** For Region-Maintainer */ @@ -124,15 +124,15 @@ public class PartitionInfo implements SnapshotProcessor { // ====================================================== /** - * Thread-safely create new StorageGroupPartitionInfo + * Thread-safely create new DatabasePartitionTable * - * @param plan SetStorageGroupPlan - * @return SUCCESS_STATUS if the new StorageGroupPartitionInfo is created successfully. + * @param plan DatabaseSchemaPlan + * @return SUCCESS_STATUS if the new DatabasePartitionTable is created successfully. */ public TSStatus createDatabase(DatabaseSchemaPlan plan) { - String storageGroupName = plan.getSchema().getName(); - DatabasePartitionTable databasePartitionTable = new DatabasePartitionTable(storageGroupName); - databasePartitionTables.put(storageGroupName, databasePartitionTable); + String databaseName = plan.getSchema().getName(); + DatabasePartitionTable databasePartitionTable = new DatabasePartitionTable(databaseName); + databasePartitionTables.put(databaseName, databasePartitionTable); return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } @@ -248,10 +248,10 @@ public class PartitionInfo implements SnapshotProcessor { } switch (preDeleteType) { case EXECUTE: - databasePartitionTable.setPredeleted(true); + databasePartitionTable.setPreDeleted(true); break; case ROLLBACK: - databasePartitionTable.setPredeleted(false); + databasePartitionTable.setPreDeleted(false); break; default: break; @@ -284,7 +284,7 @@ public class PartitionInfo implements SnapshotProcessor { // Return all SchemaPartitions when the queried PartitionSlots are empty databasePartitionTables.forEach( (storageGroup, databasePartitionTable) -> { - if 
(!databasePartitionTable.isPredeleted()) { + if (databasePartitionTable.isNotPreDeleted()) { schemaPartition.put(storageGroup, new SchemaPartitionTable()); databasePartitionTable.getSchemaPartition( @@ -300,19 +300,19 @@ public class PartitionInfo implements SnapshotProcessor { // Return the SchemaPartition for each StorageGroup plan.getPartitionSlotsMap() .forEach( - (storageGroup, partitionSlots) -> { - if (isDatabaseExisted(storageGroup)) { - schemaPartition.put(storageGroup, new SchemaPartitionTable()); + (database, partitionSlots) -> { + if (isDatabaseExisted(database)) { + schemaPartition.put(database, new SchemaPartitionTable()); if (!databasePartitionTables - .get(storageGroup) - .getSchemaPartition(partitionSlots, schemaPartition.get(storageGroup))) { + .get(database) + .getSchemaPartition(partitionSlots, schemaPartition.get(database))) { isAllPartitionsExist.set(false); } - if (schemaPartition.get(storageGroup).getSchemaPartitionMap().isEmpty()) { + if (schemaPartition.get(database).getSchemaPartitionMap().isEmpty()) { // Remove empty Map - schemaPartition.remove(storageGroup); + schemaPartition.remove(database); } } else { isAllPartitionsExist.set(false); @@ -339,19 +339,19 @@ public class PartitionInfo implements SnapshotProcessor { plan.getPartitionSlotsMap() .forEach( - (storageGroup, partitionSlots) -> { - if (isDatabaseExisted(storageGroup)) { - dataPartition.put(storageGroup, new DataPartitionTable()); + (database, partitionSlots) -> { + if (isDatabaseExisted(database)) { + dataPartition.put(database, new DataPartitionTable()); if (!databasePartitionTables - .get(storageGroup) - .getDataPartition(partitionSlots, dataPartition.get(storageGroup))) { + .get(database) + .getDataPartition(partitionSlots, dataPartition.get(database))) { isAllPartitionsExist.set(false); } - if (dataPartition.get(storageGroup).getDataPartitionMap().isEmpty()) { + if (dataPartition.get(database).getDataPartitionMap().isEmpty()) { // Remove empty Map - 
dataPartition.remove(storageGroup); + dataPartition.remove(database); } } else { isAllPartitionsExist.set(false); @@ -387,9 +387,15 @@ public class PartitionInfo implements SnapshotProcessor { } } + /** + * Check if the specified Database exists. + * + * @param database The specified Database + * @return True if the DatabaseSchema exists and the Database is not pre-deleted + */ public boolean isDatabaseExisted(String database) { final DatabasePartitionTable databasePartitionTable = databasePartitionTables.get(database); - return databasePartitionTable != null && !databasePartitionTable.isPredeleted(); + return databasePartitionTable != null && databasePartitionTable.isNotPreDeleted(); } /** @@ -401,11 +407,9 @@ public class PartitionInfo implements SnapshotProcessor { public TSStatus createSchemaPartition(CreateSchemaPartitionPlan plan) { plan.getAssignedSchemaPartition() .forEach( - (storageGroup, schemaPartitionTable) -> { - if (isDatabaseExisted(storageGroup)) { - databasePartitionTables - .get(storageGroup) - .createSchemaPartition(schemaPartitionTable); + (database, schemaPartitionTable) -> { + if (isDatabaseExisted(database)) { + databasePartitionTables.get(database).createSchemaPartition(schemaPartitionTable); } }); @@ -421,21 +425,21 @@ public class PartitionInfo implements SnapshotProcessor { public TSStatus createDataPartition(CreateDataPartitionPlan plan) { plan.getAssignedDataPartition() .forEach( - (storageGroup, dataPartitionTable) -> { - if (isDatabaseExisted(storageGroup)) { - databasePartitionTables.get(storageGroup).createDataPartition(dataPartitionTable); + (database, dataPartitionTable) -> { + if (isDatabaseExisted(database)) { + databasePartitionTables.get(database).createDataPartition(dataPartitionTable); } }); return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } - /** Get SchemaNodeManagementPartition through matched storageGroup */ - public DataSet getSchemaNodeManagementPartition(List<String> matchedStorageGroups) { + /**
Get SchemaNodeManagementPartition through matched Database. */ + public DataSet getSchemaNodeManagementPartition(List<String> matchedDatabases) { SchemaNodeManagementResp schemaNodeManagementResp = new SchemaNodeManagementResp(); Map<String, SchemaPartitionTable> schemaPartitionMap = new ConcurrentHashMap<>(); - matchedStorageGroups.stream() + matchedDatabases.stream() .filter(this::isDatabaseExisted) .forEach( storageGroup -> { @@ -533,12 +537,12 @@ public class PartitionInfo implements SnapshotProcessor { Map<String, List<TSeriesPartitionSlot>> result = new ConcurrentHashMap<>(); partitionSlotsMap.forEach( - (storageGroup, partitionSlots) -> { - if (isDatabaseExisted(storageGroup)) { + (database, partitionSlots) -> { + if (isDatabaseExisted(database)) { result.put( - storageGroup, + database, databasePartitionTables - .get(storageGroup) + .get(database) .filterUnassignedSchemaPartitionSlots(partitionSlots)); } }); @@ -558,12 +562,12 @@ public class PartitionInfo implements SnapshotProcessor { Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> result = new ConcurrentHashMap<>(); partitionSlotsMap.forEach( - (storageGroup, partitionSlots) -> { - if (isDatabaseExisted(storageGroup)) { + (database, partitionSlots) -> { + if (isDatabaseExisted(database)) { result.put( - storageGroup, + database, databasePartitionTables - .get(storageGroup) + .get(database) .filterUnassignedDataPartitionSlots(partitionSlots)); } }); @@ -785,11 +789,11 @@ public class PartitionInfo implements SnapshotProcessor { } public DataSet getRegionId(GetRegionIdPlan plan) { - if (!isDatabaseExisted(plan.getStorageGroup())) { + if (!isDatabaseExisted(plan.getDatabase())) { return new GetRegionIdResp( new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), new ArrayList<>()); } - DatabasePartitionTable sgPartitionTable = databasePartitionTables.get(plan.getStorageGroup()); + DatabasePartitionTable sgPartitionTable = databasePartitionTables.get(plan.getDatabase()); return new GetRegionIdResp( 
new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), sgPartitionTable.getRegionId( @@ -797,11 +801,11 @@ public class PartitionInfo implements SnapshotProcessor { } public DataSet getTimeSlotList(GetTimeSlotListPlan plan) { - if (!isDatabaseExisted(plan.getStorageGroup())) { + if (!isDatabaseExisted(plan.getDatabase())) { return new GetTimeSlotListResp( new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), new ArrayList<>()); } - DatabasePartitionTable sgPartitionTable = databasePartitionTables.get(plan.getStorageGroup()); + DatabasePartitionTable sgPartitionTable = databasePartitionTables.get(plan.getDatabase()); return new GetTimeSlotListResp( new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), sgPartitionTable.getTimeSlotList( @@ -809,11 +813,11 @@ public class PartitionInfo implements SnapshotProcessor { } public DataSet getSeriesSlotList(GetSeriesSlotListPlan plan) { - if (!isDatabaseExisted(plan.getStorageGroup())) { + if (!isDatabaseExisted(plan.getDatabase())) { return new GetSeriesSlotListResp( new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), new ArrayList<>()); } - DatabasePartitionTable sgPartitionTable = databasePartitionTables.get(plan.getStorageGroup()); + DatabasePartitionTable sgPartitionTable = databasePartitionTables.get(plan.getDatabase()); return new GetSeriesSlotListResp( new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()), sgPartitionTable.getSeriesSlotList(plan.getPartitionType())); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ClusterSchemaInfo.java b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ClusterSchemaInfo.java index 20c5b25f87..67e5d0ef63 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ClusterSchemaInfo.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ClusterSchemaInfo.java @@ -33,13 +33,13 @@ import 
org.apache.iotdb.confignode.consensus.request.read.template.CheckTemplate import org.apache.iotdb.confignode.consensus.request.read.template.GetPathsSetTemplatePlan; import org.apache.iotdb.confignode.consensus.request.read.template.GetSchemaTemplatePlan; import org.apache.iotdb.confignode.consensus.request.read.template.GetTemplateSetInfoPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.AdjustMaxRegionGroupNumPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.DatabaseSchemaPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.DeleteDatabasePlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetDataReplicationFactorPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetSchemaReplicationFactorPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetTTLPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetTimePartitionIntervalPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.AdjustMaxRegionGroupNumPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.DeleteDatabasePlan; +import org.apache.iotdb.confignode.consensus.request.write.database.SetDataReplicationFactorPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan; import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan; import org.apache.iotdb.confignode.consensus.request.write.template.DropSchemaTemplatePlan; import org.apache.iotdb.confignode.consensus.request.write.template.PreUnsetSchemaTemplatePlan; @@ -91,13 +91,13 @@ public 
class ClusterSchemaInfo implements SnapshotProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(ClusterSchemaInfo.class); - // StorageGroup read write lock + // Database read write lock private final ReentrantReadWriteLock databaseReadWriteLock; private final ConfigMTree mTree; private static final String SNAPSHOT_FILENAME = "cluster_schema.bin"; - private final String ERROR_NAME = "Error StorageGroup name"; + private final String ERROR_NAME = "Error Database name"; private final TemplateTable templateTable; @@ -108,7 +108,7 @@ public class ClusterSchemaInfo implements SnapshotProcessor { mTree = new ConfigMTree(); templateTable = new TemplateTable(); } catch (MetadataException e) { - LOGGER.error("Can't construct StorageGroupInfo", e); + LOGGER.error("Can't construct ClusterSchemaInfo", e); throw new IOException(e); } } @@ -127,15 +127,15 @@ public class ClusterSchemaInfo implements SnapshotProcessor { TSStatus result = new TSStatus(); databaseReadWriteLock.writeLock().lock(); try { - // Set StorageGroup - TDatabaseSchema storageGroupSchema = plan.getSchema(); - PartialPath partialPathName = new PartialPath(storageGroupSchema.getName()); + // Set Database + TDatabaseSchema databaseSchema = plan.getSchema(); + PartialPath partialPathName = new PartialPath(databaseSchema.getName()); mTree.setStorageGroup(partialPathName); - // Set StorageGroupSchema + // Set DatabaseSchema mTree .getStorageGroupNodeByStorageGroupPath(partialPathName) - .setStorageGroupSchema(storageGroupSchema); + .setStorageGroupSchema(databaseSchema); result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } catch (MetadataException e) { @@ -234,12 +234,12 @@ public class ClusterSchemaInfo implements SnapshotProcessor { return result; } - /** @return The number of matched StorageGroups by the specific StorageGroup pattern */ + /** @return The number of matched Databases by the specified Database pattern */ public CountDatabaseResp countMatchedDatabases(CountDatabasePlan 
plan) { CountDatabaseResp result = new CountDatabaseResp(); databaseReadWriteLock.readLock().lock(); try { - PartialPath patternPath = new PartialPath(plan.getStorageGroupPattern()); + PartialPath patternPath = new PartialPath(plan.getDatabasePattern()); result.setCount(mTree.getStorageGroupNum(patternPath, false)); result.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); } catch (MetadataException e) { @@ -253,13 +253,13 @@ public class ClusterSchemaInfo implements SnapshotProcessor { return result; } - /** @return All StorageGroupSchemas that matches to the specific StorageGroup pattern */ + /** @return All DatabaseSchemas that matches to the specified Database pattern */ public DatabaseSchemaResp getMatchedDatabaseSchemas(GetDatabasePlan plan) { DatabaseSchemaResp result = new DatabaseSchemaResp(); databaseReadWriteLock.readLock().lock(); try { Map<String, TDatabaseSchema> schemaMap = new HashMap<>(); - PartialPath patternPath = new PartialPath(plan.getStorageGroupPattern()); + PartialPath patternPath = new PartialPath(plan.getDatabasePattern()); List<PartialPath> matchedPaths = mTree.getMatchedStorageGroups(patternPath, false); for (PartialPath path : matchedPaths) { schemaMap.put( @@ -295,7 +295,7 @@ public class ClusterSchemaInfo implements SnapshotProcessor { result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } else { result.setCode(TSStatusCode.DATABASE_NOT_EXIST.getStatusCode()); - result.setMessage("StorageGroup does not exist"); + result.setMessage("Database does not exist"); } } catch (MetadataException e) { LOGGER.error(ERROR_NAME, e); @@ -333,7 +333,7 @@ public class ClusterSchemaInfo implements SnapshotProcessor { TSStatus result = new TSStatus(); databaseReadWriteLock.writeLock().lock(); try { - PartialPath path = new PartialPath(plan.getStorageGroup()); + PartialPath path = new PartialPath(plan.getDatabase()); if (mTree.isStorageGroupAlreadySet(path)) { mTree .getStorageGroupNodeByStorageGroupPath(path) @@ -356,7 
+356,7 @@ public class ClusterSchemaInfo implements SnapshotProcessor { TSStatus result = new TSStatus(); databaseReadWriteLock.writeLock().lock(); try { - PartialPath path = new PartialPath(plan.getStorageGroup()); + PartialPath path = new PartialPath(plan.getDatabase()); if (mTree.isStorageGroupAlreadySet(path)) { mTree .getStorageGroupNodeByStorageGroupPath(path) @@ -376,7 +376,7 @@ public class ClusterSchemaInfo implements SnapshotProcessor { } /** - * Adjust the maximum RegionGroup count of each StorageGroup + * Adjust the maximum RegionGroup count of each Database * * @param plan AdjustMaxRegionGroupCountPlan * @return SUCCESS_STATUS @@ -388,10 +388,10 @@ public class ClusterSchemaInfo implements SnapshotProcessor { for (Map.Entry<String, Pair<Integer, Integer>> entry : plan.getMaxRegionGroupNumMap().entrySet()) { PartialPath path = new PartialPath(entry.getKey()); - TDatabaseSchema storageGroupSchema = + TDatabaseSchema databaseSchema = mTree.getStorageGroupNodeByStorageGroupPath(path).getStorageGroupSchema(); - storageGroupSchema.setMaxSchemaRegionGroupNum(entry.getValue().getLeft()); - storageGroupSchema.setMaxDataRegionGroupNum(entry.getValue().getRight()); + databaseSchema.setMaxSchemaRegionGroupNum(entry.getValue().getLeft()); + databaseSchema.setMaxDataRegionGroupNum(entry.getValue().getRight()); } result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } catch (MetadataException e) { @@ -427,30 +427,16 @@ public class ClusterSchemaInfo implements SnapshotProcessor { } /** - * Only leader use this interface. Check if the specified Database already exists. 
- * - * @param databaseName The specified Database's name - * @throws IllegalPathException If the specified Database's name is illegal - */ - public boolean isDatabaseExisted(String databaseName) throws IllegalPathException { - databaseReadWriteLock.readLock().lock(); - try { - return mTree.isStorageGroupAlreadySet(new PartialPath(databaseName)); - } finally { - databaseReadWriteLock.readLock().unlock(); - } - } - - /** - * Only leader use this interface. Check if the specific StorageGroup already exists. + * Check if the specified DatabaseName is valid. * - * @param storageName The specific StorageGroup's name - * @throws MetadataException If the specific StorageGroup already exists + * @param databaseName The specified DatabaseName + * @throws MetadataException If the DatabaseName is invalid, i.e. the specified DatabaseName + * already exists, or it's a prefix of another DatabaseName */ - public void checkContainsStorageGroup(String storageName) throws MetadataException { + public void isDatabaseNameValid(String databaseName) throws MetadataException { databaseReadWriteLock.readLock().lock(); try { - mTree.checkStorageGroupAlreadySet(new PartialPath(storageName)); + mTree.checkStorageGroupAlreadySet(new PartialPath(databaseName)); } finally { databaseReadWriteLock.readLock().unlock(); } @@ -561,11 +547,10 @@ public class ClusterSchemaInfo implements SnapshotProcessor { @Override public boolean processTakeSnapshot(File snapshotDir) throws IOException { - processMtreeTakeSnapshot(snapshotDir); - return templateTable.processTakeSnapshot(snapshotDir); + return processMTreeTakeSnapshot(snapshotDir) && templateTable.processTakeSnapshot(snapshotDir); } - public boolean processMtreeTakeSnapshot(File snapshotDir) throws IOException { + public boolean processMTreeTakeSnapshot(File snapshotDir) throws IOException { File snapshotFile = new File(snapshotDir, SNAPSHOT_FILENAME); if (snapshotFile.exists() && snapshotFile.isFile()) { LOGGER.error( @@ -601,11 +586,11 @@ public
class ClusterSchemaInfo implements SnapshotProcessor { @Override public void processLoadSnapshot(File snapshotDir) throws IOException { - processMtreeLoadSnapshot(snapshotDir); + processMTreeLoadSnapshot(snapshotDir); templateTable.processLoadSnapshot(snapshotDir); } - public void processMtreeLoadSnapshot(File snapshotDir) throws IOException { + public void processMTreeLoadSnapshot(File snapshotDir) throws IOException { File snapshotFile = new File(snapshotDir, SNAPSHOT_FILENAME); if (!snapshotFile.exists() || !snapshotFile.isFile()) { LOGGER.error( diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java index ca5377efb1..4bb874c4ba 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java @@ -37,9 +37,9 @@ import org.apache.iotdb.confignode.client.sync.SyncConfigNodeClientPool; import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan; +import org.apache.iotdb.confignode.consensus.request.write.database.DeleteDatabasePlan; +import org.apache.iotdb.confignode.consensus.request.write.database.PreDeleteDatabasePlan; import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.DeleteDatabasePlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.PreDeleteDatabasePlan; import org.apache.iotdb.confignode.exception.AddConsensusGroupException; import org.apache.iotdb.confignode.exception.AddPeerException; import org.apache.iotdb.confignode.exception.DatabaseNotExistsException; @@ -122,7 
+122,7 @@ public class ConfigNodeProcedureEnv { */ public TSStatus deleteConfig(String name) { DeleteDatabasePlan deleteDatabasePlan = new DeleteDatabasePlan(name); - return getClusterSchemaManager().deleteStorageGroup(deleteDatabasePlan); + return getClusterSchemaManager().deleteDatabase(deleteDatabasePlan); } /** @@ -162,18 +162,21 @@ public class ConfigNodeProcedureEnv { } if (nodeStatus == NodeStatus.Running) { - final TSStatus invalidateSchemaStatus = + // Always invalidate PartitionCache first + final TSStatus invalidatePartitionStatus = SyncDataNodeClientPool.getInstance() .sendSyncRequestToDataNodeWithRetry( dataNodeConfiguration.getLocation().getInternalEndPoint(), invalidateCacheReq, - DataNodeRequestType.INVALIDATE_SCHEMA_CACHE); - final TSStatus invalidatePartitionStatus = + DataNodeRequestType.INVALIDATE_PARTITION_CACHE); + + final TSStatus invalidateSchemaStatus = SyncDataNodeClientPool.getInstance() .sendSyncRequestToDataNodeWithRetry( dataNodeConfiguration.getLocation().getInternalEndPoint(), invalidateCacheReq, - DataNodeRequestType.INVALIDATE_PARTITION_CACHE); + DataNodeRequestType.INVALIDATE_SCHEMA_CACHE); + if (!verifySucceed(invalidatePartitionStatus, invalidateSchemaStatus)) { LOG.error( "Invalidate cache failed, invalidate partition cache status is {}, invalidate schema cache status is {}", diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java index 2d8f038152..afa2d3c543 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java @@ -560,6 +560,9 @@ public class DataNodeRemoveHandler { List<TDataNodeLocation> removeDataNodes = Collections.singletonList(dataNodeLocation); configManager.getConsensusManager().write(new 
RemoveDataNodePlan(removeDataNodes)); + // Adjust maxRegionGroupNum + configManager.getClusterSchemaManager().adjustMaxRegionGroupNum(); + // Remove metrics PartitionMetrics.unbindDataNodePartitionMetrics( NodeUrlUtils.convertTEndPointUrl(dataNodeLocation.getClientRpcEndPoint())); diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java index 4a49f5ad22..b2beaf2f35 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java @@ -29,8 +29,8 @@ import org.apache.iotdb.commons.utils.ThriftConfigNodeSerDeUtils; import org.apache.iotdb.confignode.client.DataNodeRequestType; import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool; import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler; +import org.apache.iotdb.confignode.consensus.request.write.database.PreDeleteDatabasePlan; import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.PreDeleteDatabasePlan; import org.apache.iotdb.confignode.manager.partition.PartitionMetrics; import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; @@ -61,58 +61,60 @@ public class DeleteDatabaseProcedure private static final Logger LOG = LoggerFactory.getLogger(DeleteDatabaseProcedure.class); private static final int RETRY_THRESHOLD = 5; - private TDatabaseSchema deleteSgSchema; + private TDatabaseSchema deleteDatabaseSchema; public DeleteDatabaseProcedure() { super(); } - public DeleteDatabaseProcedure(TDatabaseSchema deleteSgSchema) { + public 
DeleteDatabaseProcedure(TDatabaseSchema deleteDatabaseSchema) { super(); - this.deleteSgSchema = deleteSgSchema; + this.deleteDatabaseSchema = deleteDatabaseSchema; } - public TDatabaseSchema getDeleteSgSchema() { - return deleteSgSchema; + public TDatabaseSchema getDeleteDatabaseSchema() { + return deleteDatabaseSchema; } - public void setDeleteSgSchema(TDatabaseSchema deleteSgSchema) { - this.deleteSgSchema = deleteSgSchema; + public void setDeleteDatabaseSchema(TDatabaseSchema deleteDatabaseSchema) { + this.deleteDatabaseSchema = deleteDatabaseSchema; } @Override protected Flow executeFromState(ConfigNodeProcedureEnv env, DeleteStorageGroupState state) throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { - if (deleteSgSchema == null) { + if (deleteDatabaseSchema == null) { return Flow.NO_MORE_STATE; } try { switch (state) { - case DELETE_STORAGE_GROUP_PREPARE: - // TODO: lock related ClusterSchemaInfo, PartitionInfo and Regions - setNextState(DeleteStorageGroupState.DELETE_PRE); - break; - case DELETE_PRE: - LOG.info("Pre delete for database {}", deleteSgSchema.getName()); - env.preDelete(PreDeleteDatabasePlan.PreDeleteType.EXECUTE, deleteSgSchema.getName()); + case PRE_DELETE_DATABASE: + LOG.info( + "[DeleteDatabaseProcedure] Pre delete database: {}", deleteDatabaseSchema.getName()); + env.preDelete( + PreDeleteDatabasePlan.PreDeleteType.EXECUTE, deleteDatabaseSchema.getName()); setNextState(DeleteStorageGroupState.INVALIDATE_CACHE); break; case INVALIDATE_CACHE: - LOG.info("Invalidate cache of {}", deleteSgSchema.getName()); - if (env.invalidateCache(deleteSgSchema.getName())) { - setNextState(DeleteStorageGroupState.DELETE_CONFIG); + LOG.info( + "[DeleteDatabaseProcedure] Invalidate cache of database: {}", + deleteDatabaseSchema.getName()); + if (env.invalidateCache(deleteDatabaseSchema.getName())) { + setNextState(DeleteStorageGroupState.DELETE_DATABASE_SCHEMA); } else { - setFailure(new ProcedureException("Invalidate cache 
failed")); + setFailure(new ProcedureException("[DeleteDatabaseProcedure] Invalidate cache failed")); } break; - case DELETE_CONFIG: - LOG.info("Delete config info of {}", deleteSgSchema.getName()); + case DELETE_DATABASE_SCHEMA: + LOG.info( + "[DeleteDatabaseProcedure] Delete DatabaseSchema: {}", + deleteDatabaseSchema.getName()); // Submit RegionDeleteTasks OfferRegionMaintainTasksPlan dataRegionDeleteTaskOfferPlan = new OfferRegionMaintainTasksPlan(); List<TRegionReplicaSet> regionReplicaSets = - env.getAllReplicaSets(deleteSgSchema.getName()); + env.getAllReplicaSets(deleteDatabaseSchema.getName()); List<TRegionReplicaSet> schemaRegionReplicaSets = new ArrayList<>(); regionReplicaSets.forEach( regionReplicaSet -> { @@ -148,10 +150,10 @@ public class DeleteDatabaseProcedure } // Delete DatabasePartitionTable - TSStatus deleteConfigResult = env.deleteConfig(deleteSgSchema.getName()); + final TSStatus deleteConfigResult = env.deleteConfig(deleteDatabaseSchema.getName()); // Delete Database metrics - PartitionMetrics.unbindDatabasePartitionMetrics(deleteSgSchema.getName()); + PartitionMetrics.unbindDatabasePartitionMetrics(deleteDatabaseSchema.getName()); // try sync delete schema region AsyncClientHandler<TConsensusGroupId, TSStatus> asyncClientHandler = @@ -176,13 +178,13 @@ public class DeleteDatabaseProcedure asyncClientHandler.getResponseMap().entrySet()) { if (entry.getValue().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { LOG.info( - "Successfully delete SchemaRegion[{}] on {}", + "[DeleteDatabaseProcedure] Successfully delete SchemaRegion[{}] on {}", asyncClientHandler.getRequest(entry.getKey()), schemaRegionDeleteTaskMap.get(entry.getKey()).getTargetDataNode()); schemaRegionDeleteTaskMap.remove(entry.getKey()); } else { LOG.warn( - "Failed to delete SchemaRegion[{}] on {}. Submit to async deletion.", + "[DeleteDatabaseProcedure] Failed to delete SchemaRegion[{}] on {}. 
Submit to async deletion.", asyncClientHandler.getRequest(entry.getKey()), schemaRegionDeleteTaskMap.get(entry.getKey()).getTargetDataNode()); } @@ -200,22 +202,27 @@ public class DeleteDatabaseProcedure } if (deleteConfigResult.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + LOG.info( + "[DeleteDatabaseProcedure] Database: {} is deleted successfully", + deleteDatabaseSchema.getName()); return Flow.NO_MORE_STATE; } else if (getCycles() > RETRY_THRESHOLD) { - setFailure(new ProcedureException("Delete config info id failed")); + setFailure( + new ProcedureException("[DeleteDatabaseProcedure] Delete DatabaseSchema failed")); } } } catch (TException | IOException e) { if (isRollbackSupported(state)) { - setFailure(new ProcedureException("Delete database failed " + state)); + setFailure( + new ProcedureException("[DeleteDatabaseProcedure] Delete Database failed " + state)); } else { LOG.error( - "Retriable error trying to delete database {}, state {}", - deleteSgSchema.getName(), + "[DeleteDatabaseProcedure] Retriable error trying to delete database {}, state {}", + deleteDatabaseSchema.getName(), state, e); if (getCycles() > RETRY_THRESHOLD) { - setFailure(new ProcedureException("State stuck at " + state)); + setFailure(new ProcedureException("[DeleteDatabaseProcedure] State stuck at " + state)); } } } @@ -226,10 +233,11 @@ public class DeleteDatabaseProcedure protected void rollbackState(ConfigNodeProcedureEnv env, DeleteStorageGroupState state) throws IOException, InterruptedException { switch (state) { - case DELETE_PRE: + case PRE_DELETE_DATABASE: case INVALIDATE_CACHE: - LOG.info("Rollback preDeleted:{}", deleteSgSchema.getName()); - env.preDelete(PreDeleteDatabasePlan.PreDeleteType.ROLLBACK, deleteSgSchema.getName()); + LOG.info( + "[DeleteDatabaseProcedure] Rollback to preDeleted: {}", deleteDatabaseSchema.getName()); + env.preDelete(PreDeleteDatabasePlan.PreDeleteType.ROLLBACK, deleteDatabaseSchema.getName()); break; default: break; @@ -239,7 
+247,7 @@ public class DeleteDatabaseProcedure @Override protected boolean isRollbackSupported(DeleteStorageGroupState state) { switch (state) { - case DELETE_PRE: + case PRE_DELETE_DATABASE: case INVALIDATE_CACHE: return true; default: @@ -259,21 +267,21 @@ public class DeleteDatabaseProcedure @Override protected DeleteStorageGroupState getInitialState() { - return DeleteStorageGroupState.DELETE_STORAGE_GROUP_PREPARE; + return DeleteStorageGroupState.PRE_DELETE_DATABASE; } @Override public void serialize(DataOutputStream stream) throws IOException { stream.writeShort(ProcedureType.DELETE_STORAGE_GROUP_PROCEDURE.getTypeCode()); super.serialize(stream); - ThriftConfigNodeSerDeUtils.serializeTStorageGroupSchema(deleteSgSchema, stream); + ThriftConfigNodeSerDeUtils.serializeTStorageGroupSchema(deleteDatabaseSchema, stream); } @Override public void deserialize(ByteBuffer byteBuffer) { super.deserialize(byteBuffer); try { - deleteSgSchema = ThriftConfigNodeSerDeUtils.deserializeTStorageGroupSchema(byteBuffer); + deleteDatabaseSchema = ThriftConfigNodeSerDeUtils.deserializeTStorageGroupSchema(byteBuffer); } catch (ThriftSerDeException e) { LOG.error("Error in deserialize DeleteStorageGroupProcedure", e); } @@ -285,13 +293,13 @@ public class DeleteDatabaseProcedure DeleteDatabaseProcedure thatProc = (DeleteDatabaseProcedure) that; return thatProc.getProcId() == this.getProcId() && thatProc.getState() == this.getState() - && thatProc.deleteSgSchema.equals(this.getDeleteSgSchema()); + && thatProc.deleteDatabaseSchema.equals(this.getDeleteDatabaseSchema()); } return false; } @Override public int hashCode() { - return Objects.hash(deleteSgSchema); + return Objects.hash(deleteDatabaseSchema); } } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/DeleteStorageGroupState.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/DeleteStorageGroupState.java index 50895cc926..bc19a262e6 100644 --- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/DeleteStorageGroupState.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/DeleteStorageGroupState.java @@ -20,8 +20,7 @@ package org.apache.iotdb.confignode.procedure.state.schema; public enum DeleteStorageGroupState { - DELETE_STORAGE_GROUP_PREPARE, - DELETE_PRE, + PRE_DELETE_DATABASE, INVALIDATE_CACHE, - DELETE_CONFIG + DELETE_DATABASE_SCHEMA } diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index 248426f1c5..6d6fa9e9e5 100644 --- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java @@ -47,12 +47,12 @@ import org.apache.iotdb.confignode.consensus.request.read.partition.GetSeriesSlo import org.apache.iotdb.confignode.consensus.request.read.partition.GetTimeSlotListPlan; import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan; import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan; +import org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.SetDataReplicationFactorPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan; import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.DatabaseSchemaPlan; -import 
org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetDataReplicationFactorPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetSchemaReplicationFactorPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetTTLPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetTimePartitionIntervalPlan; import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan; import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan; import org.apache.iotdb.confignode.consensus.response.auth.PermissionInfoResp; @@ -393,13 +393,13 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac @Override public TSStatus deleteDatabase(TDeleteDatabaseReq tDeleteReq) { String prefixPath = tDeleteReq.getPrefixPath(); - return configManager.deleteStorageGroups(Collections.singletonList(prefixPath)); + return configManager.deleteDatabases(Collections.singletonList(prefixPath)); } @Override public TSStatus deleteDatabases(TDeleteDatabasesReq tDeleteReq) { List<String> prefixList = tDeleteReq.getPrefixPathList(); - return configManager.deleteStorageGroups(prefixList); + return configManager.deleteDatabases(prefixList); } @Override @@ -429,7 +429,7 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac public TCountDatabaseResp countMatchedDatabases(List<String> storageGroupPathPattern) { CountDatabaseResp countDatabaseResp = (CountDatabaseResp) - configManager.countMatchedStorageGroups(new CountDatabasePlan(storageGroupPathPattern)); + configManager.countMatchedDatabases(new CountDatabasePlan(storageGroupPathPattern)); TCountDatabaseResp resp = new TCountDatabaseResp(); countDatabaseResp.convertToRPCCountStorageGroupResp(resp); @@ -440,8 +440,7 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac public TDatabaseSchemaResp getMatchedDatabaseSchemas(List<String> 
storageGroupPathPattern) { DatabaseSchemaResp databaseSchemaResp = (DatabaseSchemaResp) - configManager.getMatchedStorageGroupSchemas( - new GetDatabasePlan(storageGroupPathPattern)); + configManager.getMatchedDatabaseSchemas(new GetDatabasePlan(storageGroupPathPattern)); return databaseSchemaResp.convertToRPCStorageGroupSchemaResp(); } @@ -700,7 +699,9 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac public TSStatus flush(TFlushReq req) throws TException { if (req.storageGroups != null) { List<PartialPath> noExistSg = - configManager.checkStorageGroupExist(PartialPath.fromStringList(req.storageGroups)); + configManager + .getPartitionManager() + .filterUnExistDatabases(PartialPath.fromStringList(req.storageGroups)); if (!noExistSg.isEmpty()) { StringBuilder sb = new StringBuilder(); noExistSg.forEach(storageGroup -> sb.append(storageGroup.getFullPath()).append(",")); @@ -775,7 +776,7 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac @Override public TShowDatabaseResp showDatabase(List<String> storageGroupPathPattern) { - return configManager.showStorageGroup(new GetDatabasePlan(storageGroupPathPattern)); + return configManager.showDatabase(new GetDatabasePlan(storageGroupPathPattern)); } @Override diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java index dce99b4275..9737a90f6a 100644 --- a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java +++ b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java @@ -69,6 +69,13 @@ import org.apache.iotdb.confignode.consensus.request.write.cq.AddCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.DropCQPlan; import 
org.apache.iotdb.confignode.consensus.request.write.cq.ShowCQPlan; import org.apache.iotdb.confignode.consensus.request.write.cq.UpdateCQLastExecTimePlan; +import org.apache.iotdb.confignode.consensus.request.write.database.AdjustMaxRegionGroupNumPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.DeleteDatabasePlan; +import org.apache.iotdb.confignode.consensus.request.write.database.SetDataReplicationFactorPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.SetSchemaReplicationFactorPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.SetTTLPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.SetTimePartitionIntervalPlan; import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan; import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan; import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan; @@ -80,13 +87,6 @@ import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGr import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan; import org.apache.iotdb.confignode.consensus.request.write.region.PollRegionMaintainTaskPlan; import org.apache.iotdb.confignode.consensus.request.write.region.PollSpecificRegionMaintainTaskPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.AdjustMaxRegionGroupNumPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.DatabaseSchemaPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.DeleteDatabasePlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetDataReplicationFactorPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetSchemaReplicationFactorPlan; -import 
org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetTTLPlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetTimePartitionIntervalPlan; import org.apache.iotdb.confignode.consensus.request.write.sync.CreatePipeSinkPlan; import org.apache.iotdb.confignode.consensus.request.write.sync.DropPipeSinkPlan; import org.apache.iotdb.confignode.consensus.request.write.sync.GetPipeSinkPlan; @@ -761,7 +761,7 @@ public class ConfigPhysicalPlanSerDeTest { public void updateProcedureTest() throws IOException { // test procedure equals DeleteStorageGroupProcedure DeleteDatabaseProcedure deleteDatabaseProcedure = new DeleteDatabaseProcedure(); - deleteDatabaseProcedure.setDeleteSgSchema(new TDatabaseSchema("root.sg")); + deleteDatabaseProcedure.setDeleteDatabaseSchema(new TDatabaseSchema("root.sg")); UpdateProcedurePlan updateProcedurePlan0 = new UpdateProcedurePlan(); updateProcedurePlan0.setProcedure(deleteDatabaseProcedure); UpdateProcedurePlan updateProcedurePlan1 = @@ -808,7 +808,7 @@ public class ConfigPhysicalPlanSerDeTest { DeleteDatabaseProcedure deleteDatabaseProcedure = new DeleteDatabaseProcedure(); TDatabaseSchema tDatabaseSchema = new TDatabaseSchema(); tDatabaseSchema.setName("root.sg"); - deleteDatabaseProcedure.setDeleteSgSchema(tDatabaseSchema); + deleteDatabaseProcedure.setDeleteDatabaseSchema(tDatabaseSchema); req0.setProcedure(deleteDatabaseProcedure); UpdateProcedurePlan req1 = (UpdateProcedurePlan) ConfigPhysicalPlan.Factory.create(req0.serializeToByteBuffer()); diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfoTest.java index bbd1673399..78a57b69c3 100644 --- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfoTest.java +++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ClusterSchemaInfoTest.java @@ -24,7 +24,7 @@ import 
org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType; import org.apache.iotdb.confignode.consensus.request.read.database.GetDatabasePlan; import org.apache.iotdb.confignode.consensus.request.read.template.GetPathsSetTemplatePlan; -import org.apache.iotdb.confignode.consensus.request.write.storagegroup.DatabaseSchemaPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan; import org.apache.iotdb.confignode.consensus.request.write.template.CreateSchemaTemplatePlan; import org.apache.iotdb.confignode.consensus.request.write.template.SetSchemaTemplatePlan; import org.apache.iotdb.confignode.persistence.schema.ClusterSchemaInfo; diff --git a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java index b0553b1b92..9959769622 100644 --- a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java +++ b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java @@ -31,11 +31,11 @@ import org.apache.iotdb.commons.partition.SchemaPartitionTable; import org.apache.iotdb.commons.partition.SeriesPartitionTable; import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType; import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan; +import org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan; import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan; import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan; import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan; import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan; -import 
org.apache.iotdb.confignode.consensus.request.write.storagegroup.DatabaseSchemaPlan; import org.apache.iotdb.confignode.consensus.response.partition.RegionInfoListResp; import org.apache.iotdb.confignode.persistence.partition.PartitionInfo; import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionCreateTask; diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index 1702db68fe..487e711012 100644 --- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -136,7 +136,7 @@ public enum TSStatusCode { CONSENSUS_NOT_INITIALIZED(904), REGION_LEADER_CHANGE_ERROR(905), NO_AVAILABLE_REGION_GROUP(906), - LACK_DATA_PARTITION_ALLOCATION(907), + LACK_PARTITION_ALLOCATION(907), // Cluster Manager ADD_CONFIGNODE_ERROR(1000), diff --git a/thrift-confignode/src/main/thrift/confignode.thrift b/thrift-confignode/src/main/thrift/confignode.thrift index f27532fac5..b29b460d5f 100644 --- a/thrift-confignode/src/main/thrift/confignode.thrift +++ b/thrift-confignode/src/main/thrift/confignode.thrift @@ -849,7 +849,7 @@ service IConfigNodeRPCService { * * @return SUCCESS_STATUS if the SchemaPartitionTable got or created successfully * NOT_ENOUGH_DATA_NODE if the number of cluster DataNodes is not enough for creating new SchemaRegions - * STORAGE_GROUP_NOT_EXIST if some Databases don't exist + * DATABASE_NOT_EXIST if some Databases don't exist */ TSchemaPartitionTableResp getOrCreateSchemaPartitionTable(TSchemaPartitionReq req) @@ -879,7 +879,7 @@ service IConfigNodeRPCService { * * @return SUCCESS_STATUS if the DataPartitionTable got or created successfully * NOT_ENOUGH_DATA_NODE if the number of cluster DataNodes is not enough for creating new DataRegions - * STORAGE_GROUP_NOT_EXIST if some Databases don't exist + * DATABASE_NOT_EXIST if some Databases don't exist */ TDataPartitionTableResp 
getOrCreateDataPartitionTable(TDataPartitionReq req)
