This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new ad41c5518b1 fix NPE bug & rename some storagegroup to database (#14155)
ad41c5518b1 is described below
commit ad41c5518b1b7b19b34c1803c3d0fc4a56794a04
Author: Potato <[email protected]>
AuthorDate: Thu Nov 21 12:32:40 2024 +0800
fix NPE bug & rename some storagegroup to database (#14155)
Signed-off-by: OneSizeFitQuorum <[email protected]>
---
.../manager/load/balancer/RouteBalancer.java | 2 +-
.../manager/partition/PartitionManager.java | 4 +-
.../manager/partition/PartitionMetrics.java | 8 +-
.../runtime/PipeLeaderChangeHandler.java | 2 +-
.../persistence/partition/PartitionInfo.java | 96 +++++++++++-----------
.../procedure/env/RegionMaintainHandler.java | 4 +-
.../impl/pipe/task/AlterPipeProcedureV2.java | 2 +-
.../impl/pipe/task/CreatePipeProcedureV2.java | 4 +-
8 files changed, 62 insertions(+), 60 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index 758c05f2c1e..872318ef2bd 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -277,7 +277,7 @@ public class RouteBalancer implements IClusterStatusSubscriber {
requestIndex.get(), dataNodeLocation);
// set req
final TConsensusGroupId consensusGroupId = entry.getKey();
- final String database =
getPartitionManager().getRegionStorageGroup(consensusGroupId);
+ final String database =
getPartitionManager().getRegionDatabase(consensusGroupId);
invalidateSchemaCacheRequestHandler.putRequest(requestIndex.get(), database);
requestIndex.incrementAndGet();
});
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index 603e8829456..911f265c2e7 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -1185,8 +1185,8 @@ public class PartitionManager {
* @param regionId regionId
* @return database name
*/
- public String getRegionStorageGroup(TConsensusGroupId regionId) {
- return partitionInfo.getRegionStorageGroup(regionId);
+ public String getRegionDatabase(TConsensusGroupId regionId) {
+ return partitionInfo.getRegionDatabase(regionId);
}
/**
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionMetrics.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionMetrics.java
index 3e7426b3786..771856a465d 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionMetrics.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionMetrics.java
@@ -345,8 +345,8 @@ public class PartitionMetrics implements IMetricSet {
try {
return manager.getRegionGroupCount(database,
TConsensusGroupType.SchemaRegion);
} catch (DatabaseNotExistsException e) {
- LOGGER.warn("Error when counting SchemaRegionGroups in Database: {}", database, e);
- return -1;
+ LOGGER.info("Error when counting SchemaRegionGroups in Database: {}", database, e);
+ return 0;
}
},
Tag.NAME.toString(),
@@ -361,8 +361,8 @@ public class PartitionMetrics implements IMetricSet {
try {
return manager.getRegionGroupCount(database,
TConsensusGroupType.DataRegion);
} catch (DatabaseNotExistsException e) {
- LOGGER.warn("Error when counting DataRegionGroups in Database: {}", database, e);
- return -1;
+ LOGGER.info("Error when counting DataRegionGroups in Database: {}", database, e);
+ return 0;
}
},
Tag.NAME.toString(),
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java
index 284779fcb03..9121ba3caa4 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java
@@ -83,7 +83,7 @@ public class PipeLeaderChangeHandler implements IClusterStatusSubscriber {
.forEach(
(regionGroupId, pair) -> {
final String databaseName =
-
configManager.getPartitionManager().getRegionStorageGroup(regionGroupId);
+
configManager.getPartitionManager().getRegionDatabase(regionGroupId);
// Pipe only collect user's data, filter metric database here.
// DatabaseName may be null for config region group
if (Objects.isNull(databaseName)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index fe48dd50797..14308982932 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -273,7 +273,7 @@ public class PartitionInfo implements SnapshotProcessor {
}
/**
- * Thread-safely pre-delete the specific StorageGroup.
+ * Thread-safely pre-delete the specific database.
*
* @param preDeleteDatabasePlan PreDeleteStorageGroupPlan
* @return {@link TSStatusCode#SUCCESS_STATUS}
@@ -281,8 +281,8 @@ public class PartitionInfo implements SnapshotProcessor {
public TSStatus preDeleteDatabase(PreDeleteDatabasePlan
preDeleteDatabasePlan) {
final PreDeleteDatabasePlan.PreDeleteType preDeleteType =
preDeleteDatabasePlan.getPreDeleteType();
- final String storageGroup = preDeleteDatabasePlan.getStorageGroup();
- DatabasePartitionTable databasePartitionTable =
databasePartitionTables.get(storageGroup);
+ final String database = preDeleteDatabasePlan.getStorageGroup();
+ DatabasePartitionTable databasePartitionTable =
databasePartitionTables.get(database);
if (databasePartitionTable == null) {
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
@@ -305,12 +305,12 @@ public class PartitionInfo implements SnapshotProcessor {
}
/**
- * Thread-safely delete StorageGroup.
+ * Thread-safely delete database.
*
- * @param plan DeleteStorageGroupPlan
+ * @param plan DeleteDatabasePlan
*/
public void deleteDatabase(DeleteDatabasePlan plan) {
- // Clean the StorageGroupTable cache
+ // Clean the databaseTable cache
databasePartitionTables.remove(plan.getName());
}
@@ -325,24 +325,24 @@ public class PartitionInfo implements SnapshotProcessor {
// TODO: Replace this map with new SchemaPartition
Map<String, SchemaPartitionTable> schemaPartition = new
ConcurrentHashMap<>();
- if (plan.getPartitionSlotsMap().size() == 0) {
+ if (plan.getPartitionSlotsMap().isEmpty()) {
// Return all SchemaPartitions when the queried PartitionSlots are empty
databasePartitionTables.forEach(
- (storageGroup, databasePartitionTable) -> {
+ (database, databasePartitionTable) -> {
if (databasePartitionTable.isNotPreDeleted()) {
- schemaPartition.put(storageGroup, new SchemaPartitionTable());
+ schemaPartition.put(database, new SchemaPartitionTable());
databasePartitionTable.getSchemaPartition(
- new ArrayList<>(), schemaPartition.get(storageGroup));
+ new ArrayList<>(), schemaPartition.get(database));
- if
(schemaPartition.get(storageGroup).getSchemaPartitionMap().isEmpty()) {
+ if
(schemaPartition.get(database).getSchemaPartitionMap().isEmpty()) {
// Remove empty Map
- schemaPartition.remove(storageGroup);
+ schemaPartition.remove(database);
}
}
});
} else {
- // Return the SchemaPartition for each StorageGroup
+ // Return the SchemaPartition for each database
plan.getPartitionSlotsMap()
.forEach(
(database, partitionSlots) -> {
@@ -506,16 +506,16 @@ public class PartitionInfo implements SnapshotProcessor {
matchedDatabases.stream()
.filter(this::isDatabaseExisted)
.forEach(
- storageGroup -> {
- schemaPartitionMap.put(storageGroup, new SchemaPartitionTable());
+ database -> {
+ schemaPartitionMap.put(database, new SchemaPartitionTable());
databasePartitionTables
- .get(storageGroup)
- .getSchemaPartition(new ArrayList<>(),
schemaPartitionMap.get(storageGroup));
+ .get(database)
+ .getSchemaPartition(new ArrayList<>(),
schemaPartitionMap.get(database));
- if
(schemaPartitionMap.get(storageGroup).getSchemaPartitionMap().isEmpty()) {
+ if
(schemaPartitionMap.get(database).getSchemaPartitionMap().isEmpty()) {
// Remove empty Map
- schemaPartitionMap.remove(storageGroup);
+ schemaPartitionMap.remove(database);
}
});
@@ -534,10 +534,10 @@ public class PartitionInfo implements SnapshotProcessor {
return regionResp;
}
TShowRegionReq showRegionReq = regionsInfoPlan.getShowRegionReq();
- final List<String> storageGroups = showRegionReq != null ?
showRegionReq.getDatabases() : null;
+ final List<String> databases = showRegionReq != null ?
showRegionReq.getDatabases() : null;
databasePartitionTables.forEach(
- (storageGroup, databasePartitionTable) -> {
- if (storageGroups != null && !storageGroups.contains(storageGroup)) {
+ (database, databasePartitionTable) -> {
+ if (databases != null && !databases.contains(database)) {
return;
}
regionInfoList.addAll(databasePartitionTable.getRegionInfoList(regionsInfoPlan));
@@ -602,7 +602,7 @@ public class PartitionInfo implements SnapshotProcessor {
* @param regionId regionId
* @return database name
*/
- public String getRegionStorageGroup(TConsensusGroupId regionId) {
+ public String getRegionDatabase(TConsensusGroupId regionId) {
Optional<DatabasePartitionTable> sgPartitionTableOptional =
databasePartitionTables.values().stream()
.filter(s -> s.containRegionGroup(regionId))
@@ -617,9 +617,9 @@ public class PartitionInfo implements SnapshotProcessor {
/**
* Only Leader use this interface. Filter unassigned SchemaPartitionSlots.
*
- * @param partitionSlotsMap Map<StorageGroupName, List<TSeriesPartitionSlot>>
- * @return Map<StorageGroupName, List<TSeriesPartitionSlot>>,
SchemaPartitionSlots that is not
- * assigned in partitionSlotsMap
+ * @param partitionSlotsMap Map<database, List<TSeriesPartitionSlot>>
+ * @return Map<database, List<TSeriesPartitionSlot>>, SchemaPartitionSlots
that is not assigned in
+ * partitionSlotsMap
*/
public Map<String, List<TSeriesPartitionSlot>>
filterUnassignedSchemaPartitionSlots(
Map<String, List<TSeriesPartitionSlot>> partitionSlotsMap) {
@@ -642,9 +642,9 @@ public class PartitionInfo implements SnapshotProcessor {
/**
* Only Leader use this interface. Filter unassigned SchemaPartitionSlots
*
- * @param partitionSlotsMap Map<StorageGroupName, Map<TSeriesPartitionSlot,
TTimeSlotList>>
- * @return Map<StorageGroupName, Map<TSeriesPartitionSlot, TTimeSlotList>>,
DataPartitionSlots
- * that is not assigned in partitionSlotsMap
+ * @param partitionSlotsMap Map<database, Map<TSeriesPartitionSlot,
TTimeSlotList>>
+ * @return Map<database, Map<TSeriesPartitionSlot, TTimeSlotList>>,
DataPartitionSlots that is not
+ * assigned in partitionSlotsMap
*/
public Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>>
filterUnassignedDataPartitionSlots(
Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap)
{
@@ -806,8 +806,8 @@ public class PartitionInfo implements SnapshotProcessor {
*
* @param database DatabaseName
* @param type SchemaRegion or DataRegion
- * @return Number of Regions currently owned by the specific StorageGroup
- * @throws DatabaseNotExistsException When the specific StorageGroup doesn't
exist
+ * @return Number of Regions currently owned by the specific database
+ * @throws DatabaseNotExistsException When the specific database doesn't
exist
*/
public int getRegionGroupCount(String database, TConsensusGroupType type)
throws DatabaseNotExistsException {
@@ -865,7 +865,9 @@ public class PartitionInfo implements SnapshotProcessor {
* @return The assigned SeriesPartitionSlots count
*/
public int getAssignedSeriesPartitionSlotsCount(String database) {
- return
databasePartitionTables.get(database).getAssignedSeriesPartitionSlotsCount();
+ return Optional.ofNullable(databasePartitionTables.get(database))
+ .map(DatabasePartitionTable::getAssignedSeriesPartitionSlotsCount)
+ .orElse(0);
}
/**
@@ -877,13 +879,15 @@ public class PartitionInfo implements SnapshotProcessor {
* @return The assigned TimePartitionSlots count
*/
public long getAssignedTimePartitionSlotsCount(String database) {
- return databasePartitionTables.get(database).getTimeSlotCount();
+ return Optional.ofNullable(databasePartitionTables.get(database))
+ .map(DatabasePartitionTable::getTimeSlotCount)
+ .orElse(0L);
}
/**
- * Get the DataNodes who contain the specific StorageGroup's Schema or Data.
+ * Get the DataNodes who contain the specific database's Schema or Data.
*
- * @param database The specific StorageGroup's name
+ * @param database The specific database's name
* @param type SchemaRegion or DataRegion
* @return Set {@literal <}TDataNodeLocation{@literal >}, the related
DataNodes
*/
@@ -897,7 +901,7 @@ public class PartitionInfo implements SnapshotProcessor {
*
* @param database DatabaseName
* @param type SchemaRegion or DataRegion
- * @return The StorageGroup's Running or Available Regions that sorted by
the number of allocated
+ * @return The database's Running or Available Regions that sorted by the
number of allocated
* slots
*/
public List<Pair<Long, TConsensusGroupId>> getRegionGroupSlotsCounter(
@@ -955,12 +959,12 @@ public class PartitionInfo implements SnapshotProcessor {
// serialize nextRegionGroupId
ReadWriteIOUtils.write(nextRegionGroupId.get(), bufferedOutputStream);
- // serialize StorageGroupPartitionTable
+ // serialize databasePartitionTable
ReadWriteIOUtils.write(databasePartitionTables.size(),
bufferedOutputStream);
- for (Map.Entry<String, DatabasePartitionTable>
storageGroupPartitionTableEntry :
+ for (Map.Entry<String, DatabasePartitionTable>
databasePartitionTableEntry :
databasePartitionTables.entrySet()) {
- ReadWriteIOUtils.write(storageGroupPartitionTableEntry.getKey(),
bufferedOutputStream);
-
storageGroupPartitionTableEntry.getValue().serialize(bufferedOutputStream,
protocol);
+ ReadWriteIOUtils.write(databasePartitionTableEntry.getKey(),
bufferedOutputStream);
+ databasePartitionTableEntry.getValue().serialize(bufferedOutputStream,
protocol);
}
// serialize regionCleanList
@@ -1012,16 +1016,16 @@ public class PartitionInfo implements SnapshotProcessor
{
// start to restore
nextRegionGroupId.set(ReadWriteIOUtils.readInt(fileInputStream));
- // restore StorageGroupPartitionTable
+ // restore databasePartitionTable
int length = ReadWriteIOUtils.readInt(fileInputStream);
for (int i = 0; i < length; i++) {
- String storageGroup = ReadWriteIOUtils.readString(fileInputStream);
- if (storageGroup == null) {
- throw new IOException("Failed to load snapshot because get null StorageGroup name");
+ final String database = ReadWriteIOUtils.readString(fileInputStream);
+ if (database == null) {
+ throw new IOException("Failed to load snapshot because get null database name");
}
- DatabasePartitionTable databasePartitionTable = new
DatabasePartitionTable(storageGroup);
+ final DatabasePartitionTable databasePartitionTable = new
DatabasePartitionTable(database);
databasePartitionTable.deserialize(fileInputStream, protocol);
- databasePartitionTables.put(storageGroup, databasePartitionTable);
+ databasePartitionTables.put(database, databasePartitionTable);
}
// restore deletedRegionSet
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
index 827ecd8b6d6..6418540e1f3 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
@@ -222,8 +222,8 @@ public class RegionMaintainHandler {
currentPeerNodes = Collections.emptyList();
}
- String storageGroup =
configManager.getPartitionManager().getRegionStorageGroup(regionId);
- TCreatePeerReq req = new TCreatePeerReq(regionId, currentPeerNodes,
storageGroup);
+ String database =
configManager.getPartitionManager().getRegionDatabase(regionId);
+ TCreatePeerReq req = new TCreatePeerReq(regionId, currentPeerNodes,
database);
status =
(TSStatus)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
index ccdd0e76b31..8b6aceb9c30 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
@@ -144,7 +144,7 @@ public class AlterPipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
.forEach(
(regionGroupId, regionLeaderNodeId) -> {
final String databaseName =
-
env.getConfigManager().getPartitionManager().getRegionStorageGroup(regionGroupId);
+
env.getConfigManager().getPartitionManager().getRegionDatabase(regionGroupId);
final PipeTaskMeta currentPipeTaskMeta =
currentConsensusGroupId2PipeTaskMeta.get(regionGroupId.getId());
if (databaseName != null
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index b8c712fdc16..813d4ebe69e 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -180,9 +180,7 @@ public class CreatePipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
.forEach(
(regionGroupId, regionLeaderNodeId) -> {
final String databaseName =
- env.getConfigManager()
- .getPartitionManager()
- .getRegionStorageGroup(regionGroupId);
+
env.getConfigManager().getPartitionManager().getRegionDatabase(regionGroupId);
if (databaseName != null
&& !databaseName.equals(SchemaConstant.SYSTEM_DATABASE)
&& !databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE
+ ".")) {