This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6f83353c2f3 [IOTDB-6179] Load: remove unnecessary create schema logic
when loading tsfiles (which allows users without create schema permission to
execute load) (#11256)
6f83353c2f3 is described below
commit 6f83353c2f3dede791c60d88eb6496947ecc815f
Author: Itami Sho <[email protected]>
AuthorDate: Tue Oct 10 16:01:24 2023 +0800
[IOTDB-6179] Load: remove unnecessary create schema logic when loading
tsfiles (which allows users without create schema permission to execute load)
(#11256)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../plan/analyze/LoadTsfileAnalyzer.java | 57 +++++++++++++++++++---
1 file changed, 50 insertions(+), 7 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
index e6e3bd419ac..6a64e7cc7e1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsfileAnalyzer.java
@@ -22,16 +22,25 @@ package org.apache.iotdb.db.queryengine.plan.analyze;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.auth.AuthException;
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.consensus.ConfigRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
+import org.apache.iotdb.confignode.rpc.thrift.TGetDatabaseReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.exception.VerifyMetadataException;
import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.schematree.DeviceSchemaInfo;
@@ -43,10 +52,12 @@ import
org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.DatabaseSchemaStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowDatabaseStatement;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.db.utils.FileLoaderUtils;
import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
+import org.apache.iotdb.db.utils.constant.SqlConstant;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.TimeseriesMetadata;
@@ -57,12 +68,14 @@ import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -78,6 +91,9 @@ public class LoadTsfileAnalyzer {
private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
+ private static final IClientManager<ConfigRegionId, ConfigNodeClient>
CONFIG_NODE_CLIENT_MANAGER =
+ ConfigNodeClientManager.getInstance();
+
private final LoadTsFileStatement loadTsFileStatement;
private final MPPQueryContext context;
@@ -253,13 +269,14 @@ public class LoadTsfileAnalyzer {
private final class SchemaAutoCreatorAndVerifier {
private final Map<String, Boolean> tsfileDevice2IsAligned = new
HashMap<>();
- private final Set<PartialPath> alreadySetDatabases = new HashSet<>();
private final int maxTimeseriesNumberPerBatch =
CONFIG.getMaxLoadingTimeseriesNumber();
private final Map<String, Set<MeasurementSchema>>
currentBatchDevice2TimeseriesSchemas =
new HashMap<>();
private int currentBatchTimeseriesCount = 0;
+ private final Set<PartialPath> alreadySetDatabases = new HashSet<>();
+
private SchemaAutoCreatorAndVerifier() {}
public void autoCreateAndVerify(
@@ -394,7 +411,7 @@ public class LoadTsfileAnalyzer {
private void autoCreateDatabase()
throws VerifyMetadataException, LoadFileException,
IllegalPathException {
final int databasePrefixNodesLength =
loadTsFileStatement.getDatabaseLevel() + 1;
- final Set<PartialPath> databaseSet = new HashSet<>();
+ final Set<PartialPath> databasesNeededToBeSet = new HashSet<>();
for (final String device :
currentBatchDevice2TimeseriesSchemas.keySet()) {
final PartialPath devicePath = new PartialPath(device);
@@ -410,28 +427,54 @@ public class LoadTsfileAnalyzer {
final String[] databasePrefixNodes = new
String[databasePrefixNodesLength];
System.arraycopy(devicePrefixNodes, 0, databasePrefixNodes, 0,
databasePrefixNodesLength);
- databaseSet.add(new PartialPath(databasePrefixNodes));
+ databasesNeededToBeSet.add(new PartialPath(databasePrefixNodes));
+ }
+
+ // 1. filter out the databases that already exist
+ if (alreadySetDatabases.isEmpty()) {
+ try (final ConfigNodeClient configNodeClient =
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+ final TGetDatabaseReq req =
+ new TGetDatabaseReq(
+ Arrays.asList(
+ new ShowDatabaseStatement(new
PartialPath(SqlConstant.getSingleRootArray()))
+ .getPathPattern()
+ .getNodes()),
+ SchemaConstant.ALL_MATCH_SCOPE.serialize());
+ final TShowDatabaseResp resp = configNodeClient.showDatabase(req);
+
+ for (final String databaseName : resp.getDatabaseInfoMap().keySet())
{
+ alreadySetDatabases.add(new PartialPath(databaseName));
+ }
+ } catch (IOException | TException | ClientManagerException e) {
+ throw new LoadFileException(e);
+ }
}
+ databasesNeededToBeSet.removeAll(alreadySetDatabases);
- databaseSet.removeAll(alreadySetDatabases);
- for (final PartialPath databasePath : databaseSet) {
+ // 2. create the databases that do not exist
+ for (final PartialPath databasePath : databasesNeededToBeSet) {
final DatabaseSchemaStatement statement =
new
DatabaseSchemaStatement(DatabaseSchemaStatement.DatabaseSchemaStatementType.CREATE);
statement.setDatabasePath(databasePath);
// do not print exception log because it is not an error
statement.setEnablePrintExceptionLog(false);
executeSetDatabaseStatement(statement);
+
+ alreadySetDatabases.add(databasePath);
}
- alreadySetDatabases.addAll(databaseSet);
}
private void executeSetDatabaseStatement(Statement statement) throws
LoadFileException {
- final long queryId = SessionManager.getInstance().requestQueryId();
+ // 1.check Authority
TSStatus status =
AuthorityChecker.checkAuthority(statement,
context.getSession().getUserName());
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new RuntimeException(new IoTDBException(status.getMessage(),
status.getCode()));
}
+
+ // 2.execute setDatabase statement
+ final long queryId = SessionManager.getInstance().requestQueryId();
final ExecutionResult result =
Coordinator.getInstance()
.execute(