This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 4db4229ce36 [To dev/1.3] Pipe: Fixed the database cache parallel
creation logic
4db4229ce36 is described below
commit 4db4229ce36d9c098fbbf6f2e643121e0e5c7d6f
Author: Caideyipi <[email protected]>
AuthorDate: Mon Jul 21 16:23:09 2025 +0800
[To dev/1.3] Pipe: Fixed the database cache parallel creation logic
---
.../configuration/PipeRuntimeEnvironment.java | 2 +
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
.../PipeConfigPhysicalPlanTSStatusVisitor.java | 16 ++----
.../confignode/persistence/schema/ConfigMTree.java | 16 +++---
.../metadata/DatabaseConflictException.java | 52 ++++++++++++++++++++
.../protocol/legacy/IoTDBLegacyPipeConnector.java | 11 ++++-
.../legacy/IoTDBLegacyPipeReceiverAgent.java | 3 +-
...upCacheResult.java => DatabaseCacheResult.java} | 2 +-
.../analyze/cache/partition/PartitionCache.java | 57 +++++++++++-----------
.../plan/analyze/load/LoadTsFileAnalyzer.java | 9 +++-
.../plugin/env/PipeTaskRuntimeEnvironment.java | 1 +
11 files changed, 117 insertions(+), 53 deletions(-)
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeRuntimeEnvironment.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeRuntimeEnvironment.java
index 455d293dccc..8395568791c 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeRuntimeEnvironment.java
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/configuration/PipeRuntimeEnvironment.java
@@ -24,4 +24,6 @@ public interface PipeRuntimeEnvironment {
String getPipeName();
long getCreationTime();
+
+ int getRegionId();
}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 34f9872bb10..e7780a98f92 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -81,6 +81,7 @@ public enum TSStatusCode {
SCHEMA_QUOTA_EXCEEDED(526),
MEASUREMENT_ALREADY_EXISTS_IN_TEMPLATE(527),
ONLY_LOGICAL_VIEW(528),
+ DATABASE_CONFLICT(529),
// Storage Engine
SYSTEM_READ_ONLY(600),
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
index 73e84632596..a772ab77457 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
@@ -52,20 +52,10 @@ public class PipeConfigPhysicalPlanTSStatusVisitor
@Override
public TSStatus visitCreateDatabase(final DatabaseSchemaPlan plan, final
TSStatus context) {
if (context.getCode() ==
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
- if (context
- .getMessage()
- .contains(
- String.format(
- "%s has already been created as database",
plan.getSchema().getName()))) {
- // The same database has been created
- return new TSStatus(
-
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
- .setMessage(context.getMessage());
- }
- // Lower or higher level database has been created
- return new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+ return new
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
- } else if (context.getCode() ==
TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) {
+ } else if (context.getCode() ==
TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()
+ || context.getCode() ==
TSStatusCode.DATABASE_CONFLICT.getStatusCode()) {
return new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
} else if (context.getCode() ==
TSStatusCode.METADATA_ERROR.getStatusCode()) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ConfigMTree.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ConfigMTree.java
index 54f8d8663f0..6d0d13d4c65 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ConfigMTree.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ConfigMTree.java
@@ -31,6 +31,7 @@ import
org.apache.iotdb.commons.utils.ThriftConfigNodeSerDeUtils;
import org.apache.iotdb.confignode.persistence.schema.mnode.IConfigMNode;
import
org.apache.iotdb.confignode.persistence.schema.mnode.factory.ConfigMNodeFactory;
import org.apache.iotdb.db.exception.metadata.DatabaseAlreadySetException;
+import org.apache.iotdb.db.exception.metadata.DatabaseConflictException;
import org.apache.iotdb.db.exception.metadata.DatabaseNotSetException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import
org.apache.iotdb.db.schemaengine.schemaregion.mtree.traverser.collector.DatabaseCollector;
@@ -114,7 +115,7 @@ public class ConfigMTree {
store.addChild(cur, nodeNames[i], nodeFactory.createInternalMNode(cur,
nodeNames[i]));
} else if (temp.isDatabase()) {
// before create database, check whether the database already exists
- throw new DatabaseAlreadySetException(temp.getFullPath());
+ throw new DatabaseConflictException(temp.getFullPath(), false);
}
cur = store.getChild(cur, nodeNames[i]);
i++;
@@ -128,7 +129,7 @@ public class ConfigMTree {
if (store.getChild(cur, nodeNames[i]).isDatabase()) {
throw new DatabaseAlreadySetException(path.getFullPath());
} else {
- throw new DatabaseAlreadySetException(path.getFullPath(), true);
+ throw new DatabaseConflictException(path.getFullPath(), true);
}
} else {
IDatabaseMNode<IConfigMNode> databaseMNode =
@@ -137,7 +138,7 @@ public class ConfigMTree {
IConfigMNode result = store.addChild(cur, nodeNames[i],
databaseMNode.getAsMNode());
if (result != databaseMNode) {
- throw new DatabaseAlreadySetException(path.getFullPath(), true);
+ throw new DatabaseConflictException(path.getFullPath(), true);
}
}
}
@@ -262,7 +263,7 @@ public class ConfigMTree {
throw new DatabaseNotSetException(databasePath.getFullPath());
}
if (cur.isDatabase()) {
- throw new DatabaseAlreadySetException(cur.getFullPath());
+ throw new DatabaseConflictException(cur.getFullPath(), false);
}
}
@@ -273,7 +274,7 @@ public class ConfigMTree {
if (cur.isDatabase()) {
return cur.getAsDatabaseMNode();
} else {
- throw new DatabaseAlreadySetException(databasePath.getFullPath(), true);
+ throw new DatabaseConflictException(databasePath.getFullPath(), true);
}
}
@@ -331,7 +332,8 @@ public class ConfigMTree {
*
* @param path a full path or a prefix path
*/
- public void checkDatabaseAlreadySet(PartialPath path) throws
DatabaseAlreadySetException {
+ public void checkDatabaseAlreadySet(PartialPath path)
+ throws DatabaseAlreadySetException, DatabaseConflictException {
String[] nodeNames = path.getNodes();
IConfigMNode cur = root;
if (!nodeNames[0].equals(root.getName())) {
@@ -346,7 +348,7 @@ public class ConfigMTree {
throw new DatabaseAlreadySetException(cur.getFullPath());
}
}
- throw new DatabaseAlreadySetException(path.getFullPath(), true);
+ throw new DatabaseConflictException(path.getFullPath(), true);
}
// endregion
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DatabaseConflictException.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DatabaseConflictException.java
new file mode 100644
index 00000000000..ee08ef4e759
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DatabaseConflictException.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.exception.metadata;
+
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+public class DatabaseConflictException extends MetadataException {
+
+ private final boolean isChild;
+
+ private final String databasePath;
+
+ public DatabaseConflictException(final String path, final boolean isChild) {
+ super(getMessage(path, isChild),
TSStatusCode.DATABASE_CONFLICT.getStatusCode());
+ this.isChild = isChild;
+ databasePath = path;
+ }
+
+ public boolean isChild() {
+ return isChild;
+ }
+
+ public String getDatabasePath() {
+ return databasePath;
+ }
+
+ private static String getMessage(final String path, final boolean isChild) {
+ if (isChild) {
+ return String.format("some children of %s have already been created as
database", path);
+ } else {
+ return String.format("%s has already been created as database", path);
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
index 58a929d30d0..82a5bb7dfbc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient;
@@ -37,6 +37,7 @@ import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertio
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.pipe.api.PipeConnector;
import
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
@@ -109,6 +110,7 @@ public class IoTDBLegacyPipeConnector implements
PipeConnector {
private String syncConnectorVersion;
private String pipeName;
+ private String databaseName;
private IoTDBSyncClient client;
private SessionPool sessionPool;
@@ -217,6 +219,11 @@ public class IoTDBLegacyPipeConnector implements
PipeConnector {
useSSL = parameters.getBooleanOrDefault(SINK_IOTDB_SSL_ENABLE_KEY, false);
trustStore = parameters.getString(SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY);
trustStorePwd = parameters.getString(SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY);
+
+ databaseName =
+ StorageEngine.getInstance()
+ .getDataRegion(new
DataRegionId(configuration.getRuntimeEnvironment().getRegionId()))
+ .getDatabaseName();
}
@Override
@@ -237,7 +244,7 @@ public class IoTDBLegacyPipeConnector implements
PipeConnector {
trustStorePwd);
final TSyncIdentityInfo identityInfo =
new TSyncIdentityInfo(
- pipeName, System.currentTimeMillis(), syncConnectorVersion,
IoTDBConstant.PATH_ROOT);
+ pipeName, System.currentTimeMillis(), syncConnectorVersion,
databaseName);
final TSStatus status = client.handshake(identityInfo);
if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
final String errorMsg =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
index a67d3e55815..cfc3bdcdd9e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
@@ -146,7 +146,8 @@ public class IoTDBLegacyPipeReceiverAgent {
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(),
false);
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
- && result.status.code !=
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
+ && result.status.code !=
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()
+ && result.status.code !=
TSStatusCode.DATABASE_CONFLICT.getStatusCode()) {
LOGGER.error(
"Create Database error, statement: {}, result status : {}.",
statement, result.status);
return false;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/StorageGroupCacheResult.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/DatabaseCacheResult.java
similarity index 97%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/StorageGroupCacheResult.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/DatabaseCacheResult.java
index 53771fa1871..52b6fd258a9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/StorageGroupCacheResult.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/DatabaseCacheResult.java
@@ -24,7 +24,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public abstract class StorageGroupCacheResult<V> {
+public abstract class DatabaseCacheResult<V> {
/** the result */
private boolean success = true;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
index e649822bac4..4ca1fee18cb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
@@ -138,8 +138,8 @@ public class PartitionCache {
*/
public Map<String, List<String>> getStorageGroupToDevice(
List<String> devicePaths, boolean tryToFetch, boolean isAutoCreate,
String userName) {
- StorageGroupCacheResult<List<String>> result =
- new StorageGroupCacheResult<List<String>>() {
+ DatabaseCacheResult<List<String>> result =
+ new DatabaseCacheResult<List<String>>() {
@Override
public void put(String device, String storageGroupName) {
map.computeIfAbsent(storageGroupName, k -> new ArrayList<>());
@@ -160,8 +160,8 @@ public class PartitionCache {
*/
public Map<String, String> getDeviceToStorageGroup(
List<String> devicePaths, boolean tryToFetch, boolean isAutoCreate,
String userName) {
- StorageGroupCacheResult<String> result =
- new StorageGroupCacheResult<String>() {
+ DatabaseCacheResult<String> result =
+ new DatabaseCacheResult<String>() {
@Override
public void put(String device, String storageGroupName) {
map.put(device, storageGroupName);
@@ -195,13 +195,13 @@ public class PartitionCache {
* @param devicePaths the devices that need to hit
*/
private void fetchStorageGroupAndUpdateCache(
- StorageGroupCacheResult<?> result, List<String> devicePaths)
+ DatabaseCacheResult<?> result, List<String> devicePaths)
throws ClientManagerException, TException {
storageGroupCacheLock.writeLock().lock();
try (ConfigNodeClient client =
configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
result.reset();
- getStorageGroupMap(result, devicePaths, true);
+ getDatabaseMap(result, devicePaths, true);
if (!result.isSuccess()) {
TGetDatabaseReq req = new TGetDatabaseReq(ROOT_PATH,
SchemaConstant.ALL_MATCH_SCOPE_BINARY);
TDatabaseSchemaResp storageGroupSchemaResp =
client.getMatchedDatabaseSchemas(req);
@@ -210,7 +210,7 @@ public class PartitionCache {
Set<String> storageGroupNames =
storageGroupSchemaResp.getDatabaseSchemaMap().keySet();
// update all database into cache
updateStorageCache(storageGroupNames);
- getStorageGroupMap(result, devicePaths, true);
+ getDatabaseMap(result, devicePaths, true);
}
}
} finally {
@@ -227,7 +227,7 @@ public class PartitionCache {
* @throws RuntimeException if failed to create database
*/
private void createStorageGroupAndUpdateCache(
- StorageGroupCacheResult<?> result, List<String> devicePaths, String
userName)
+ DatabaseCacheResult<?> result, List<String> devicePaths, String userName)
throws ClientManagerException, MetadataException, TException {
storageGroupCacheLock.writeLock().lock();
try (ConfigNodeClient client =
@@ -235,25 +235,25 @@ public class PartitionCache {
// Try to check whether database need to be created
result.reset();
// Try to hit database with all missed devices
- getStorageGroupMap(result, devicePaths, false);
+ getDatabaseMap(result, devicePaths, false);
if (!result.isSuccess()) {
// Try to get database needed to be created from missed device
- Set<String> storageGroupNamesNeedCreated = new HashSet<>();
+ Set<String> databaseNamesNeedCreated = new HashSet<>();
for (String devicePath : result.getMissedDevices()) {
if (devicePath.equals(SchemaConstant.SYSTEM_DATABASE)
|| devicePath.startsWith(SchemaConstant.SYSTEM_DATABASE + ".")) {
- storageGroupNamesNeedCreated.add(SchemaConstant.SYSTEM_DATABASE);
+ databaseNamesNeedCreated.add(SchemaConstant.SYSTEM_DATABASE);
} else {
- PartialPath storageGroupNameNeedCreated =
+ PartialPath databaseNameNeedCreated =
MetaUtils.getStorageGroupPathByLevel(
new PartialPath(devicePath),
config.getDefaultStorageGroupLevel());
-
storageGroupNamesNeedCreated.add(storageGroupNameNeedCreated.getFullPath());
+
databaseNamesNeedCreated.add(databaseNameNeedCreated.getFullPath());
}
}
// Try to create databases one by one until done or one database fail
- Set<String> successFullyCreatedStorageGroup = new HashSet<>();
- for (String storageGroupName : storageGroupNamesNeedCreated) {
+ Set<String> successFullyCreatedDatabases = new HashSet<>();
+ for (String databaseName : databaseNamesNeedCreated) {
long startTime = System.nanoTime();
try {
if (!AuthorityChecker.SUPER_USER.equals(userName)) {
@@ -271,29 +271,30 @@ public class PartitionCache {
PerformanceOverviewMetrics.getInstance().recordAuthCost(System.nanoTime() -
startTime);
}
TDatabaseSchema storageGroupSchema = new TDatabaseSchema();
- storageGroupSchema.setName(storageGroupName);
- if (SchemaConstant.SYSTEM_DATABASE.equals(storageGroupName)) {
+ storageGroupSchema.setName(databaseName);
+ if (SchemaConstant.SYSTEM_DATABASE.equals(databaseName)) {
storageGroupSchema.setMinSchemaRegionGroupNum(1);
storageGroupSchema.setMaxSchemaRegionGroupNum(1);
storageGroupSchema.setMaxDataRegionGroupNum(1);
storageGroupSchema.setMaxDataRegionGroupNum(1);
}
TSStatus tsStatus = client.setDatabase(storageGroupSchema);
- if (TSStatusCode.SUCCESS_STATUS.getStatusCode() ==
tsStatus.getCode()) {
- successFullyCreatedStorageGroup.add(storageGroupName);
- } else {
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == tsStatus.getCode()
+ || TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode() ==
tsStatus.getCode()) {
+ successFullyCreatedDatabases.add(databaseName);
+ } else if (TSStatusCode.DATABASE_CONFLICT.getStatusCode() !=
tsStatus.getCode()) {
// Try to update cache by databases successfully created
- updateStorageCache(successFullyCreatedStorageGroup);
+ updateStorageCache(successFullyCreatedDatabases);
logger.warn(
"[{} Cache] failed to create database {}",
CacheMetrics.STORAGE_GROUP_CACHE_NAME,
- storageGroupName);
+ databaseName);
throw new RuntimeException(new IoTDBException(tsStatus.message,
tsStatus.code));
}
}
// Try to update database cache when all databases has already been
created
- updateStorageCache(storageGroupNamesNeedCreated);
- getStorageGroupMap(result, devicePaths, false);
+ updateStorageCache(databaseNamesNeedCreated);
+ getDatabaseMap(result, devicePaths, false);
}
} finally {
storageGroupCacheLock.writeLock().unlock();
@@ -307,8 +308,8 @@ public class PartitionCache {
* @param devicePaths the devices that need to hit
* @param failFast if true, return when failed. if false, return when all
devices hit
*/
- private void getStorageGroupMap(
- StorageGroupCacheResult<?> result, List<String> devicePaths, boolean
failFast) {
+ private void getDatabaseMap(
+ DatabaseCacheResult<?> result, List<String> devicePaths, boolean
failFast) {
storageGroupCacheLock.readLock().lock();
try {
// reset result before try
@@ -355,7 +356,7 @@ public class PartitionCache {
* @param userName
*/
private void getStorageGroupCacheResult(
- StorageGroupCacheResult<?> result,
+ DatabaseCacheResult<?> result,
List<String> devicePaths,
boolean tryToFetch,
boolean isAutoCreate,
@@ -369,7 +370,7 @@ public class PartitionCache {
}
}
// first try to hit database in fast-fail way
- getStorageGroupMap(result, devicePaths, true);
+ getDatabaseMap(result, devicePaths, true);
if (!result.isSuccess() && tryToFetch) {
try {
// try to fetch database from config node when miss
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
index 171f2b2f87f..22990e471ca 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
@@ -819,7 +819,14 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(),
false);
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
- && result.status.code !=
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
+ && result.status.code
+ != TSStatusCode.DATABASE_ALREADY_EXISTS
+ .getStatusCode() // In tree model, if the user creates a
conflict database
+ // concurrently, for instance, the
+ // database created by user is root.db.ss.a, the auto-creation
failed database is root.db,
+ // we wait till "getOrCreatePartition" to judge if the time series
(like root.db.ss.a.e /
+ // root.db.ss.a) conflicts with the created database. just do not
throw exception here.
+ && result.status.code !=
TSStatusCode.DATABASE_CONFLICT.getStatusCode()) {
LOGGER.warn(
"Create database error, statement: {}, result status is: {}",
statement, result.status);
throw new LoadFileException(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskRuntimeEnvironment.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskRuntimeEnvironment.java
index e14c73f2860..3e9abb67a94 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskRuntimeEnvironment.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskRuntimeEnvironment.java
@@ -43,6 +43,7 @@ public class PipeTaskRuntimeEnvironment implements
PipeRuntimeEnvironment {
return creationTime;
}
+ @Override
public int getRegionId() {
return regionId;
}