This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d8c06a80d3 [IOTDB-3036][Partition Cache] Add auto create judge when
get deviceToStorageGroupMap (#5719)
d8c06a80d3 is described below
commit d8c06a80d3dade2d5fda066c1dea3d03ceb1b09b
Author: ZhangHongYin <[email protected]>
AuthorDate: Thu Apr 28 20:17:48 2022 +0800
[IOTDB-3036][Partition Cache] Add auto create judge when get
deviceToStorageGroupMap (#5719)
---
.../mpp/sql/analyze/ClusterPartitionFetcher.java | 80 +++++++++++++---------
1 file changed, 46 insertions(+), 34 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java
index f50812fc1d..b6426dca9e 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/analyze/ClusterPartitionFetcher.java
@@ -102,7 +102,7 @@ public class ClusterPartitionFetcher implements
IPartitionFetcher {
try {
patternTree.constructTree();
List<String> devicePaths = patternTree.findAllDevicePaths();
- Map<String, String> deviceToStorageGroupMap =
getDeviceToStorageGroup(devicePaths);
+ Map<String, String> deviceToStorageGroupMap =
getDeviceToStorageGroup(devicePaths, false);
SchemaPartition schemaPartition =
partitionCache.getSchemaPartition(deviceToStorageGroupMap);
if (null == schemaPartition) {
TSchemaPartitionResp schemaPartitionResp =
@@ -125,7 +125,7 @@ public class ClusterPartitionFetcher implements
IPartitionFetcher {
try {
patternTree.constructTree();
List<String> devicePaths = patternTree.findAllDevicePaths();
- Map<String, String> deviceToStorageGroupMap =
getDeviceToStorageGroup(devicePaths);
+ Map<String, String> deviceToStorageGroupMap =
getDeviceToStorageGroup(devicePaths, true);
SchemaPartition schemaPartition =
partitionCache.getSchemaPartition(deviceToStorageGroupMap);
if (null == schemaPartition) {
TSchemaPartitionResp schemaPartitionResp =
@@ -163,7 +163,7 @@ public class ClusterPartitionFetcher implements
IPartitionFetcher {
public DataPartition getDataPartition(List<DataPartitionQueryParam>
dataPartitionQueryParams) {
try {
Map<String, List<DataPartitionQueryParam>> splitDataPartitionQueryParams
=
- splitDataPartitionQueryParam(dataPartitionQueryParams);
+ splitDataPartitionQueryParam(dataPartitionQueryParams, false);
DataPartition dataPartition =
partitionCache.getDataPartition(splitDataPartitionQueryParams);
if (null == dataPartition) {
TDataPartitionResp dataPartitionResp =
@@ -203,7 +203,7 @@ public class ClusterPartitionFetcher implements
IPartitionFetcher {
List<DataPartitionQueryParam> dataPartitionQueryParams) {
try {
Map<String, List<DataPartitionQueryParam>> splitDataPartitionQueryParams
=
- splitDataPartitionQueryParam(dataPartitionQueryParams);
+ splitDataPartitionQueryParam(dataPartitionQueryParams, true);
DataPartition dataPartition =
partitionCache.getDataPartition(splitDataPartitionQueryParams);
if (null == dataPartition) {
TDataPartitionResp dataPartitionResp =
@@ -223,7 +223,8 @@ public class ClusterPartitionFetcher implements
IPartitionFetcher {
}
/** get deviceToStorageGroup map */
- private Map<String, String> getDeviceToStorageGroup(List<String>
devicePaths) {
+ private Map<String, String> getDeviceToStorageGroup(
+ List<String> devicePaths, boolean isAutoCreate) {
Map<String, String> deviceToStorageGroup = new HashMap<>();
// miss when devicePath contains *
for (String devicePath : devicePaths) {
@@ -232,7 +233,8 @@ public class ClusterPartitionFetcher implements
IPartitionFetcher {
}
}
// first try to hit cache
- if (!partitionCache.getStorageGroup(devicePaths, deviceToStorageGroup)) {
+ boolean firstTryResult = partitionCache.getStorageGroup(devicePaths,
deviceToStorageGroup);
+ if (!firstTryResult) {
List<String> storageGroupPathPattern = ROOT_PATH;
try {
TStorageGroupSchemaResp storageGroupSchemaResp =
@@ -245,7 +247,9 @@ public class ClusterPartitionFetcher implements
IPartitionFetcher {
partitionCache.updateStorageCache(storageGroupNames);
// second try to hit cache
deviceToStorageGroup = new HashMap<>();
- if (!partitionCache.getStorageGroup(devicePaths,
deviceToStorageGroup)) {
+ boolean secondTryResult =
+ partitionCache.getStorageGroup(devicePaths,
deviceToStorageGroup);
+ if (!secondTryResult && isAutoCreate) {
// try to auto create storage group
Set<String> storageGroupNamesNeedCreated = new HashSet<>();
for (String devicePath : devicePaths) {
@@ -265,7 +269,9 @@ public class ClusterPartitionFetcher implements
IPartitionFetcher {
partitionCache.updateStorageCache(storageGroupNamesNeedCreated);
// third try to hit cache
deviceToStorageGroup = new HashMap<>();
- if (!partitionCache.getStorageGroup(devicePaths,
deviceToStorageGroup)) {
+ boolean thirdTryResult =
+ partitionCache.getStorageGroup(devicePaths,
deviceToStorageGroup);
+ if (!thirdTryResult) {
throw new StatementAnalyzeException(
"Failed to get Storage Group Map when executing
getOrCreateDataPartition()");
}
@@ -281,20 +287,23 @@ public class ClusterPartitionFetcher implements
IPartitionFetcher {
/** split data partition query param by storage group */
private Map<String, List<DataPartitionQueryParam>>
splitDataPartitionQueryParam(
- List<DataPartitionQueryParam> dataPartitionQueryParams) {
+ List<DataPartitionQueryParam> dataPartitionQueryParams, boolean
isAutoCreate) {
List<String> devicePaths = new ArrayList<>();
for (DataPartitionQueryParam dataPartitionQueryParam :
dataPartitionQueryParams) {
devicePaths.add(dataPartitionQueryParam.getDevicePath());
}
- Map<String, String> deviceToStorageGroup =
getDeviceToStorageGroup(devicePaths);
+ Map<String, String> deviceToStorageGroup =
getDeviceToStorageGroup(devicePaths, isAutoCreate);
Map<String, List<DataPartitionQueryParam>> result = new HashMap<>();
for (DataPartitionQueryParam dataPartitionQueryParam :
dataPartitionQueryParams) {
- String storageGroup =
deviceToStorageGroup.get(dataPartitionQueryParam.getDevicePath());
- if (!result.containsKey(storageGroup)) {
- result.put(storageGroup, new ArrayList<>());
+ String devicePath = dataPartitionQueryParam.getDevicePath();
+ if (deviceToStorageGroup.containsKey(devicePath)) {
+ String storageGroup = deviceToStorageGroup.get(devicePath);
+ if (!result.containsKey(storageGroup)) {
+ result.put(storageGroup, new ArrayList<>());
+ }
+ result.get(storageGroup).add(dataPartitionQueryParam);
}
- result.get(storageGroup).add(dataPartitionQueryParam);
}
return result;
}
@@ -546,27 +555,31 @@ public class ClusterPartitionFetcher implements
IPartitionFetcher {
schemaPartition.getSchemaPartitionMap();
Set<String> storageGroupNames = storageGroupPartitionMap.keySet();
for (String device : devices) {
- String storageGroup = null;
- for (String storageGroupName : storageGroupNames) {
- if (device.startsWith(storageGroupName)) {
- storageGroup = storageGroupName;
- break;
+ if (!device.contains("*")) {
+ String storageGroup = null;
+ for (String storageGroupName : storageGroupNames) {
+ if (device.startsWith(storageGroupName)) {
+ storageGroup = storageGroupName;
+ break;
+ }
}
+ if (null == storageGroup) {
+ logger.error(
+ "Failed to get the storage group of {} when update
SchemaPartitionCache", device);
+ continue;
+ }
+ TSeriesPartitionSlot seriesPartitionSlot =
+ partitionExecutor.getSeriesPartitionSlot(device);
+ TRegionReplicaSet regionReplicaSet =
+
storageGroupPartitionMap.get(storageGroup).getOrDefault(seriesPartitionSlot,
null);
+ if (null == regionReplicaSet) {
+ logger.error(
+ "Failed to get the regionReplicaSet of {} when update
SchemaPartitionCache",
+ device);
+ continue;
+ }
+ schemaPartitionCache.put(device, regionReplicaSet);
}
- if (null == storageGroup) {
- logger.error(
- "Failed to get the storage group of {} when update
SchemaPartitionCache", device);
- continue;
- }
- TSeriesPartitionSlot seriesPartitionSlot =
partitionExecutor.getSeriesPartitionSlot(device);
- TRegionReplicaSet regionReplicaSet =
-
storageGroupPartitionMap.get(storageGroup).getOrDefault(seriesPartitionSlot,
null);
- if (null == regionReplicaSet) {
- logger.error(
- "Failed to get the regionReplicaSet of {} when update
SchemaPartitionCache", device);
- continue;
- }
- schemaPartitionCache.put(device, regionReplicaSet);
}
}
@@ -575,7 +588,6 @@ public class ClusterPartitionFetcher implements
IPartitionFetcher {
for (Map.Entry<
String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TRegionReplicaSet>>>>
entry1 : dataPartition.getDataPartitionMap().entrySet()) {
- String storageGroup = entry1.getKey();
for (Map.Entry<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TRegionReplicaSet>>>
entry2 : entry1.getValue().entrySet()) {
TSeriesPartitionSlot seriesPartitionSlot = entry2.getKey();