This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch ty/TableModelGrammar
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ty/TableModelGrammar by this push:
new 373c9607f12 Fixed the bug of auto creating database
373c9607f12 is described below
commit 373c9607f12bea31c1272286260f6d3dd346fabb
Author: Caideyipi <[email protected]>
AuthorDate: Mon Jul 8 14:29:09 2024 +0800
Fixed the bug of auto creating database
---
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
.../PipeConfigPhysicalPlanTSStatusVisitor.java | 16 +---
.../manager/schema/ClusterSchemaManager.java | 14 +--
.../confignode/persistence/schema/ConfigMTree.java | 38 ++++----
.../metadata/DatabaseAlreadySetException.java | 29 +-----
...ception.java => DatabaseConflictException.java} | 26 ++----
.../legacy/IoTDBLegacyPipeReceiverAgent.java | 104 +++++++++++----------
.../plan/analyze/ClusterPartitionFetcher.java | 8 +-
.../plan/analyze/LoadTsfileAnalyzer.java | 7 +-
.../analyze/cache/partition/PartitionCache.java | 50 +++++-----
10 files changed, 129 insertions(+), 164 deletions(-)
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 95123228234..fc6f3f905bb 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -83,6 +83,7 @@ public enum TSStatusCode {
SCHEMA_QUOTA_EXCEEDED(526),
MEASUREMENT_ALREADY_EXISTS_IN_TEMPLATE(527),
TYPE_NOT_FOUND(528),
+ DATABASE_CONFLICT(529),
TABLE_NOT_EXISTS(550),
TABLE_ALREADY_EXISTS(551),
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
index f2db9d8c3ee..2e028bf312e 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/visitor/PipeConfigPhysicalPlanTSStatusVisitor.java
@@ -52,20 +52,10 @@ public class PipeConfigPhysicalPlanTSStatusVisitor
@Override
public TSStatus visitCreateDatabase(final DatabaseSchemaPlan plan, final
TSStatus context) {
if (context.getCode() ==
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
- if (context
- .getMessage()
- .contains(
- String.format(
- "%s has already been created as database",
plan.getSchema().getName()))) {
- // The same database has been created
- return new TSStatus(
-
TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
- .setMessage(context.getMessage());
- }
- // Lower or higher level database has been created
- return new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+ return new
TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
- } else if (context.getCode() ==
TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()) {
+ } else if (context.getCode() ==
TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode()
+ || context.getCode() ==
TSStatusCode.DATABASE_CONFLICT.getStatusCode()) {
return new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
} else if (context.getCode() ==
TSStatusCode.METADATA_ERROR.getStatusCode()) {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
index 7b65d63896c..0cb3bed3639 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/schema/ClusterSchemaManager.java
@@ -81,8 +81,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp;
import org.apache.iotdb.consensus.exception.ConsensusException;
-import org.apache.iotdb.db.exception.metadata.DatabaseAlreadySetException;
-import org.apache.iotdb.db.exception.metadata.SchemaQuotaExceededException;
import org.apache.iotdb.db.schemaengine.template.Template;
import org.apache.iotdb.db.schemaengine.template.TemplateInternalRPCUpdateType;
import org.apache.iotdb.db.schemaengine.template.TemplateInternalRPCUtil;
@@ -190,17 +188,9 @@ public class ClusterSchemaManager {
LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
result = new
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
result.setMessage(e.getMessage());
- } catch (MetadataException metadataException) {
+ } catch (final MetadataException metadataException) {
// Reject if Database already set
- if (metadataException instanceof IllegalPathException) {
- result = new TSStatus(TSStatusCode.ILLEGAL_PATH.getStatusCode());
- } else if (metadataException instanceof DatabaseAlreadySetException) {
- result = new
TSStatus(TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode());
- } else if (metadataException instanceof SchemaQuotaExceededException) {
- result = new
TSStatus(TSStatusCode.SCHEMA_QUOTA_EXCEEDED.getStatusCode());
- } else {
- result = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
- }
+ result = new TSStatus(metadataException.getErrorCode());
result.setMessage(metadataException.getMessage());
} finally {
createDatabaseLock.unlock();
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ConfigMTree.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ConfigMTree.java
index 76ded35c73f..b2935d772d0 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ConfigMTree.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/schema/ConfigMTree.java
@@ -35,6 +35,7 @@ import
org.apache.iotdb.confignode.persistence.schema.mnode.factory.ConfigMNodeF
import
org.apache.iotdb.confignode.persistence.schema.mnode.impl.ConfigTableNode;
import
org.apache.iotdb.confignode.persistence.schema.mnode.impl.TableNodeStatus;
import org.apache.iotdb.db.exception.metadata.DatabaseAlreadySetException;
+import org.apache.iotdb.db.exception.metadata.DatabaseConflictException;
import org.apache.iotdb.db.exception.metadata.DatabaseNotSetException;
import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
@@ -108,8 +109,8 @@ public class ConfigMTree {
*
* @param path path
*/
- public void setStorageGroup(PartialPath path) throws MetadataException {
- String[] nodeNames = path.getNodes();
+ public void setStorageGroup(final PartialPath path) throws MetadataException
{
+ final String[] nodeNames = path.getNodes();
MetaFormatUtils.checkDatabase(path.getFullPath());
if (nodeNames.length <= 1 || !nodeNames[0].equals(root.getName())) {
throw new IllegalPathException(path.getFullPath());
@@ -118,12 +119,12 @@ public class ConfigMTree {
int i = 1;
// e.g., path = root.a.b.sg, create internal nodes for a, b
while (i < nodeNames.length - 1) {
- IConfigMNode temp = store.getChild(cur, nodeNames[i]);
+ final IConfigMNode temp = store.getChild(cur, nodeNames[i]);
if (temp == null) {
store.addChild(cur, nodeNames[i], nodeFactory.createInternalMNode(cur,
nodeNames[i]));
} else if (temp.isDatabase()) {
// before create database, check whether the database already exists
- throw new DatabaseAlreadySetException(temp.getFullPath());
+ throw new DatabaseConflictException(temp.getFullPath(), false);
}
cur = store.getChild(cur, nodeNames[i]);
i++;
@@ -134,19 +135,17 @@ public class ConfigMTree {
synchronized (this) {
if (store.hasChild(cur, nodeNames[i])) {
// node b has child sg
- if (store.getChild(cur, nodeNames[i]).isDatabase()) {
- throw new DatabaseAlreadySetException(path.getFullPath());
- } else {
- throw new DatabaseAlreadySetException(path.getFullPath(), true);
- }
+ throw store.getChild(cur, nodeNames[i]).isDatabase()
+ ? new DatabaseAlreadySetException(path.getFullPath())
+ : new DatabaseConflictException(path.getFullPath(), true);
} else {
- IDatabaseMNode<IConfigMNode> databaseMNode =
+ final IDatabaseMNode<IConfigMNode> databaseMNode =
nodeFactory.createDatabaseMNode(cur, nodeNames[i]);
- IConfigMNode result = store.addChild(cur, nodeNames[i],
databaseMNode.getAsMNode());
+ final IConfigMNode result = store.addChild(cur, nodeNames[i],
databaseMNode.getAsMNode());
if (result != databaseMNode) {
- throw new DatabaseAlreadySetException(path.getFullPath(), true);
+ throw new DatabaseConflictException(path.getFullPath(), true);
}
}
}
@@ -258,9 +257,9 @@ public class ConfigMTree {
/**
* E.g., root.sg is database given [root, sg], if the give path is not a
database, throw exception
*/
- public IDatabaseMNode<IConfigMNode>
getDatabaseNodeByDatabasePath(PartialPath databasePath)
+ public IDatabaseMNode<IConfigMNode> getDatabaseNodeByDatabasePath(final
PartialPath databasePath)
throws MetadataException {
- String[] nodes = databasePath.getNodes();
+ final String[] nodes = databasePath.getNodes();
if (nodes.length == 0 || !nodes[0].equals(root.getName())) {
throw new IllegalPathException(databasePath.getFullPath());
}
@@ -271,7 +270,7 @@ public class ConfigMTree {
throw new DatabaseNotSetException(databasePath.getFullPath());
}
if (cur.isDatabase()) {
- throw new DatabaseAlreadySetException(cur.getFullPath());
+ throw new DatabaseConflictException(cur.getFullPath(), false);
}
}
@@ -282,7 +281,7 @@ public class ConfigMTree {
if (cur.isDatabase()) {
return cur.getAsDatabaseMNode();
} else {
- throw new DatabaseAlreadySetException(databasePath.getFullPath(), true);
+ throw new DatabaseConflictException(databasePath.getFullPath(), true);
}
}
@@ -340,8 +339,9 @@ public class ConfigMTree {
*
* @param path a full path or a prefix path
*/
- public void checkDatabaseAlreadySet(PartialPath path) throws
DatabaseAlreadySetException {
- String[] nodeNames = path.getNodes();
+ public void checkDatabaseAlreadySet(final PartialPath path)
+ throws DatabaseAlreadySetException, DatabaseConflictException {
+ final String[] nodeNames = path.getNodes();
IConfigMNode cur = root;
if (!nodeNames[0].equals(root.getName())) {
return;
@@ -355,7 +355,7 @@ public class ConfigMTree {
throw new DatabaseAlreadySetException(cur.getFullPath());
}
}
- throw new DatabaseAlreadySetException(path.getFullPath(), true);
+ throw new DatabaseConflictException(path.getFullPath(), true);
}
// endregion
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DatabaseAlreadySetException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DatabaseAlreadySetException.java
index 20f42144ea1..71ba387f7e5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DatabaseAlreadySetException.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DatabaseAlreadySetException.java
@@ -24,37 +24,16 @@ import org.apache.iotdb.rpc.TSStatusCode;
public class DatabaseAlreadySetException extends MetadataException {
- private static final long serialVersionUID = 9110669164701929779L;
-
- private final boolean hasChild;
-
private final String storageGroupPath;
- public DatabaseAlreadySetException(String path) {
- super(getMessage(path, false),
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode());
- storageGroupPath = path;
- hasChild = false;
- }
-
- public DatabaseAlreadySetException(String path, boolean hasChild) {
- super(getMessage(path, hasChild),
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode());
- this.hasChild = hasChild;
+ public DatabaseAlreadySetException(final String path) {
+ super(
+ String.format("%s has already been created as database", path),
+ TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode());
storageGroupPath = path;
}
- public boolean isHasChild() {
- return hasChild;
- }
-
public String getStorageGroupPath() {
return storageGroupPath;
}
-
- private static String getMessage(String path, boolean hasChild) {
- if (hasChild) {
- return String.format("some children of %s have already been created as
database", path);
- } else {
- return String.format("%s has already been created as database", path);
- }
- }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DatabaseAlreadySetException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DatabaseConflictException.java
similarity index 65%
copy from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DatabaseAlreadySetException.java
copy to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DatabaseConflictException.java
index 20f42144ea1..a6e4ed952c4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DatabaseAlreadySetException.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/DatabaseConflictException.java
@@ -22,36 +22,28 @@ package org.apache.iotdb.db.exception.metadata;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.rpc.TSStatusCode;
-public class DatabaseAlreadySetException extends MetadataException {
+public class DatabaseConflictException extends MetadataException {
- private static final long serialVersionUID = 9110669164701929779L;
-
- private final boolean hasChild;
+ private final boolean isChild;
private final String storageGroupPath;
- public DatabaseAlreadySetException(String path) {
- super(getMessage(path, false),
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode());
- storageGroupPath = path;
- hasChild = false;
- }
-
- public DatabaseAlreadySetException(String path, boolean hasChild) {
- super(getMessage(path, hasChild),
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode());
- this.hasChild = hasChild;
+ public DatabaseConflictException(final String path, boolean isChild) {
+ super(getMessage(path, isChild),
TSStatusCode.DATABASE_CONFLICT.getStatusCode());
+ this.isChild = isChild;
storageGroupPath = path;
}
- public boolean isHasChild() {
- return hasChild;
+ public boolean isChild() {
+ return isChild;
}
public String getStorageGroupPath() {
return storageGroupPath;
}
- private static String getMessage(String path, boolean hasChild) {
- if (hasChild) {
+ private static String getMessage(final String path, final boolean isChild) {
+ if (isChild) {
return String.format("some children of %s have already been created as
database", path);
} else {
return String.format("%s has already been created as database", path);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
index 3fb132523d8..daecab687f9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
@@ -98,11 +98,11 @@ public class IoTDBLegacyPipeReceiverAgent {
* TSStatusCode#SUCCESS_STATUS} if success to connect.
*/
public TSStatus handshake(
- TSyncIdentityInfo syncIdentityInfo,
- String remoteAddress,
- IPartitionFetcher partitionFetcher,
- ISchemaFetcher schemaFetcher) {
- SyncIdentityInfo identityInfo = new SyncIdentityInfo(syncIdentityInfo,
remoteAddress);
+ final TSyncIdentityInfo syncIdentityInfo,
+ final String remoteAddress,
+ final IPartitionFetcher partitionFetcher,
+ final ISchemaFetcher schemaFetcher) {
+ final SyncIdentityInfo identityInfo = new
SyncIdentityInfo(syncIdentityInfo, remoteAddress);
LOGGER.info("Invoke handshake method from client ip = {}",
identityInfo.getRemoteAddress());
if (!new File(getFileDataDir(identityInfo)).exists()) {
@@ -118,23 +118,25 @@ public class IoTDBLegacyPipeReceiverAgent {
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "");
}
- private void createConnection(SyncIdentityInfo identityInfo) {
- long connectionId = connectionIdGenerator.incrementAndGet();
+ private void createConnection(final SyncIdentityInfo identityInfo) {
+ final long connectionId = connectionIdGenerator.incrementAndGet();
currentConnectionId.set(connectionId);
connectionIdToIdentityInfoMap.put(connectionId, identityInfo);
}
private boolean registerDatabase(
- String database, IPartitionFetcher partitionFetcher, ISchemaFetcher
schemaFetcher) {
+ final String database,
+ final IPartitionFetcher partitionFetcher,
+ final ISchemaFetcher schemaFetcher) {
if (registeredDatabase.containsKey(database)) {
return true;
}
try {
- DatabaseSchemaStatement statement =
+ final DatabaseSchemaStatement statement =
new
DatabaseSchemaStatement(DatabaseSchemaStatement.DatabaseSchemaStatementType.CREATE);
statement.setDatabasePath(new PartialPath(database));
- long queryId = SessionManager.getInstance().requestQueryId();
- ExecutionResult result =
+ final long queryId = SessionManager.getInstance().requestQueryId();
+ final ExecutionResult result =
Coordinator.getInstance()
.executeForTreeModel(
statement,
@@ -145,12 +147,13 @@ public class IoTDBLegacyPipeReceiverAgent {
schemaFetcher,
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
- && result.status.code !=
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
+ && result.status.code !=
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()
+ && result.status.code !=
TSStatusCode.DATABASE_CONFLICT.getStatusCode()) {
LOGGER.error(
"Create Database error, statement: {}, result status : {}.",
statement, result.status);
return false;
}
- } catch (IllegalPathException e) {
+ } catch (final IllegalPathException e) {
LOGGER.error("Parse database PartialPath {} error", database, e);
return false;
}
@@ -167,21 +170,21 @@ public class IoTDBLegacyPipeReceiverAgent {
* @throws TException The connection between the sender and the receiver has
not been established
* by {@link IoTDBLegacyPipeReceiverAgent#handshake}
*/
- public TSStatus transportPipeData(ByteBuffer buff) throws TException {
+ public TSStatus transportPipeData(final ByteBuffer buff) throws TException {
// step1. check connection
- SyncIdentityInfo identityInfo = getCurrentSyncIdentityInfo();
+ final SyncIdentityInfo identityInfo = getCurrentSyncIdentityInfo();
if (identityInfo == null) {
throw new TException("Thrift connection is not alive.");
}
LOGGER.debug(
"Invoke transportPipeData method from client ip = {}",
identityInfo.getRemoteAddress());
- String fileDir = getFileDataDir(identityInfo);
+ final String fileDir = getFileDataDir(identityInfo);
// step2. deserialize PipeData
- PipeData pipeData;
+ final PipeData pipeData;
try {
- int length = buff.remaining();
- byte[] byteArray = new byte[length];
+ final int length = buff.remaining();
+ final byte[] byteArray = new byte[length];
buff.get(byteArray);
pipeData = PipeData.createPipeData(byteArray);
if (pipeData instanceof TsFilePipeData) {
@@ -189,7 +192,7 @@ public class IoTDBLegacyPipeReceiverAgent {
tsFilePipeData.setDatabase(identityInfo.getDatabase());
handleTsFilePipeData(tsFilePipeData, fileDir);
}
- } catch (IOException e) {
+ } catch (final IOException e) {
LOGGER.error("Pipe data transport error, {}", e.getMessage());
return RpcUtils.getStatus(
TSStatusCode.PIPESERVER_ERROR, "Pipe data transport error, " +
e.getMessage());
@@ -205,7 +208,7 @@ public class IoTDBLegacyPipeReceiverAgent {
pipeData.createLoader().load();
LOGGER.info(
"Load pipeData with serialize number {} successfully.",
pipeData.getSerialNumber());
- } catch (PipeException e) {
+ } catch (final PipeException e) {
LOGGER.error("Fail to load pipeData because {}.", e.getMessage());
return RpcUtils.getStatus(
TSStatusCode.PIPESERVER_ERROR, "Fail to load pipeData because " +
e.getMessage());
@@ -220,7 +223,7 @@ public class IoTDBLegacyPipeReceiverAgent {
* @return null if connection has been exited
*/
private SyncIdentityInfo getCurrentSyncIdentityInfo() {
- Long id = currentConnectionId.get();
+ final Long id = currentConnectionId.get();
if (id != null) {
return connectionIdToIdentityInfoMap.get(id);
} else {
@@ -235,14 +238,14 @@ public class IoTDBLegacyPipeReceiverAgent {
* @param tsFilePipeData pipeData
* @param fileDir path of file data dir
*/
- private void handleTsFilePipeData(TsFilePipeData tsFilePipeData, String
fileDir) {
- String tsFileName = tsFilePipeData.getTsFileName();
- File dir = new File(fileDir);
- File[] targetFiles =
+ private void handleTsFilePipeData(final TsFilePipeData tsFilePipeData, final
String fileDir) {
+ final String tsFileName = tsFilePipeData.getTsFileName();
+ final File dir = new File(fileDir);
+ final File[] targetFiles =
dir.listFiles((dir1, name) -> name.startsWith(tsFileName) &&
name.endsWith(PATCH_SUFFIX));
if (targetFiles != null) {
- for (File targetFile : targetFiles) {
- File newFile =
+ for (final File targetFile : targetFiles) {
+ final File newFile =
new File(
dir,
targetFile
@@ -265,37 +268,37 @@ public class IoTDBLegacyPipeReceiverAgent {
* @throws TException The connection between the sender and the receiver has
not been established
* by {@link IoTDBLegacyPipeReceiverAgent#handshake}
*/
- public TSStatus transportFile(TSyncTransportMetaInfo metaInfo, ByteBuffer
buff)
+ public TSStatus transportFile(final TSyncTransportMetaInfo metaInfo, final
ByteBuffer buff)
throws TException {
// step1. check connection
- SyncIdentityInfo identityInfo = getCurrentSyncIdentityInfo();
+ final SyncIdentityInfo identityInfo = getCurrentSyncIdentityInfo();
if (identityInfo == null) {
throw new TException("Thrift connection is not alive.");
}
LOGGER.debug(
"Invoke transportData method from client ip = {}",
identityInfo.getRemoteAddress());
- String fileDir = getFileDataDir(identityInfo);
- String fileName = metaInfo.fileName;
- long startIndex = metaInfo.startIndex;
- File file = new File(fileDir, fileName + PATCH_SUFFIX);
+ final String fileDir = getFileDataDir(identityInfo);
+ final String fileName = metaInfo.fileName;
+ final long startIndex = metaInfo.startIndex;
+ final File file = new File(fileDir, fileName + PATCH_SUFFIX);
// step2. check startIndex
- IndexCheckResult result = checkStartIndexValid(new File(fileDir,
fileName), startIndex);
+ final IndexCheckResult result = checkStartIndexValid(new File(fileDir,
fileName), startIndex);
if (!result.isResult()) {
return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_REDIRECTION_ERROR,
result.getIndex());
}
// step3. append file
- try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw"))
{
- int length = buff.remaining();
+ try (final RandomAccessFile randomAccessFile = new RandomAccessFile(file,
"rw")) {
+ final int length = buff.remaining();
randomAccessFile.seek(startIndex);
- byte[] byteArray = new byte[length];
+ final byte[] byteArray = new byte[length];
buff.get(byteArray);
randomAccessFile.write(byteArray);
recordStartIndex(new File(fileDir, fileName), startIndex + length);
LOGGER.debug("Sync {} start at {} to {} is done.", fileName, startIndex,
startIndex + length);
- } catch (IOException e) {
+ } catch (final IOException e) {
LOGGER.error(e.getMessage());
return RpcUtils.getStatus(TSStatusCode.SYNC_FILE_ERROR, e.getMessage());
}
@@ -303,7 +306,7 @@ public class IoTDBLegacyPipeReceiverAgent {
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "");
}
- private IndexCheckResult checkStartIndexValid(File file, long startIndex) {
+ private IndexCheckResult checkStartIndexValid(final File file, final long
startIndex) {
// get local index from memory map
long localIndex = getCurrentFileStartIndex(file.getAbsolutePath());
// get local index from file
@@ -334,10 +337,10 @@ public class IoTDBLegacyPipeReceiverAgent {
*
* @return startIndex of file: -1 if file doesn't exist
*/
- private long getCurrentFileStartIndex(String absolutePath) {
- Long id = currentConnectionId.get();
+ private long getCurrentFileStartIndex(final String absolutePath) {
+ final Long id = currentConnectionId.get();
if (id != null) {
- Map<String, Long> map = connectionIdToStartIndexRecord.get(id);
+ final Map<String, Long> map = connectionIdToStartIndexRecord.get(id);
if (map != null && map.containsKey(absolutePath)) {
return map.get(absolutePath);
}
@@ -345,10 +348,10 @@ public class IoTDBLegacyPipeReceiverAgent {
return -1;
}
- private void recordStartIndex(File file, long position) {
- Long id = currentConnectionId.get();
+ private void recordStartIndex(final File file, final long position) {
+ final Long id = currentConnectionId.get();
if (id != null) {
- Map<String, Long> map =
+ final Map<String, Long> map =
connectionIdToStartIndexRecord.computeIfAbsent(id, i -> new
ConcurrentHashMap<>());
map.put(file.getAbsolutePath(), position);
}
@@ -364,7 +367,7 @@ public class IoTDBLegacyPipeReceiverAgent {
private static final String RECEIVER_DIR_NAME = "receiver";
private static final String FILE_DATA_DIR_NAME = "file-data";
- private static String getFileDataDir(SyncIdentityInfo identityInfo) {
+ private static String getFileDataDir(final SyncIdentityInfo identityInfo) {
return getReceiverPipeDir(
identityInfo.getPipeName(),
identityInfo.getRemoteAddress(),
@@ -373,7 +376,8 @@ public class IoTDBLegacyPipeReceiverAgent {
+ FILE_DATA_DIR_NAME;
}
- private static String getReceiverPipeDir(String pipeName, String remoteIp,
long createTime) {
+ private static String getReceiverPipeDir(
+ final String pipeName, final String remoteIp, final long createTime) {
return getReceiverDir()
+ File.separator
+ String.format("%s-%d-%s", pipeName, createTime, remoteIp);
@@ -395,7 +399,7 @@ public class IoTDBLegacyPipeReceiverAgent {
private final String database;
private final String remoteAddress;
- public SyncIdentityInfo(TSyncIdentityInfo identityInfo, String
remoteAddress) {
+ public SyncIdentityInfo(final TSyncIdentityInfo identityInfo, final String
remoteAddress) {
this.pipeName = identityInfo.getPipeName();
this.createTime = identityInfo.getCreateTime();
this.version = identityInfo.getVersion();
@@ -429,7 +433,7 @@ public class IoTDBLegacyPipeReceiverAgent {
private final boolean result;
private final String index;
- public IndexCheckResult(boolean result, String index) {
+ public IndexCheckResult(final boolean result, final String index) {
this.result = result;
this.index = index;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
index c6e30ea2d59..1f4f63f6d2d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
@@ -57,6 +57,7 @@ import org.apache.tsfile.file.metadata.IDeviceID;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@@ -290,12 +291,7 @@ public class ClusterPartitionFetcher implements
IPartitionFetcher {
configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
partitionCache.checkAndAutoCreateDatabase(database, isAutoCreate,
userName);
SchemaPartition schemaPartition =
- partitionCache.getSchemaPartition(
- new HashMap<String, List<IDeviceID>>() {
- {
- put(database, deviceIDs);
- }
- });
+ partitionCache.getSchemaPartition(Collections.singletonMap(database,
deviceIDs));
if (null == schemaPartition) {
PathPatternTree tree = new PathPatternTree();
tree.appendPathPattern(new PartialPath(database + "." +
MULTI_LEVEL_PATH_WILDCARD));
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
index 64f5fcefdc0..f8e6245afa8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
@@ -514,7 +514,12 @@ public class LoadTsfileAnalyzer implements AutoCloseable {
schemaFetcher,
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
- && result.status.code !=
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
+ && result.status.code !=
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()
+ // In tree model, if the user creates a conflict database
concurrently, for instance, the
+ // database created by user is root.db.ss.a, the auto-creation
failed database is root.db,
+ // we wait till "getOrCreatePartition" to judge if the time series
(like root.db.ss.a.e /
+ // root.db.ss.a) conflicts with the created database. just do not
throw exception here.
+ && result.status.code !=
TSStatusCode.DATABASE_CONFLICT.getStatusCode()) {
LOGGER.warn(
"Create database error, statement: {}, result status is: {}",
statement, result.status);
throw new LoadFileException(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
index 35134b4cd6d..f88f7c7a1ce 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
@@ -256,10 +256,12 @@ public class PartitionCache {
* @throws RuntimeException if failed to create database
*/
private void createDatabaseAndUpdateCache(
- DatabaseCacheResult<?, ?> result, List<IDeviceID> deviceIDs, String
userName)
+ final DatabaseCacheResult<?, ?> result,
+ final List<IDeviceID> deviceIDs,
+ final String userName)
throws ClientManagerException, MetadataException, TException {
databaseCacheLock.writeLock().lock();
- try (ConfigNodeClient client =
+ try (final ConfigNodeClient client =
configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
// Try to check whether database need to be created
result.reset();
@@ -267,8 +269,8 @@ public class PartitionCache {
getDatabaseMap(result, deviceIDs, false);
if (!result.isSuccess()) {
// Try to get database needed to be created from missed device
- Set<String> databaseNamesNeedCreated = new HashSet<>();
- for (IDeviceID deviceID : result.getMissedDevices()) {
+ final Set<String> databaseNamesNeedCreated = new HashSet<>();
+ for (final IDeviceID deviceID : result.getMissedDevices()) {
PartialPath databaseNameNeedCreated =
MetaUtils.getDatabasePathByLevel(
new PartialPath(deviceID),
config.getDefaultStorageGroupLevel());
@@ -276,12 +278,12 @@ public class PartitionCache {
}
// Try to create databases one by one until done or one database fail
- Set<String> successFullyCreatedDatabase = new HashSet<>();
- for (String databaseName : databaseNamesNeedCreated) {
- long startTime = System.nanoTime();
+ final Set<String> successFullyCreatedDatabase = new HashSet<>();
+ for (final String databaseName : databaseNamesNeedCreated) {
+ final long startTime = System.nanoTime();
try {
if (!AuthorityChecker.SUPER_USER.equals(userName)) {
- TSStatus status =
+ final TSStatus status =
AuthorityChecker.getTSStatus(
AuthorityChecker.checkSystemPermission(
userName, PrivilegeType.MANAGE_DATABASE.ordinal()),
@@ -294,12 +296,18 @@ public class PartitionCache {
} finally {
PerformanceOverviewMetrics.getInstance().recordAuthCost(System.nanoTime() -
startTime);
}
- TDatabaseSchema databaseSchema = new TDatabaseSchema();
+ final TDatabaseSchema databaseSchema = new TDatabaseSchema();
databaseSchema.setName(databaseName);
- TSStatus tsStatus = client.setDatabase(databaseSchema);
- if (TSStatusCode.SUCCESS_STATUS.getStatusCode() ==
tsStatus.getCode()) {
+ final TSStatus tsStatus = client.setDatabase(databaseSchema);
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == tsStatus.getCode()
+ || TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode() ==
tsStatus.getCode()) {
successFullyCreatedDatabase.add(databaseName);
- } else {
+ // In tree model, if the user creates a conflict database
concurrently, for instance,
+ // the database created by user is root.db.ss.a, the auto-creation
failed database is
+ // root.db, we wait till "getOrCreatePartition" to judge if the
time series (like
+ // root.db.ss.a.e / root.db.ss.a) conflicts with the created
database. just do not throw
+ // exception here.
+ } else if (TSStatusCode.DATABASE_CONFLICT.getStatusCode() !=
tsStatus.getCode()) {
// Try to update cache by databases successfully created
updateDatabaseCache(successFullyCreatedDatabase);
logger.warn(
@@ -325,15 +333,15 @@ public class PartitionCache {
* @param userName the username
* @throws RuntimeException if failed to create database
*/
- private void createDatabaseAndUpdateCache(String database, String userName)
+ private void createDatabaseAndUpdateCache(final String database, final
String userName)
throws ClientManagerException, TException {
databaseCacheLock.writeLock().lock();
- try (ConfigNodeClient client =
+ try (final ConfigNodeClient client =
configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
long startTime = System.nanoTime();
try {
if (!AuthorityChecker.SUPER_USER.equals(userName)) {
- TSStatus status =
+ final TSStatus status =
AuthorityChecker.getTSStatus(
AuthorityChecker.checkSystemPermission(
userName, PrivilegeType.MANAGE_DATABASE.ordinal()),
@@ -345,14 +353,14 @@ public class PartitionCache {
} finally {
PerformanceOverviewMetrics.getInstance().recordAuthCost(System.nanoTime() -
startTime);
}
- TDatabaseSchema databaseSchema = new TDatabaseSchema();
+ final TDatabaseSchema databaseSchema = new TDatabaseSchema();
databaseSchema.setName(database);
- TSStatus tsStatus = client.setDatabase(databaseSchema);
- if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
- // Try to update database cache when database has already been created
- updateDatabaseCache(new
HashSet<>(Collections.singletonList(database)));
- } else {
+ final TSStatus tsStatus = client.setDatabase(databaseSchema);
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() == tsStatus.getCode()
+ || TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode() ==
tsStatus.getCode()) {
// Try to update cache by databases successfully created
+ updateDatabaseCache(Collections.singleton(database));
+ } else {
logger.warn(
"[{} Cache] failed to create database {}",
CacheMetrics.DATABASE_CACHE_NAME, database);
throw new RuntimeException(new IoTDBException(tsStatus.message,
tsStatus.code));