This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ty/TableModelGrammar
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ty/TableModelGrammar by this
push:
new 00100faf4b4 Implement new ClusterPartitionFetcher interface
00100faf4b4 is described below
commit 00100faf4b4cd6f864e8586a61600ef2c7f89966
Author: Potato <[email protected]>
AuthorDate: Wed Jul 3 20:15:33 2024 +0800
Implement new ClusterPartitionFetcher interface
---
.../plan/analyze/ClusterPartitionFetcher.java | 106 ++---
.../plan/analyze/IPartitionFetcher.java | 5 +-
...upCacheResult.java => DatabaseCacheResult.java} | 4 +-
.../analyze/cache/partition/PartitionCache.java | 445 ++++++++++++---------
.../schemaengine/schemaregion/utils/MetaUtils.java | 4 +-
.../iotdb/db/service/metrics/CacheMetrics.java | 28 +-
.../apache/iotdb/db/metadata/MetaUtilsTest.java | 8 +-
.../plan/analyze/cache/PartitionCacheTest.java | 36 +-
.../plan/relational/analyzer/TestMatadata.java | 6 +-
.../executor/SeriesPartitionExecutor.java | 2 +
.../partition/executor/hash/APHashExecutor.java | 18 +-
.../partition/executor/hash/BKDRHashExecutor.java | 11 +-
.../partition/executor/hash/JSHashExecutor.java | 12 +-
.../partition/executor/hash/SDBMHashExecutor.java | 12 +-
14 files changed, 414 insertions(+), 283 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
index eb6db8e6051..c6e30ea2d59 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
@@ -26,12 +26,14 @@ import
org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.consensus.ConfigRegionId;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
import org.apache.iotdb.commons.partition.SchemaPartition;
import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
@@ -52,8 +54,6 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.TException;
import org.apache.tsfile.file.metadata.IDeviceID;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
@@ -64,9 +64,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import static
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+
public class ClusterPartitionFetcher implements IPartitionFetcher {
- private static final Logger logger =
LoggerFactory.getLogger(ClusterPartitionFetcher.class);
private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
private final SeriesPartitionExecutor partitionExecutor;
@@ -96,44 +97,23 @@ public class ClusterPartitionFetcher implements
IPartitionFetcher {
@Override
public SchemaPartition getSchemaPartition(PathPatternTree patternTree) {
- try (ConfigNodeClient client =
- configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
- patternTree.constructTree();
- List<IDeviceID> deviceIDs = patternTree.getAllDevicePatterns();
- Map<String, List<IDeviceID>> storageGroupToDeviceMap =
- partitionCache.getStorageGroupToDevice(deviceIDs, true, false, null);
- SchemaPartition schemaPartition =
partitionCache.getSchemaPartition(storageGroupToDeviceMap);
- if (null == schemaPartition) {
- TSchemaPartitionTableResp schemaPartitionTableResp =
-
client.getSchemaPartitionTable(constructSchemaPartitionReq(patternTree));
- if (schemaPartitionTableResp.getStatus().getCode()
- == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- schemaPartition =
parseSchemaPartitionTableResp(schemaPartitionTableResp);
- partitionCache.updateSchemaPartitionCache(
- schemaPartitionTableResp.getSchemaPartitionTable());
- } else {
- throw new RuntimeException(
- new IoTDBException(
- schemaPartitionTableResp.getStatus().getMessage(),
- schemaPartitionTableResp.getStatus().getCode()));
- }
- }
- return schemaPartition;
- } catch (ClientManagerException | TException e) {
- throw new StatementAnalyzeException(
- "An error occurred when executing getSchemaPartition():" +
e.getMessage());
- }
+ return getOrCreateSchemaPartition(patternTree, false, null);
}
@Override
public SchemaPartition getOrCreateSchemaPartition(PathPatternTree
patternTree, String userName) {
+ return getOrCreateSchemaPartition(patternTree, true, userName);
+ }
+
+ private SchemaPartition getOrCreateSchemaPartition(
+ PathPatternTree patternTree, boolean isAutoCreate, String userName) {
try (ConfigNodeClient client =
configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
patternTree.constructTree();
List<IDeviceID> deviceIDs = patternTree.getAllDevicePatterns();
- Map<String, List<IDeviceID>> storageGroupToDeviceMap =
- partitionCache.getStorageGroupToDevice(deviceIDs, true, true,
userName);
- SchemaPartition schemaPartition =
partitionCache.getSchemaPartition(storageGroupToDeviceMap);
+ Map<String, List<IDeviceID>> databaseToDevice =
+ partitionCache.getDatabaseToDevice(deviceIDs, true, isAutoCreate,
userName);
+ SchemaPartition schemaPartition =
partitionCache.getSchemaPartition(databaseToDevice);
if (null == schemaPartition) {
TSchemaPartitionTableResp schemaPartitionTableResp =
client.getOrCreateSchemaPartitionTable(constructSchemaPartitionReq(patternTree));
@@ -295,21 +275,54 @@ public class ClusterPartitionFetcher implements
IPartitionFetcher {
@Override
public SchemaPartition getOrCreateSchemaPartition(
- String database, List<IDeviceID> deviceIDList, String userName) {
- // todo implement related logic @Potato
- throw new UnsupportedOperationException("Unsupported schema partition
operation");
+ String database, List<IDeviceID> deviceIDs, String userName) {
+ return getOrCreateSchemaPartition(database, deviceIDs, true, userName);
}
@Override
- public SchemaPartition getSchemaPartition(String database, List<IDeviceID>
deviceIDList) {
- // todo implement related logic @Potato
- throw new UnsupportedOperationException("Unsupported schema partition
operation");
+ public SchemaPartition getSchemaPartition(String database, List<IDeviceID>
deviceIDs) {
+ return getOrCreateSchemaPartition(database, deviceIDs, false, null);
+ }
+
+ private SchemaPartition getOrCreateSchemaPartition(
+ String database, List<IDeviceID> deviceIDs, boolean isAutoCreate, String
userName) {
+ try (ConfigNodeClient client =
+ configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+ partitionCache.checkAndAutoCreateDatabase(database, isAutoCreate,
userName);
+ SchemaPartition schemaPartition =
+ partitionCache.getSchemaPartition(
+ new HashMap<String, List<IDeviceID>>() {
+ {
+ put(database, deviceIDs);
+ }
+ });
+ if (null == schemaPartition) {
+ PathPatternTree tree = new PathPatternTree();
+ tree.appendPathPattern(new PartialPath(database + "." +
MULTI_LEVEL_PATH_WILDCARD));
+ TSchemaPartitionTableResp schemaPartitionTableResp =
+ client.getSchemaPartitionTable(constructSchemaPartitionReq(tree));
+ if (schemaPartitionTableResp.getStatus().getCode()
+ == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ schemaPartition =
parseSchemaPartitionTableResp(schemaPartitionTableResp);
+ partitionCache.updateSchemaPartitionCache(
+ schemaPartitionTableResp.getSchemaPartitionTable());
+ } else {
+ throw new RuntimeException(
+ new IoTDBException(
+ schemaPartitionTableResp.getStatus().getMessage(),
+ schemaPartitionTableResp.getStatus().getCode()));
+ }
+ }
+ return schemaPartition;
+ } catch (ClientManagerException | TException | IllegalPathException e) {
+ throw new StatementAnalyzeException(
+ "An error occurred when executing getSchemaPartition():" +
e.getMessage());
+ }
}
@Override
public SchemaPartition getSchemaPartition(String database) {
- // todo implement related logic @Potato
- throw new UnsupportedOperationException("Unsupported schema partition
operation");
+ return partitionCache.getSchemaPartition(database);
}
/** split data partition query param by database */
@@ -321,14 +334,14 @@ public class ClusterPartitionFetcher implements
IPartitionFetcher {
for (DataPartitionQueryParam dataPartitionQueryParam :
dataPartitionQueryParams) {
deviceIDs.add(dataPartitionQueryParam.getDeviceID());
}
- Map<IDeviceID, String> deviceToStorageGroupMap =
- partitionCache.getDeviceToStorageGroup(deviceIDs, true, isAutoCreate,
userName);
+ Map<IDeviceID, String> deviceToDatabase =
+ partitionCache.getDeviceToDatabase(deviceIDs, true, isAutoCreate,
userName);
Map<String, List<DataPartitionQueryParam>> result = new HashMap<>();
for (DataPartitionQueryParam dataPartitionQueryParam :
dataPartitionQueryParams) {
IDeviceID deviceID = dataPartitionQueryParam.getDeviceID();
- if (deviceToStorageGroupMap.containsKey(deviceID)) {
- String storageGroup = deviceToStorageGroupMap.get(deviceID);
- result.computeIfAbsent(storageGroup, key -> new
ArrayList<>()).add(dataPartitionQueryParam);
+ if (deviceToDatabase.containsKey(deviceID)) {
+ String database = deviceToDatabase.get(deviceID);
+ result.computeIfAbsent(database, key -> new
ArrayList<>()).add(dataPartitionQueryParam);
}
}
return result;
@@ -360,6 +373,7 @@ public class ClusterPartitionFetcher implements
IPartitionFetcher {
}
private static class ComplexTimeSlotList {
+
Set<TTimePartitionSlot> timeSlotList;
boolean needLeftAll;
boolean needRightAll;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IPartitionFetcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IPartitionFetcher.java
index 2fddd356229..64b29d9dcb4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IPartitionFetcher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IPartitionFetcher.java
@@ -94,6 +94,7 @@ public interface IPartitionFetcher {
void invalidAllCache();
// ======================== Table Model Schema Partition Interface
========================
+
/**
* Get or create schema partition, used in data insertion with
enable_auto_create_schema is true.
* if schemaPartition does not exist, then automatically create.
@@ -103,7 +104,7 @@ public interface IPartitionFetcher {
* <p>The device id shall be [table, seg1, ....]
*/
SchemaPartition getOrCreateSchemaPartition(
- String database, List<IDeviceID> deviceIDList, String userName);
+ String database, List<IDeviceID> deviceIDs, String userName);
/**
* For data query with completed id.
@@ -112,7 +113,7 @@ public interface IPartitionFetcher {
*
* <p>The device id shall be [table, seg1, ....]
*/
- SchemaPartition getSchemaPartition(String database, List<IDeviceID>
deviceIDList);
+ SchemaPartition getSchemaPartition(String database, List<IDeviceID>
deviceIDs);
/**
* For data query with partial device id conditions.
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/StorageGroupCacheResult.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/DatabaseCacheResult.java
similarity index 94%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/StorageGroupCacheResult.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/DatabaseCacheResult.java
index 32e807041d7..27f07f526a9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/StorageGroupCacheResult.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/DatabaseCacheResult.java
@@ -26,7 +26,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public abstract class StorageGroupCacheResult<K, V> {
+public abstract class DatabaseCacheResult<K, V> {
/** the result */
private boolean success = true;
@@ -56,7 +56,7 @@ public abstract class StorageGroupCacheResult<K, V> {
return map;
}
- public abstract void put(IDeviceID device, String storageGroupName);
+ public abstract void put(IDeviceID device, String databaseName);
/** set failed and clear the map */
public void setFailed() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
index 3fe5b5a6a0d..35134b4cd6d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
@@ -87,16 +87,13 @@ public class PartitionCache {
private final int seriesPartitionSlotNum =
config.getSeriesPartitionSlotNum();
private final SeriesPartitionExecutor partitionExecutor;
- /** the size of partitionCache */
- private final int cacheSize = config.getPartitionCacheSize();
-
/** the cache of database */
- private final Set<String> storageGroupCache =
Collections.synchronizedSet(new HashSet<>());
+ private final Set<String> databaseCache = new HashSet<>();
- /** storage -> schemaPartitionTable */
+ /** database -> schemaPartitionTable */
private final Cache<String, SchemaPartitionTable> schemaPartitionCache;
- /** storage -> dataPartitionTable */
+ /** database -> dataPartitionTable */
private final Cache<String, DataPartitionTable> dataPartitionCache;
/** the latest time when groupIdToReplicaSetMap updated. */
@@ -106,20 +103,22 @@ public class PartitionCache {
private final Map<TConsensusGroupId, TRegionReplicaSet>
groupIdToReplicaSetMap = new HashMap<>();
/** The lock of cache */
- private final ReentrantReadWriteLock storageGroupCacheLock = new
ReentrantReadWriteLock();
+ private final ReentrantReadWriteLock databaseCacheLock = new
ReentrantReadWriteLock();
private final ReentrantReadWriteLock schemaPartitionCacheLock = new
ReentrantReadWriteLock();
private final ReentrantReadWriteLock dataPartitionCacheLock = new
ReentrantReadWriteLock();
-
private final ReentrantReadWriteLock regionReplicaSetLock = new
ReentrantReadWriteLock();
private final IClientManager<ConfigRegionId, ConfigNodeClient>
configNodeClientManager =
ConfigNodeClientManager.getInstance();
+
private final CacheMetrics cacheMetrics;
public PartitionCache() {
- this.schemaPartitionCache =
Caffeine.newBuilder().maximumSize(cacheSize).build();
- this.dataPartitionCache =
Caffeine.newBuilder().maximumSize(cacheSize).build();
+ this.schemaPartitionCache =
+
Caffeine.newBuilder().maximumSize(config.getPartitionCacheSize()).build();
+ this.dataPartitionCache =
+
Caffeine.newBuilder().maximumSize(config.getPartitionCacheSize()).build();
this.partitionExecutor =
SeriesPartitionExecutor.getSeriesPartitionExecutor(
this.seriesSlotExecutorName, this.seriesPartitionSlotNum);
@@ -131,43 +130,42 @@ public class PartitionCache {
/**
* get database to device map
*
- * @param devicePaths the devices that need to hit
+ * @param deviceIDs the devices that need to hit
* @param tryToFetch whether try to get all database from config node
* @param isAutoCreate whether auto create database when cache miss
- * @param userName
+ * @param userName the userName
*/
- public Map<String, List<IDeviceID>> getStorageGroupToDevice(
- List<IDeviceID> devicePaths, boolean tryToFetch, boolean isAutoCreate,
String userName) {
- StorageGroupCacheResult<String, List<IDeviceID>> result =
- new StorageGroupCacheResult<String, List<IDeviceID>>() {
+ public Map<String, List<IDeviceID>> getDatabaseToDevice(
+ List<IDeviceID> deviceIDs, boolean tryToFetch, boolean isAutoCreate,
String userName) {
+ DatabaseCacheResult<String, List<IDeviceID>> result =
+ new DatabaseCacheResult<String, List<IDeviceID>>() {
@Override
- public void put(IDeviceID device, String storageGroupName) {
- map.computeIfAbsent(storageGroupName, k -> new ArrayList<>());
- map.get(storageGroupName).add(device);
+ public void put(IDeviceID device, String databaseName) {
+ map.computeIfAbsent(databaseName, k -> new
ArrayList<>()).add(device);
}
};
- getStorageGroupCacheResult(result, devicePaths, tryToFetch, isAutoCreate,
userName);
+ getDatabaseCacheResult(result, deviceIDs, tryToFetch, isAutoCreate,
userName);
return result.getMap();
}
/**
* get device to database map
*
- * @param deviceIDS the devices that need to hit
+ * @param deviceIDs the devices that need to hit
* @param tryToFetch whether try to get all database from config node
* @param isAutoCreate whether auto create database when cache miss
- * @param userName
+ * @param userName the userName
*/
- public Map<IDeviceID, String> getDeviceToStorageGroup(
- List<IDeviceID> deviceIDS, boolean tryToFetch, boolean isAutoCreate,
String userName) {
- StorageGroupCacheResult<IDeviceID, String> result =
- new StorageGroupCacheResult<IDeviceID, String>() {
+ public Map<IDeviceID, String> getDeviceToDatabase(
+ List<IDeviceID> deviceIDs, boolean tryToFetch, boolean isAutoCreate,
String userName) {
+ DatabaseCacheResult<IDeviceID, String> result =
+ new DatabaseCacheResult<IDeviceID, String>() {
@Override
- public void put(IDeviceID device, String storageGroupName) {
- map.put(device, storageGroupName);
+ public void put(IDeviceID device, String databaseName) {
+ map.put(device, databaseName);
}
};
- getStorageGroupCacheResult(result, deviceIDS, tryToFetch, isAutoCreate,
userName);
+ getDatabaseCacheResult(result, deviceIDs, tryToFetch, isAutoCreate,
userName);
return result.getMap();
}
@@ -177,44 +175,75 @@ public class PartitionCache {
* @param deviceID the path of device
* @return database name, return null if cache miss
*/
- private String getStorageGroupName(IDeviceID deviceID) {
- synchronized (storageGroupCache) {
- for (String storageGroupName : storageGroupCache) {
- if (PathUtils.isStartWith(deviceID, storageGroupName)) {
- return storageGroupName;
- }
+ private String getDatabaseName(IDeviceID deviceID) {
+ for (String databaseName : databaseCache) {
+ if (PathUtils.isStartWith(deviceID, databaseName)) {
+ return databaseName;
}
}
return null;
}
/**
- * get all database from confignode and update database cache
+ * check whether this database exists in the database cache
+ *
+ * @param database the database name
+ * @return true if the database cache contains it, false otherwise
+ */
+ private boolean containsDatabase(String database) {
+ try {
+ databaseCacheLock.readLock().lock();
+ return databaseCache.contains(database);
+ } finally {
+ databaseCacheLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * get all database from configNode and update database cache
*
* @param result the result of get database cache
* @param deviceIDs the devices that need to hit
*/
- private void fetchStorageGroupAndUpdateCache(
- StorageGroupCacheResult<?, ?> result, List<IDeviceID> deviceIDs)
+ private void fetchDatabaseAndUpdateCache(
+ DatabaseCacheResult<?, ?> result, List<IDeviceID> deviceIDs)
throws ClientManagerException, TException {
- storageGroupCacheLock.writeLock().lock();
+ databaseCacheLock.writeLock().lock();
try (ConfigNodeClient client =
configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
result.reset();
- getStorageGroupMap(result, deviceIDs, true);
+ getDatabaseMap(result, deviceIDs, true);
if (!result.isSuccess()) {
TGetDatabaseReq req = new TGetDatabaseReq(ROOT_PATH,
SchemaConstant.ALL_MATCH_SCOPE_BINARY);
- TDatabaseSchemaResp storageGroupSchemaResp =
client.getMatchedDatabaseSchemas(req);
- if (storageGroupSchemaResp.getStatus().getCode()
+ TDatabaseSchemaResp databaseSchemaResp =
client.getMatchedDatabaseSchemas(req);
+ if (databaseSchemaResp.getStatus().getCode()
== TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- Set<String> storageGroupNames =
storageGroupSchemaResp.getDatabaseSchemaMap().keySet();
+ Set<String> databaseNames =
databaseSchemaResp.getDatabaseSchemaMap().keySet();
// update all database into cache
- updateStorageCache(storageGroupNames);
- getStorageGroupMap(result, deviceIDs, true);
+ updateDatabaseCache(databaseNames);
+ getDatabaseMap(result, deviceIDs, true);
}
}
} finally {
- storageGroupCacheLock.writeLock().unlock();
+ databaseCacheLock.writeLock().unlock();
+ }
+ }
+
+ /** get all database from configNode and update database cache. */
+ private void fetchDatabaseAndUpdateCache(String database)
+ throws ClientManagerException, TException {
+ databaseCacheLock.writeLock().lock();
+ try (ConfigNodeClient client =
+ configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+ TGetDatabaseReq req = new TGetDatabaseReq(ROOT_PATH,
SchemaConstant.ALL_MATCH_SCOPE_BINARY);
+ TDatabaseSchemaResp databaseSchemaResp =
client.getMatchedDatabaseSchemas(req);
+ if (databaseSchemaResp.getStatus().getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ Set<String> databaseNames =
databaseSchemaResp.getDatabaseSchemaMap().keySet();
+ // update all database into cache
+ updateDatabaseCache(databaseNames);
+ }
+ } finally {
+ databaseCacheLock.writeLock().unlock();
}
}
@@ -223,32 +252,32 @@ public class PartitionCache {
*
* @param result the result of get database cache
* @param deviceIDs the devices that need to hit
- * @param userName
+ * @param userName the username
* @throws RuntimeException if failed to create database
*/
- private void createStorageGroupAndUpdateCache(
- StorageGroupCacheResult<?, ?> result, List<IDeviceID> deviceIDs, String
userName)
+ private void createDatabaseAndUpdateCache(
+ DatabaseCacheResult<?, ?> result, List<IDeviceID> deviceIDs, String
userName)
throws ClientManagerException, MetadataException, TException {
- storageGroupCacheLock.writeLock().lock();
+ databaseCacheLock.writeLock().lock();
try (ConfigNodeClient client =
configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
// Try to check whether database need to be created
result.reset();
// Try to hit database with all missed devices
- getStorageGroupMap(result, deviceIDs, false);
+ getDatabaseMap(result, deviceIDs, false);
if (!result.isSuccess()) {
// Try to get database needed to be created from missed device
- Set<String> storageGroupNamesNeedCreated = new HashSet<>();
+ Set<String> databaseNamesNeedCreated = new HashSet<>();
for (IDeviceID deviceID : result.getMissedDevices()) {
- PartialPath storageGroupNameNeedCreated =
- MetaUtils.getStorageGroupPathByLevel(
+ PartialPath databaseNameNeedCreated =
+ MetaUtils.getDatabasePathByLevel(
new PartialPath(deviceID),
config.getDefaultStorageGroupLevel());
-
storageGroupNamesNeedCreated.add(storageGroupNameNeedCreated.getFullPath());
+ databaseNamesNeedCreated.add(databaseNameNeedCreated.getFullPath());
}
// Try to create databases one by one until done or one database fail
- Set<String> successFullyCreatedStorageGroup = new HashSet<>();
- for (String storageGroupName : storageGroupNamesNeedCreated) {
+ Set<String> successFullyCreatedDatabase = new HashSet<>();
+ for (String databaseName : databaseNamesNeedCreated) {
long startTime = System.nanoTime();
try {
if (!AuthorityChecker.SUPER_USER.equals(userName)) {
@@ -265,27 +294,71 @@ public class PartitionCache {
} finally {
PerformanceOverviewMetrics.getInstance().recordAuthCost(System.nanoTime() -
startTime);
}
- TDatabaseSchema storageGroupSchema = new TDatabaseSchema();
- storageGroupSchema.setName(storageGroupName);
- TSStatus tsStatus = client.setDatabase(storageGroupSchema);
+ TDatabaseSchema databaseSchema = new TDatabaseSchema();
+ databaseSchema.setName(databaseName);
+ TSStatus tsStatus = client.setDatabase(databaseSchema);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() ==
tsStatus.getCode()) {
- successFullyCreatedStorageGroup.add(storageGroupName);
+ successFullyCreatedDatabase.add(databaseName);
} else {
// Try to update cache by databases successfully created
- updateStorageCache(successFullyCreatedStorageGroup);
+ updateDatabaseCache(successFullyCreatedDatabase);
logger.warn(
"[{} Cache] failed to create database {}",
- CacheMetrics.STORAGE_GROUP_CACHE_NAME,
- storageGroupName);
+ CacheMetrics.DATABASE_CACHE_NAME,
+ databaseName);
throw new RuntimeException(new IoTDBException(tsStatus.message,
tsStatus.code));
}
}
- // Try to update database cache when all databases has already been
created
- updateStorageCache(storageGroupNamesNeedCreated);
- getStorageGroupMap(result, deviceIDs, false);
+ // Try to update database cache when all databases have already been
created
+ updateDatabaseCache(databaseNamesNeedCreated);
+ getDatabaseMap(result, deviceIDs, false);
}
} finally {
- storageGroupCacheLock.writeLock().unlock();
+ databaseCacheLock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * create the given database through the configNode client and, on success, add it to the database cache
+ *
+ * @param database the database
+ * @param userName the username
+ * @throws RuntimeException if the permission check fails or the database cannot be created
+ */
+ private void createDatabaseAndUpdateCache(String database, String userName)
+ throws ClientManagerException, TException {
+ databaseCacheLock.writeLock().lock();
+ try (ConfigNodeClient client =
+ configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+ long startTime = System.nanoTime();
+ try {
+ if (!AuthorityChecker.SUPER_USER.equals(userName)) {
+ TSStatus status =
+ AuthorityChecker.getTSStatus(
+ AuthorityChecker.checkSystemPermission(
+ userName, PrivilegeType.MANAGE_DATABASE.ordinal()),
+ PrivilegeType.MANAGE_DATABASE);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode())
{
+ throw new RuntimeException(new IoTDBException(status.getMessage(),
status.getCode()));
+ }
+ }
+ } finally {
+
PerformanceOverviewMetrics.getInstance().recordAuthCost(System.nanoTime() -
startTime);
+ }
+ TDatabaseSchema databaseSchema = new TDatabaseSchema();
+ databaseSchema.setName(database);
+ TSStatus tsStatus = client.setDatabase(databaseSchema);
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == tsStatus.getCode()) {
+ // Database was created successfully: record it in the database cache
+ updateDatabaseCache(new
HashSet<>(Collections.singletonList(database)));
+ } else {
+ // Creation was rejected: log it and rethrow the returned status to the caller
+ logger.warn(
+ "[{} Cache] failed to create database {}",
CacheMetrics.DATABASE_CACHE_NAME, database);
+ throw new RuntimeException(new IoTDBException(tsStatus.message,
tsStatus.code));
+ }
+ } finally {
+ databaseCacheLock.writeLock().unlock();
+ }
 }
@@ -296,19 +369,19 @@ public class PartitionCache {
* @param deviceIDs the devices that need to hit
* @param failFast if true, return when failed. if false, return when all
devices hit
*/
- private void getStorageGroupMap(
- StorageGroupCacheResult<?, ?> result, List<IDeviceID> deviceIDs, boolean
failFast) {
+ private void getDatabaseMap(
+ DatabaseCacheResult<?, ?> result, List<IDeviceID> deviceIDs, boolean
failFast) {
try {
- storageGroupCacheLock.readLock().lock();
+ databaseCacheLock.readLock().lock();
// reset result before try
result.reset();
boolean status = true;
for (IDeviceID devicePath : deviceIDs) {
- String storageGroupName = getStorageGroupName(devicePath);
- if (null == storageGroupName) {
+ String databaseName = getDatabaseName(devicePath);
+ if (null == databaseName) {
logger.debug(
"[{} Cache] miss when search device {}",
- CacheMetrics.STORAGE_GROUP_CACHE_NAME,
+ CacheMetrics.DATABASE_CACHE_NAME,
devicePath);
status = false;
if (failFast) {
@@ -317,7 +390,7 @@ public class PartitionCache {
result.addMissedDevice(devicePath);
}
} else {
- result.put(devicePath, storageGroupName);
+ result.put(devicePath, databaseName);
}
}
// setFailed the result when miss
@@ -325,10 +398,10 @@ public class PartitionCache {
result.setFailed();
}
logger.debug(
- "[{} Cache] hit when search device {}",
CacheMetrics.STORAGE_GROUP_CACHE_NAME, deviceIDs);
- cacheMetrics.record(status, CacheMetrics.STORAGE_GROUP_CACHE_NAME);
+ "[{} Cache] hit when search device {}",
CacheMetrics.DATABASE_CACHE_NAME, deviceIDs);
+ cacheMetrics.record(status, CacheMetrics.DATABASE_CACHE_NAME);
} finally {
- storageGroupCacheLock.readLock().unlock();
+ databaseCacheLock.readLock().unlock();
}
}
@@ -336,21 +409,21 @@ public class PartitionCache {
* get database map in three try
*
* @param result contains result, failed devices and map
- * @param devicePaths the devices that need to hit
+ * @param deviceIDs the devices that need to hit
* @param tryToFetch whether try to get all database from confignode
* @param isAutoCreate whether auto create database when device miss
* @param userName
*/
- private void getStorageGroupCacheResult(
- StorageGroupCacheResult<?, ?> result,
- List<IDeviceID> devicePaths,
+ private void getDatabaseCacheResult(
+ DatabaseCacheResult<?, ?> result,
+ List<IDeviceID> deviceIDs,
boolean tryToFetch,
boolean isAutoCreate,
String userName) {
if (!isAutoCreate) {
// TODO: avoid IDeviceID contains "*"
// miss when deviceId contains *
- for (IDeviceID deviceID : devicePaths) {
+ for (IDeviceID deviceID : deviceIDs) {
for (int i = 0; i < deviceID.segmentNum(); i++) {
if (((String) deviceID.segment(i)).contains("*")) {
return;
@@ -359,62 +432,64 @@ public class PartitionCache {
}
}
// first try to hit database in fast-fail way
- getStorageGroupMap(result, devicePaths, true);
+ getDatabaseMap(result, deviceIDs, true);
if (!result.isSuccess() && tryToFetch) {
try {
// try to fetch database from config node when miss
- fetchStorageGroupAndUpdateCache(result, devicePaths);
+ fetchDatabaseAndUpdateCache(result, deviceIDs);
if (!result.isSuccess() && isAutoCreate) {
// try to auto create database of failed device
- createStorageGroupAndUpdateCache(result, devicePaths, userName);
+ createDatabaseAndUpdateCache(result, deviceIDs, userName);
if (!result.isSuccess()) {
throw new StatementAnalyzeException("Failed to get database Map");
}
}
} catch (TException | MetadataException | ClientManagerException e) {
throw new StatementAnalyzeException(
- "An error occurred when executing getDeviceToStorageGroup():" +
e.getMessage());
+ "An error occurred when executing getDeviceToDatabase():" +
e.getMessage());
}
}
}
- /**
- * update database cache
- *
- * @param storageGroupNames the database names that need to update
- */
- public void updateStorageCache(Set<String> storageGroupNames) {
- storageGroupCacheLock.writeLock().lock();
- try {
- storageGroupCache.addAll(storageGroupNames);
- } finally {
- storageGroupCacheLock.writeLock().unlock();
+ public void checkAndAutoCreateDatabase(String database, boolean
isAutoCreate, String userName) {
+ boolean isExisted = containsDatabase(database);
+ if (!isExisted) {
+ try {
+ // cache miss: refresh the database cache from config node, then check again
+ fetchDatabaseAndUpdateCache(database);
+ isExisted = containsDatabase(database);
+ if (!isExisted && isAutoCreate) {
+ // still missing after the refresh: auto create the database
+ createDatabaseAndUpdateCache(database, userName);
+ }
+ } catch (TException | ClientManagerException e) {
+ throw new StatementAnalyzeException(
+ "An error occurred when executing checkAndAutoCreateDatabase():" +
e.getMessage());
+ }
 }
 }
/**
- * invalidate database cache
+ * update database cache
*
- * @param storageGroupNames the databases that need to invalid
+ * @param databaseNames the database names that need to update
*/
- public void removeFromStorageGroupCache(List<String> storageGroupNames) {
- storageGroupCacheLock.writeLock().lock();
+ public void updateDatabaseCache(Set<String> databaseNames) {
+ databaseCacheLock.writeLock().lock();
try {
- for (String storageGroupName : storageGroupNames) {
- storageGroupCache.remove(storageGroupName);
- }
+ databaseCache.addAll(databaseNames);
} finally {
- storageGroupCacheLock.writeLock().unlock();
+ databaseCacheLock.writeLock().unlock();
}
}
/** invalidate all database cache */
- public void removeFromStorageGroupCache() {
- storageGroupCacheLock.writeLock().lock();
+ public void removeFromDatabaseCache() {
+ databaseCacheLock.writeLock().lock();
try {
- storageGroupCache.clear();
+ databaseCache.clear();
} finally {
- storageGroupCacheLock.writeLock().unlock();
+ databaseCacheLock.writeLock().unlock();
}
}
@@ -423,11 +498,11 @@ public class PartitionCache {
// region replicaSet cache
/**
- * get regionReplicaSet from local and confignode
+ * get regionReplicaSet from local and configNode
*
* @param consensusGroupId the id of consensus group
* @return regionReplicaSet
- * @throws RuntimeException if failed to get regionReplicaSet from confignode
+ * @throws RuntimeException if failed to get regionReplicaSet from configNode
* @throws StatementAnalyzeException if there are exception when try to get
latestRegionRouteMap
*/
public TRegionReplicaSet getRegionReplicaSet(TConsensusGroupId
consensusGroupId) {
@@ -440,7 +515,7 @@ public class PartitionCache {
regionReplicaSetLock.readLock().unlock();
}
if (result == null) {
- // if not hit then try to get regionReplicaSet from confignode
+ // if not hit then try to get regionReplicaSet from configNode
try {
regionReplicaSetLock.writeLock().lock();
// verify that there are not hit in cache
@@ -451,9 +526,9 @@ public class PartitionCache {
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() ==
resp.getStatus().getCode()) {
updateGroupIdToReplicaSetMap(resp.getTimestamp(),
resp.getRegionRouteMap());
}
- // if confignode don't have then will throw RuntimeException
+ // if configNode don't have then will throw RuntimeException
if (!groupIdToReplicaSetMap.containsKey(consensusGroupId)) {
- // failed to get RegionReplicaSet from confignode
+ // failed to get RegionReplicaSet from configNode
throw new RuntimeException(
"Failed to get replicaSet of consensus group[id= " +
consensusGroupId + "]");
}
@@ -511,31 +586,30 @@ public class PartitionCache {
/**
* get schemaPartition
*
- * @param storageGroupToDeviceMap database to devices map
- * @return SchemaPartition of storageGroupToDeviceMap
+ * @param databaseToDeviceMap database to devices map
+ * @return SchemaPartition of databaseToDeviceMap
*/
- public SchemaPartition getSchemaPartition(Map<String, List<IDeviceID>>
storageGroupToDeviceMap) {
+ public SchemaPartition getSchemaPartition(Map<String, List<IDeviceID>>
databaseToDeviceMap) {
schemaPartitionCacheLock.readLock().lock();
try {
- if (storageGroupToDeviceMap.size() == 0) {
+ if (databaseToDeviceMap.isEmpty()) {
cacheMetrics.record(false, CacheMetrics.SCHEMA_PARTITION_CACHE_NAME);
return null;
}
Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>>
schemaPartitionMap =
new HashMap<>();
// check cache for each database
- for (Map.Entry<String, List<IDeviceID>> entry :
storageGroupToDeviceMap.entrySet()) {
- String storageGroupName = entry.getKey();
+ for (Map.Entry<String, List<IDeviceID>> entry :
databaseToDeviceMap.entrySet()) {
+ String databaseName = entry.getKey();
Map<TSeriesPartitionSlot, TRegionReplicaSet> regionReplicaSetMap =
- schemaPartitionMap.computeIfAbsent(storageGroupName, k -> new
HashMap<>());
- SchemaPartitionTable schemaPartitionTable =
- schemaPartitionCache.getIfPresent(storageGroupName);
+ schemaPartitionMap.computeIfAbsent(databaseName, k -> new
HashMap<>());
+ SchemaPartitionTable schemaPartitionTable =
schemaPartitionCache.getIfPresent(databaseName);
if (null == schemaPartitionTable) {
// if database not find, then return cache miss.
logger.debug(
"[{} Cache] miss when search database {}",
CacheMetrics.SCHEMA_PARTITION_CACHE_NAME,
- storageGroupName);
+ databaseName);
cacheMetrics.record(false, CacheMetrics.SCHEMA_PARTITION_CACHE_NAME);
return null;
}
@@ -569,6 +643,43 @@ public class PartitionCache {
}
}
+ /**
+ * get the cached schemaPartition of a whole database
+ *
+ * <p>Every series partition slot cached for the database is resolved to its regionReplicaSet
+ * through getRegionReplicaSet. The hit or miss is recorded in cacheMetrics under
+ * SCHEMA_PARTITION_CACHE_NAME, and the lookup runs under the read lock of
+ * schemaPartitionCacheLock.
+ *
+ * @param database the database whose cached schema partition table is looked up
+ * @return SchemaPartition of the database, or null if the database is not in the cache
+ */
+ public SchemaPartition getSchemaPartition(String database) {
+ schemaPartitionCacheLock.readLock().lock();
+ try {
+ SchemaPartitionTable schemaPartitionTable =
schemaPartitionCache.getIfPresent(database);
+ if (null == schemaPartitionTable) {
+ // the database is not in schemaPartitionCache: record the miss and return null.
+ logger.debug(
+ "[{} Cache] miss when search database {}",
+ CacheMetrics.SCHEMA_PARTITION_CACHE_NAME,
+ database);
+ cacheMetrics.record(false, CacheMetrics.SCHEMA_PARTITION_CACHE_NAME);
+ return null;
+ }
+ Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>>
schemaPartitionMap =
+ new HashMap<>();
+ Map<TSeriesPartitionSlot, TRegionReplicaSet> regionReplicaSetMap =
+ schemaPartitionMap.computeIfAbsent(database, k -> new HashMap<>());
+ for (Map.Entry<TSeriesPartitionSlot, TConsensusGroupId> entry :
+ schemaPartitionTable.getSchemaPartitionMap().entrySet()) {
+ regionReplicaSetMap.put(entry.getKey(),
getRegionReplicaSet(entry.getValue()));
+ }
+ logger.debug("[{} Cache] hit", CacheMetrics.SCHEMA_PARTITION_CACHE_NAME);
+ // cache hit: record it in the metrics
+ cacheMetrics.record(true, CacheMetrics.SCHEMA_PARTITION_CACHE_NAME);
+ return new SchemaPartition(
+ schemaPartitionMap, seriesSlotExecutorName, seriesPartitionSlotNum);
+ } finally {
+ schemaPartitionCacheLock.readLock().unlock();
+ }
+ }
+
/**
* update schemaPartitionCache by schemaPartition.
*
@@ -580,11 +691,11 @@ public class PartitionCache {
try {
for (Map.Entry<String, Map<TSeriesPartitionSlot, TConsensusGroupId>>
entry1 :
schemaPartitionTable.entrySet()) {
- String storageGroupName = entry1.getKey();
- SchemaPartitionTable result =
schemaPartitionCache.getIfPresent(storageGroupName);
+ String databaseName = entry1.getKey();
+ SchemaPartitionTable result =
schemaPartitionCache.getIfPresent(databaseName);
if (null == result) {
result = new SchemaPartitionTable();
- schemaPartitionCache.put(storageGroupName, result);
+ schemaPartitionCache.put(databaseName, result);
}
Map<TSeriesPartitionSlot, TConsensusGroupId>
seriesPartitionSlotTConsensusGroupIdMap =
result.getSchemaPartitionMap();
@@ -595,20 +706,6 @@ public class PartitionCache {
}
}
- /**
- * invalid schemaPartitionCache by database
- *
- * @param storageGroupName the databases that need to invalid
- */
- public void invalidSchemaPartitionCache(String storageGroupName) {
- schemaPartitionCacheLock.writeLock().lock();
- try {
- schemaPartitionCache.invalidate(storageGroupName);
- } finally {
- schemaPartitionCacheLock.writeLock().unlock();
- }
- }
-
/** invalid all schemaPartitionCache */
public void invalidAllSchemaPartitionCache() {
schemaPartitionCacheLock.writeLock().lock();
@@ -626,14 +723,14 @@ public class PartitionCache {
/**
* get dataPartition by query param map
*
- * @param storageGroupToQueryParamsMap database to dataPartitionQueryParam
map
- * @return DataPartition of storageGroupToQueryParamsMap
+ * @param databaseToQueryParamsMap database to dataPartitionQueryParam map
+ * @return DataPartition of databaseToQueryParamsMap
*/
public DataPartition getDataPartition(
- Map<String, List<DataPartitionQueryParam>> storageGroupToQueryParamsMap)
{
+ Map<String, List<DataPartitionQueryParam>> databaseToQueryParamsMap) {
dataPartitionCacheLock.readLock().lock();
try {
- if (storageGroupToQueryParamsMap.size() == 0) {
+ if (databaseToQueryParamsMap.isEmpty()) {
cacheMetrics.record(false, CacheMetrics.DATA_PARTITION_CACHE_NAME);
return null;
}
@@ -641,10 +738,10 @@ public class PartitionCache {
dataPartitionMap = new HashMap<>();
// check cache for each database
for (Map.Entry<String, List<DataPartitionQueryParam>> entry :
- storageGroupToQueryParamsMap.entrySet()) {
+ databaseToQueryParamsMap.entrySet()) {
if (null == entry.getValue()
|| entry.getValue().isEmpty()
- || !getStorageGroupDataPartition(dataPartitionMap, entry.getKey(),
entry.getValue())) {
+ || !getDatabaseDataPartition(dataPartitionMap, entry.getKey(),
entry.getValue())) {
cacheMetrics.record(false, CacheMetrics.DATA_PARTITION_CACHE_NAME);
return null;
}
@@ -662,32 +759,32 @@ public class PartitionCache {
* get dataPartition from database
*
* @param dataPartitionMap result
- * @param storageGroupName database that need to get
+ * @param databaseName database that need to get
* @param dataPartitionQueryParams specific query params of data partition
* @return whether hit
*/
- private boolean getStorageGroupDataPartition(
+ private boolean getDatabaseDataPartition(
Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TRegionReplicaSet>>>>
dataPartitionMap,
- String storageGroupName,
+ String databaseName,
List<DataPartitionQueryParam> dataPartitionQueryParams) {
- DataPartitionTable dataPartitionTable =
dataPartitionCache.getIfPresent(storageGroupName);
+ DataPartitionTable dataPartitionTable =
dataPartitionCache.getIfPresent(databaseName);
if (null == dataPartitionTable) {
logger.debug(
"[{} Cache] miss when search database {}",
CacheMetrics.DATA_PARTITION_CACHE_NAME,
- storageGroupName);
+ databaseName);
return false;
}
- Map<TSeriesPartitionSlot, SeriesPartitionTable>
cachedStorageGroupPartitionMap =
+ Map<TSeriesPartitionSlot, SeriesPartitionTable> cachedDatabasePartitionMap
=
dataPartitionTable.getDataPartitionMap();
Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>
seriesSlotToTimePartitionMap =
- dataPartitionMap.computeIfAbsent(storageGroupName, k -> new
HashMap<>());
+ dataPartitionMap.computeIfAbsent(databaseName, k -> new
HashMap<>());
// check cache for each device
for (DataPartitionQueryParam dataPartitionQueryParam :
dataPartitionQueryParams) {
if (!getDeviceDataPartition(
- seriesSlotToTimePartitionMap, dataPartitionQueryParam,
cachedStorageGroupPartitionMap)) {
+ seriesSlotToTimePartitionMap, dataPartitionQueryParam,
cachedDatabasePartitionMap)) {
return false;
}
}
@@ -699,14 +796,14 @@ public class PartitionCache {
*
* @param seriesSlotToTimePartitionMap result
* @param dataPartitionQueryParam specific query param of data partition
- * @param cachedStorageGroupPartitionMap all cached data partition map of
related database
+ * @param cachedDatabasePartitionMap all cached data partition map of
related database
* @return whether hit
*/
private boolean getDeviceDataPartition(
Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TRegionReplicaSet>>>
seriesSlotToTimePartitionMap,
DataPartitionQueryParam dataPartitionQueryParam,
- Map<TSeriesPartitionSlot, SeriesPartitionTable>
cachedStorageGroupPartitionMap) {
+ Map<TSeriesPartitionSlot, SeriesPartitionTable>
cachedDatabasePartitionMap) {
TSeriesPartitionSlot seriesPartitionSlot;
if (null != dataPartitionQueryParam.getDeviceID()) {
seriesPartitionSlot =
@@ -715,7 +812,7 @@ public class PartitionCache {
return false;
}
SeriesPartitionTable cachedSeriesPartitionTable =
- cachedStorageGroupPartitionMap.get(seriesPartitionSlot);
+ cachedDatabasePartitionMap.get(seriesPartitionSlot);
if (null == cachedSeriesPartitionTable) {
if (logger.isDebugEnabled()) {
logger.debug(
@@ -788,9 +885,9 @@ public class PartitionCache {
for (Map.Entry<
String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TConsensusGroupId>>>>
entry1 : dataPartitionTable.entrySet()) {
- String storageGroupName = entry1.getKey();
- if (null != storageGroupName) {
- DataPartitionTable result =
dataPartitionCache.getIfPresent(storageGroupName);
+ String databaseName = entry1.getKey();
+ if (null != databaseName) {
+ DataPartitionTable result =
dataPartitionCache.getIfPresent(databaseName);
boolean needToUpdateCache = (null == result);
if (needToUpdateCache) {
result = new DataPartitionTable();
@@ -818,7 +915,7 @@ public class PartitionCache {
}
}
if (needToUpdateCache) {
- dataPartitionCache.put(storageGroupName, result);
+ dataPartitionCache.put(databaseName, result);
}
}
}
@@ -827,20 +924,6 @@ public class PartitionCache {
}
}
- /**
- * invalid dataPartitionCache by storageGroup
- *
- * @param storageGroup the databases that need to invalid
- */
- public void invalidDataPartitionCache(String storageGroup) {
- dataPartitionCacheLock.writeLock().lock();
- try {
- dataPartitionCache.invalidate(storageGroup);
- } finally {
- dataPartitionCacheLock.writeLock().unlock();
- }
- }
-
/** invalid all dataPartitionCache */
public void invalidAllDataPartitionCache() {
dataPartitionCacheLock.writeLock().lock();
@@ -855,7 +938,7 @@ public class PartitionCache {
public void invalidAllCache() {
logger.debug("[Partition Cache] invalid");
- removeFromStorageGroupCache();
+ removeFromDatabaseCache();
invalidAllDataPartitionCache();
invalidAllSchemaPartitionCache();
invalidReplicaSetCache();
@@ -865,10 +948,8 @@ public class PartitionCache {
@Override
public String toString() {
return "PartitionCache{"
- + "cacheSize="
- + cacheSize
- + ", storageGroupCache="
- + storageGroupCache
+ + "databaseCache="
+ + databaseCache
+ ", replicaSetCache="
+ groupIdToReplicaSetMap
+ ", schemaPartitionCache="
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/MetaUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/MetaUtils.java
index 680ad2a9c1e..92a19bcaa75 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/MetaUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/MetaUtils.java
@@ -52,14 +52,14 @@ public class MetaUtils {
private MetaUtils() {}
/**
- * Get database path when creating schema automatically is enable
+ * Get database path when creating schema automatically is enabled
*
* <p>e.g., path = root.a.b.c and level = 1, return root.a
*
* @param path path
* @param level level
*/
- public static PartialPath getStorageGroupPathByLevel(PartialPath path, int
level)
+ public static PartialPath getDatabasePathByLevel(PartialPath path, int level)
throws MetadataException {
String[] nodeNames = path.getNodes();
if (nodeNames.length <= level ||
!nodeNames[0].equals(IoTDBConstant.PATH_ROOT)) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/CacheMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/CacheMetrics.java
index 8003b55b1d2..2225a377f9f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/CacheMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/CacheMetrics.java
@@ -31,26 +31,26 @@ import org.apache.iotdb.metrics.utils.MetricType;
import java.util.Arrays;
public class CacheMetrics implements IMetricSet {
- public static final String STORAGE_GROUP_CACHE_NAME = "Database";
+ public static final String DATABASE_CACHE_NAME = "Database";
public static final String SCHEMA_PARTITION_CACHE_NAME = "SchemaPartition";
public static final String DATA_PARTITION_CACHE_NAME = "DataPartition";
private static final String HIT = "hit";
private static final String ALL = "all";
- private Counter storageGroupCacheHitCounter =
DoNothingMetricManager.DO_NOTHING_COUNTER;
+ private Counter databaseCacheHitCounter =
DoNothingMetricManager.DO_NOTHING_COUNTER;
private Counter schemaPartitionCacheHitCounter =
DoNothingMetricManager.DO_NOTHING_COUNTER;
private Counter dataPartitionCacheHitCounter =
DoNothingMetricManager.DO_NOTHING_COUNTER;
- private Counter storageGroupCacheTotalCounter =
DoNothingMetricManager.DO_NOTHING_COUNTER;
+ private Counter databaseCacheTotalCounter =
DoNothingMetricManager.DO_NOTHING_COUNTER;
private Counter schemaPartitionCacheTotalCounter =
DoNothingMetricManager.DO_NOTHING_COUNTER;
private Counter dataPartitionCacheTotalCounter =
DoNothingMetricManager.DO_NOTHING_COUNTER;
@Override
public void bindTo(AbstractMetricService metricService) {
- storageGroupCacheHitCounter =
+ databaseCacheHitCounter =
metricService.getOrCreateCounter(
Metric.CACHE.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
- STORAGE_GROUP_CACHE_NAME,
+ DATABASE_CACHE_NAME,
Tag.TYPE.toString(),
HIT);
schemaPartitionCacheHitCounter =
@@ -69,12 +69,12 @@ public class CacheMetrics implements IMetricSet {
DATA_PARTITION_CACHE_NAME,
Tag.TYPE.toString(),
HIT);
- storageGroupCacheTotalCounter =
+ databaseCacheTotalCounter =
metricService.getOrCreateCounter(
Metric.CACHE.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
- STORAGE_GROUP_CACHE_NAME,
+ DATABASE_CACHE_NAME,
Tag.TYPE.toString(),
ALL);
schemaPartitionCacheTotalCounter =
@@ -97,7 +97,7 @@ public class CacheMetrics implements IMetricSet {
@Override
public void unbindFrom(AbstractMetricService metricService) {
- Arrays.asList(STORAGE_GROUP_CACHE_NAME, SCHEMA_PARTITION_CACHE_NAME,
DATA_PARTITION_CACHE_NAME)
+ Arrays.asList(DATABASE_CACHE_NAME, SCHEMA_PARTITION_CACHE_NAME,
DATA_PARTITION_CACHE_NAME)
.forEach(
name -> {
metricService.remove(
@@ -115,14 +115,20 @@ public class CacheMetrics implements IMetricSet {
Tag.TYPE.toString(),
ALL);
});
+ databaseCacheHitCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
+ schemaPartitionCacheHitCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
+ dataPartitionCacheHitCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
+ databaseCacheTotalCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
+ schemaPartitionCacheTotalCounter =
DoNothingMetricManager.DO_NOTHING_COUNTER;
+ dataPartitionCacheTotalCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
}
public void record(boolean result, String name) {
switch (name) {
- case STORAGE_GROUP_CACHE_NAME:
- storageGroupCacheTotalCounter.inc();
+ case DATABASE_CACHE_NAME:
+ databaseCacheTotalCounter.inc();
if (result) {
- storageGroupCacheHitCounter.inc();
+ databaseCacheHitCounter.inc();
}
break;
case SCHEMA_PARTITION_CACHE_NAME:
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/MetaUtilsTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/MetaUtilsTest.java
index 986fdf95846..e4827ffeeda 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/MetaUtilsTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/MetaUtilsTest.java
@@ -111,12 +111,12 @@ public class MetaUtilsTest {
}
@Test
- public void testGetStorageGroupPathByLevel() {
+ public void testGetDatabasePathByLevel() {
int level = 1;
try {
assertEquals(
"root.laptop",
- MetaUtils.getStorageGroupPathByLevel(new
PartialPath("root.laptop.d1.s1"), level)
+ MetaUtils.getDatabasePathByLevel(new
PartialPath("root.laptop.d1.s1"), level)
.getFullPath());
} catch (MetadataException e) {
e.printStackTrace();
@@ -125,7 +125,7 @@ public class MetaUtilsTest {
boolean caughtException = false;
try {
- MetaUtils.getStorageGroupPathByLevel(new
PartialPath("root1.laptop.d1.s1"), level);
+ MetaUtils.getDatabasePathByLevel(new PartialPath("root1.laptop.d1.s1"),
level);
} catch (MetadataException e) {
caughtException = true;
assertEquals("root1.laptop.d1.s1 is not a legal path", e.getMessage());
@@ -134,7 +134,7 @@ public class MetaUtilsTest {
caughtException = false;
try {
- MetaUtils.getStorageGroupPathByLevel(new PartialPath("root"), level);
+ MetaUtils.getDatabasePathByLevel(new PartialPath("root"), level);
} catch (MetadataException e) {
caughtException = true;
assertEquals("root is not a legal path", e.getMessage());
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/PartitionCacheTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/PartitionCacheTest.java
index 4f917b7ce27..c198c3971b4 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/PartitionCacheTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/PartitionCacheTest.java
@@ -84,7 +84,7 @@ public class PartitionCacheTest {
storageGroupNumber < STORAGE_GROUP_NUMBER;
storageGroupNumber++) {
// init each database
- String storageGroupName = getStorageGroupName(storageGroupNumber);
+ String storageGroupName = getDatabaseName(storageGroupNumber);
storageGroups.add(storageGroupName);
if (!schemaPartitionTable.containsKey(storageGroupName)) {
schemaPartitionTable.put(storageGroupName, new HashMap<>());
@@ -137,7 +137,7 @@ public class PartitionCacheTest {
}
}
- private static String getStorageGroupName(int storageGroupNumber) {
+ private static String getDatabaseName(int storageGroupNumber) {
return STORAGE_GROUP_PREFIX + storageGroupNumber;
}
@@ -148,7 +148,7 @@ public class PartitionCacheTest {
@Before
public void setUp() throws Exception {
partitionCache = new PartitionCache();
- partitionCache.updateStorageCache(storageGroups);
+ partitionCache.updateDatabaseCache(storageGroups);
partitionCache.updateSchemaPartitionCache(schemaPartitionTable);
partitionCache.updateDataPartitionCache(dataPartitionTable);
partitionCache.updateGroupIdToReplicaSetMap(100,
consensusGroupIdToRegionReplicaSet);
@@ -174,14 +174,14 @@ public class PartitionCacheTest {
Factory.DEFAULT_FACTORY.create("root.sg2.d2")));
for (List<IDeviceID> searchDevices : existedDevicesInOneStorageGroup) {
storageGroupToDeviceMap =
- partitionCache.getStorageGroupToDevice(
+ partitionCache.getDatabaseToDevice(
searchDevices, false, false, AuthorityChecker.SUPER_USER);
assertEquals(1, storageGroupToDeviceMap.size());
for (List<IDeviceID> devices : storageGroupToDeviceMap.values()) {
assertEquals(2, devices.size());
}
deviceToStorageGroupMap =
- partitionCache.getDeviceToStorageGroup(
+ partitionCache.getDeviceToDatabase(
searchDevices, false, false, AuthorityChecker.SUPER_USER);
assertEquals(2, deviceToStorageGroupMap.size());
}
@@ -196,14 +196,14 @@ public class PartitionCacheTest {
Factory.DEFAULT_FACTORY.create("root.sg2.d2")));
for (List<IDeviceID> searchDevices : existedDevicesInMultiStorageGroup) {
storageGroupToDeviceMap =
- partitionCache.getStorageGroupToDevice(
+ partitionCache.getDatabaseToDevice(
searchDevices, false, false, AuthorityChecker.SUPER_USER);
assertEquals(2, storageGroupToDeviceMap.size());
for (List<IDeviceID> devices : storageGroupToDeviceMap.values()) {
assertEquals(1, devices.size());
}
deviceToStorageGroupMap =
- partitionCache.getDeviceToStorageGroup(
+ partitionCache.getDeviceToDatabase(
searchDevices, false, false, AuthorityChecker.SUPER_USER);
assertEquals(2, deviceToStorageGroupMap.size());
}
@@ -221,11 +221,11 @@ public class PartitionCacheTest {
Factory.DEFAULT_FACTORY.create("root.sg4.**")));
for (List<IDeviceID> searchDevices : nonExistedDevices) {
storageGroupToDeviceMap =
- partitionCache.getStorageGroupToDevice(
+ partitionCache.getDatabaseToDevice(
searchDevices, false, false, AuthorityChecker.SUPER_USER);
assertEquals(0, storageGroupToDeviceMap.size());
deviceToStorageGroupMap =
- partitionCache.getDeviceToStorageGroup(
+ partitionCache.getDeviceToDatabase(
searchDevices, false, false, AuthorityChecker.SUPER_USER);
assertEquals(0, deviceToStorageGroupMap.size());
}
@@ -234,11 +234,11 @@ public class PartitionCacheTest {
List<IDeviceID> oneDeviceList =
Collections.singletonList(Factory.DEFAULT_FACTORY.create("root.sg1.d1"));
storageGroupToDeviceMap =
- partitionCache.getStorageGroupToDevice(
+ partitionCache.getDatabaseToDevice(
oneDeviceList, false, false, AuthorityChecker.SUPER_USER);
assertEquals(0, storageGroupToDeviceMap.size());
deviceToStorageGroupMap =
- partitionCache.getDeviceToStorageGroup(
+ partitionCache.getDeviceToDatabase(
oneDeviceList, false, false, AuthorityChecker.SUPER_USER);
assertEquals(0, deviceToStorageGroupMap.size());
}
@@ -287,7 +287,7 @@ public class PartitionCacheTest {
for (int storageGroupNumber = 0;
storageGroupNumber < STORAGE_GROUP_NUMBER;
storageGroupNumber++) {
- String storageGroupName = getStorageGroupName(storageGroupNumber);
+ String storageGroupName = getDatabaseName(storageGroupNumber);
for (int deviceNumber = 0; deviceNumber < DEVICE_PER_STORAGE_GROUP;
deviceNumber++) {
IDeviceID deviceID =
Factory.DEFAULT_FACTORY.create(getDeviceName(storageGroupName,
deviceNumber));
@@ -322,7 +322,7 @@ public class PartitionCacheTest {
for (int storageGroupNumber = 0;
storageGroupNumber < STORAGE_GROUP_NUMBER;
storageGroupNumber++) {
- String storageGroupName = getStorageGroupName(storageGroupNumber);
+ String storageGroupName = getDatabaseName(storageGroupNumber);
for (int deviceNumber = DEVICE_PER_STORAGE_GROUP;
deviceNumber < 2 * DEVICE_PER_STORAGE_GROUP;
deviceNumber++) {
@@ -339,7 +339,7 @@ public class PartitionCacheTest {
for (int storageGroupNumber = 0;
storageGroupNumber < STORAGE_GROUP_NUMBER;
storageGroupNumber++) {
- String storageGroupName = getStorageGroupName(storageGroupNumber);
+ String storageGroupName = getDatabaseName(storageGroupNumber);
for (int deviceNumber = 0; deviceNumber < DEVICE_PER_STORAGE_GROUP;
deviceNumber++) {
IDeviceID deviceID =
Factory.DEFAULT_FACTORY.create(getDeviceName(storageGroupName,
deviceNumber));
@@ -356,7 +356,7 @@ public class PartitionCacheTest {
for (int storageGroupNumber = 0;
storageGroupNumber < STORAGE_GROUP_NUMBER;
storageGroupNumber++) {
- String storageGroupName = getStorageGroupName(storageGroupNumber);
+ String storageGroupName = getDatabaseName(storageGroupNumber);
for (int deviceNumber = 0; deviceNumber < DEVICE_PER_STORAGE_GROUP;
deviceNumber++) {
IDeviceID deviceID =
IDeviceID.Factory.DEFAULT_FACTORY.create(getDeviceName(storageGroupName,
deviceNumber));
@@ -402,7 +402,7 @@ public class PartitionCacheTest {
for (int storageGroupNumber = 0;
storageGroupNumber < STORAGE_GROUP_NUMBER;
storageGroupNumber++) {
- String storageGroupName = getStorageGroupName(storageGroupNumber);
+ String storageGroupName = getDatabaseName(storageGroupNumber);
for (int deviceNumber = DEVICE_PER_STORAGE_GROUP;
deviceNumber < 2 * DEVICE_PER_STORAGE_GROUP;
deviceNumber++) {
@@ -419,7 +419,7 @@ public class PartitionCacheTest {
for (int storageGroupNumber = 0;
storageGroupNumber < STORAGE_GROUP_NUMBER;
storageGroupNumber++) {
- String storageGroupName = getStorageGroupName(storageGroupNumber);
+ String storageGroupName = getDatabaseName(storageGroupNumber);
for (int deviceNumber = 0; deviceNumber < DEVICE_PER_STORAGE_GROUP;
deviceNumber++) {
IDeviceID deviceID =
Factory.DEFAULT_FACTORY.create(getDeviceName(storageGroupName,
deviceNumber));
@@ -435,7 +435,7 @@ public class PartitionCacheTest {
for (int storageGroupNumber = 0;
storageGroupNumber < STORAGE_GROUP_NUMBER;
storageGroupNumber++) {
- String storageGroupName = getStorageGroupName(storageGroupNumber);
+ String storageGroupName = getDatabaseName(storageGroupNumber);
for (int deviceNumber = 0; deviceNumber < DEVICE_PER_STORAGE_GROUP;
deviceNumber++) {
IDeviceID deviceID =
Factory.DEFAULT_FACTORY.create(getDeviceName(storageGroupName,
deviceNumber));
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java
index 0584cf62dfb..ae4d4a7315d 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java
@@ -301,17 +301,17 @@ public class TestMatadata implements Metadata {
@Override
public SchemaPartition getOrCreateSchemaPartition(
String database, List<IDeviceID> deviceIDList, String userName) {
- return null;
+ return SCHEMA_PARTITION;
}
@Override
public SchemaPartition getSchemaPartition(String database,
List<IDeviceID> deviceIDList) {
- return null;
+ return SCHEMA_PARTITION;
}
@Override
public SchemaPartition getSchemaPartition(String database) {
- return null;
+ return SCHEMA_PARTITION;
}
};
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/SeriesPartitionExecutor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/SeriesPartitionExecutor.java
index a07cd600476..d2666446e8e 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/SeriesPartitionExecutor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/SeriesPartitionExecutor.java
@@ -36,6 +36,8 @@ public abstract class SeriesPartitionExecutor {
protected final int seriesPartitionSlotNum;
+ /**
+ * Hash input used in place of a segment's characters when the segment is not a String. It is
+ * one above Character.MAX_VALUE, so it cannot equal the value of any char.
+ */
+ protected static final int NULL_SEGMENT_HASH_NUM = (int) Character.MAX_VALUE + 1;
+
public SeriesPartitionExecutor(int seriesPartitionSlotNum) {
this.seriesPartitionSlotNum = seriesPartitionSlotNum;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/APHashExecutor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/APHashExecutor.java
index 1573502fb65..9390111d247 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/APHashExecutor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/APHashExecutor.java
@@ -55,14 +55,24 @@ public class APHashExecutor extends SeriesPartitionExecutor
{
int index = 0;
for (int segmentID = 0; segmentID < segmentNum; segmentID++) {
- String segment = (String) deviceID.segment(segmentID);
- for (int i = 0; i < segment.length(); i++) {
+ Object segment = deviceID.segment(segmentID);
+ if (segment instanceof String) {
+ String segmentStr = (String) segment;
+ for (int i = 0; i < segmentStr.length(); i++) {
+ if ((index++ & 1) == 0) {
+ hash ^= ((hash << 7) ^ (int) segmentStr.charAt(i) ^ (hash >> 3));
+ } else {
+ hash ^= (~((hash << 11) ^ (int) segmentStr.charAt(i) ^ (hash >>
5)));
+ }
+ }
+ } else {
if ((index++ & 1) == 0) {
- hash ^= ((hash << 7) ^ (int) segment.charAt(i) ^ (hash >> 3));
+ hash ^= ((hash << 7) ^ NULL_SEGMENT_HASH_NUM ^ (hash >> 3));
} else {
- hash ^= (~((hash << 11) ^ (int) segment.charAt(i) ^ (hash >> 5)));
+ hash ^= (~((hash << 11) ^ NULL_SEGMENT_HASH_NUM ^ (hash >> 5)));
}
}
+
if (segmentID < segmentNum - 1) {
if ((index++ & 1) == 0) {
hash ^= ((hash << 7) ^ (int) PATH_SEPARATOR ^ (hash >> 3));
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/BKDRHashExecutor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/BKDRHashExecutor.java
index 25aaa055bdc..c039e8ddd83 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/BKDRHashExecutor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/BKDRHashExecutor.java
@@ -52,9 +52,14 @@ public class BKDRHashExecutor extends
SeriesPartitionExecutor {
int segmentNum = deviceID.segmentNum();
for (int segmentID = 0; segmentID < segmentNum; segmentID++) {
- String segment = (String) deviceID.segment(segmentID);
- for (int i = 0; i < segment.length(); i++) {
- hash = hash * SEED + (int) segment.charAt(i);
+ Object segment = deviceID.segment(segmentID);
+ if (segment instanceof String) {
+ String segmentStr = (String) segment;
+ for (int i = 0; i < segmentStr.length(); i++) {
+ hash = hash * SEED + (int) segmentStr.charAt(i);
+ }
+ } else {
+ hash = hash * SEED + NULL_SEGMENT_HASH_NUM;
}
if (segmentID < segmentNum - 1) {
hash = hash * SEED + (int) PATH_SEPARATOR;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/JSHashExecutor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/JSHashExecutor.java
index c3e088e6e8f..1e8c2031583 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/JSHashExecutor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/JSHashExecutor.java
@@ -51,10 +51,16 @@ public class JSHashExecutor extends SeriesPartitionExecutor
{
int segmentNum = deviceID.segmentNum();
for (int segmentID = 0; segmentID < segmentNum; segmentID++) {
- String segment = (String) deviceID.segment(segmentID);
- for (int i = 0; i < segment.length(); i++) {
- hash ^= ((hash << 5) + (int) segment.charAt(i) + (hash >> 2));
+ Object segment = deviceID.segment(segmentID);
+ if (segment instanceof String) {
+ String segmentStr = (String) segment;
+ for (int i = 0; i < segmentStr.length(); i++) {
+ hash ^= ((hash << 5) + (int) segmentStr.charAt(i) + (hash >> 2));
+ }
+ } else {
+ hash ^= ((hash << 5) + NULL_SEGMENT_HASH_NUM + (hash >> 2));
}
+
if (segmentID < segmentNum - 1) {
hash ^= ((hash << 5) + (int) PATH_SEPARATOR + (hash >> 2));
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/SDBMHashExecutor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/SDBMHashExecutor.java
index 48cff807bab..e2143c00c0c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/SDBMHashExecutor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/executor/hash/SDBMHashExecutor.java
@@ -49,10 +49,16 @@ public class SDBMHashExecutor extends
SeriesPartitionExecutor {
int segmentNum = deviceID.segmentNum();
for (int segmentID = 0; segmentID < segmentNum; segmentID++) {
- String segment = (String) deviceID.segment(segmentID);
- for (int i = 0; i < segment.length(); i++) {
- hash = ((int) segment.charAt(i) + (hash << 6) + (hash << 16) - hash);
+ Object segment = deviceID.segment(segmentID);
+ if (segment instanceof String) {
+ String segmentStr = (String) segment;
+ for (int i = 0; i < segmentStr.length(); i++) {
+ hash = ((int) segmentStr.charAt(i) + (hash << 6) + (hash << 16) -
hash);
+ }
+ } else {
+ hash = (NULL_SEGMENT_HASH_NUM + (hash << 6) + (hash << 16) - hash);
}
+
if (segmentID < segmentNum - 1) {
hash = ((int) PATH_SEPARATOR + (hash << 6) + (hash << 16) - hash);
}