This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 82624e21d5f Load: auto detect TsFile's model (tree/table) (#14751)
82624e21d5f is described below
commit 82624e21d5f689d6910728d055d331cf73dad37a
Author: Zikun Ma <[email protected]>
AuthorDate: Mon Feb 24 18:54:48 2025 +0800
Load: auto detect TsFile's model (tree/table) (#14751)
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 +
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 6 +
.../protocol/thrift/IoTDBDataNodeReceiver.java | 10 +-
.../iotdb/db/protocol/session/SessionManager.java | 12 +
.../queryengine/plan/analyze/AnalyzeVisitor.java | 29 +-
.../plan/analyze/load/LoadTsFileAnalyzer.java | 558 ++++++++++++++++-----
.../load/LoadTsFileToTableModelAnalyzer.java | 226 ---------
.../load/LoadTsFileToTreeModelAnalyzer.java | 187 -------
.../load/TreeSchemaAutoCreatorAndVerifier.java | 4 +-
.../analyze/schema/ClusterSchemaFetchExecutor.java | 4 +-
.../plan/planner/LogicalPlanVisitor.java | 4 +-
.../relational/analyzer/StatementAnalyzer.java | 27 +-
.../plan/relational/planner/RelationPlanner.java | 3 +-
.../plan/relational/sql/ast/LoadTsFile.java | 14 +-
.../plan/statement/crud/LoadTsFileStatement.java | 18 +-
.../load/config/LoadTsFileConfigurator.java | 24 -
16 files changed, 501 insertions(+), 636 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 2ff675b0196..fd42c57ca1c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1165,6 +1165,7 @@ public class IoTDBConfig {
/** Load related */
private double maxAllocateMemoryRatioForLoad = 0.8;
+ private int loadTsFileAnalyzeSchemaBatchReadTimeSeriesMetadataCount = 4096;
private int loadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber = 4096;
private int loadTsFileAnalyzeSchemaBatchFlushTableDeviceNumber = 4096; // For table model
private long loadTsFileAnalyzeSchemaMemorySizeInBytes =
@@ -4065,6 +4066,16 @@ public class IoTDBConfig {
this.maxAllocateMemoryRatioForLoad = maxAllocateMemoryRatioForLoad;
}
+ public int getLoadTsFileAnalyzeSchemaBatchReadTimeSeriesMetadataCount() {
+ return loadTsFileAnalyzeSchemaBatchReadTimeSeriesMetadataCount;
+ }
+
+ public void setLoadTsFileAnalyzeSchemaBatchReadTimeSeriesMetadataCount(
+ int loadTsFileAnalyzeSchemaBatchReadTimeSeriesMetadataCount) {
+ this.loadTsFileAnalyzeSchemaBatchReadTimeSeriesMetadataCount =
+ loadTsFileAnalyzeSchemaBatchReadTimeSeriesMetadataCount;
+ }
+
public int getLoadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber() {
return loadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 8184435eadd..ca42b712106 100755
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2410,6 +2410,12 @@ public class IoTDBDescriptor {
properties.getProperty(
"max_allocate_memory_ratio_for_load",
String.valueOf(conf.getMaxAllocateMemoryRatioForLoad()))));
+ conf.setLoadTsFileAnalyzeSchemaBatchReadTimeSeriesMetadataCount(
+ Integer.parseInt(
+ properties.getProperty(
+
"load_tsfile_analyze_schema_batch_read_time_series_metadata_count",
+ String.valueOf(
+
conf.getLoadTsFileAnalyzeSchemaBatchReadTimeSeriesMetadataCount()))));
conf.setLoadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber(
Integer.parseInt(
properties.getProperty(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 0782362b2c1..14102c42c07 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -93,7 +93,6 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
-import org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator;
import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
import
org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
import org.apache.iotdb.db.tools.schema.SRStatementGenerator;
@@ -596,11 +595,6 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
statement.setConvertOnTypeMismatch(true);
statement.setVerifySchema(validateTsFile.get());
statement.setAutoCreateDatabase(false);
-
- statement.setModel(
- dataBaseName != null
- ? LoadTsFileConfigurator.MODEL_TABLE_VALUE
- : LoadTsFileConfigurator.MODEL_TREE_VALUE);
statement.setDatabase(dataBaseName);
return executeStatementAndClassifyExceptions(statement);
@@ -848,9 +842,7 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
final boolean isTableModelStatement;
final String databaseName;
if (statement instanceof LoadTsFileStatement
- && ((LoadTsFileStatement) statement)
- .getModel()
- .equals(LoadTsFileConfigurator.MODEL_TABLE_VALUE)) {
+ && ((LoadTsFileStatement) statement).getDatabase() != null) {
isTableModelStatement = true;
databaseName = ((LoadTsFileStatement) statement).getDatabase();
} else if (statement instanceof InsertBaseStatement
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
index 8eb64cf0d87..764d5e9fb25 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java
@@ -385,6 +385,18 @@ public class SessionManager implements SessionManagerMBean {
session.getSqlDialect());
}
+ // Sometimes we need to switch from table model to tree model,
+ // e.g., when loading a tree model TsFile under table model dialect.
+ public SessionInfo copySessionInfoForTreeModel(final SessionInfo
sessionInfo) {
+ return new SessionInfo(
+ sessionInfo.getSessionId(),
+ sessionInfo.getUserName(),
+ ZoneId.systemDefault(),
+ sessionInfo.getVersion(),
+ sessionInfo.getDatabaseName().orElse(null),
+ IClientSession.SqlDialect.TREE);
+ }
+
public SessionInfo getSessionInfoOfTableModel(IClientSession session) {
return new SessionInfo(
session.getId(),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 9977f1dd02d..7f51f95c362 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -65,8 +65,6 @@ import org.apache.iotdb.db.queryengine.execution.operator.window.ainode.Inferenc
import
org.apache.iotdb.db.queryengine.execution.operator.window.ainode.TailInferenceWindow;
import org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet;
import org.apache.iotdb.db.queryengine.plan.analyze.load.LoadTsFileAnalyzer;
-import
org.apache.iotdb.db.queryengine.plan.analyze.load.LoadTsFileToTableModelAnalyzer;
-import
org.apache.iotdb.db.queryengine.plan.analyze.load.LoadTsFileToTreeModelAnalyzer;
import
org.apache.iotdb.db.queryengine.plan.analyze.lock.DataNodeSchemaLockManager;
import org.apache.iotdb.db.queryengine.plan.analyze.lock.SchemaLockType;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
@@ -79,7 +77,6 @@ import org.apache.iotdb.db.queryengine.plan.expression.leaf.ConstantOperand;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
import
org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
import
org.apache.iotdb.db.queryengine.plan.expression.visitor.ExistUnknownTypeInExpression;
-import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.MeasurementGroup;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.DeviceViewIntoPathDescriptor;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.FillDescriptor;
@@ -154,7 +151,6 @@ import org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowVersionStatement;
import org.apache.iotdb.db.schemaengine.template.Template;
-import org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator;
import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet;
import org.apache.iotdb.db.utils.constant.SqlConstant;
import org.apache.iotdb.rpc.RpcUtils;
@@ -3005,7 +3001,9 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
context.setQueryType(QueryType.WRITE);
final long startTime = System.nanoTime();
- try (final LoadTsFileAnalyzer loadTsFileAnalyzer =
getAnalyzer(loadTsFileStatement, context)) {
+ try (final LoadTsFileAnalyzer loadTsFileAnalyzer =
+ new LoadTsFileAnalyzer(
+ loadTsFileStatement, loadTsFileStatement.isGeneratedByPipe(),
context)) {
return (Analysis) loadTsFileAnalyzer.analyzeFileByFile(new Analysis());
} catch (final Exception e) {
final String exceptionMessage =
@@ -3024,27 +3022,6 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
}
}
- private LoadTsFileAnalyzer getAnalyzer(
- LoadTsFileStatement loadTsFileStatement, MPPQueryContext context) {
- if (Objects.equals(loadTsFileStatement.getModel(),
LoadTsFileConfigurator.MODEL_TREE_VALUE)) {
- // Load to tree-model
- return new LoadTsFileToTreeModelAnalyzer(
- loadTsFileStatement, loadTsFileStatement.isGeneratedByPipe(),
context);
- } else {
- // Load to table-model
- if (Objects.nonNull(loadTsFileStatement.getDatabase())) {
- return new LoadTsFileToTableModelAnalyzer(
- loadTsFileStatement,
- loadTsFileStatement.isGeneratedByPipe(),
- LocalExecutionPlanner.getInstance().metadata,
- context);
- } else {
- throw new SemanticException(
- "Database name must be specified when loading data into the table
model.");
- }
- }
- }
-
private boolean analyzeTimeseriesRegionScan(
WhereCondition timeCondition,
PathPatternTree patternTree,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
index 010fdf84bf5..32709f13ca8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
@@ -22,107 +22,225 @@ package org.apache.iotdb.db.queryengine.plan.analyze.load;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.auth.AuthException;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.load.LoadAnalyzeException;
import org.apache.iotdb.db.exception.load.LoadAnalyzeTypeMismatchException;
+import org.apache.iotdb.db.exception.load.LoadEmptyFileException;
import org.apache.iotdb.db.exception.load.LoadReadOnlyException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import
org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
+import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
+import
org.apache.iotdb.db.queryengine.plan.execution.config.executor.ClusterConfigTaskExecutor;
+import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.CreateDBTask;
+import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
+import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
+import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
import
org.apache.iotdb.db.storageengine.load.converter.LoadTsFileDataTypeConverter;
+import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.io.FileUtils;
import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.encrypt.EncryptParameter;
+import org.apache.tsfile.encrypt.EncryptUtils;
import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.TsFileSequenceReaderTimeseriesMetadataIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.BufferUnderflowException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
-public abstract class LoadTsFileAnalyzer implements AutoCloseable {
+import static
org.apache.iotdb.db.queryengine.plan.execution.config.TableConfigTaskVisitor.DATABASE_NOT_SPECIFIED;
+import static
org.apache.iotdb.db.queryengine.plan.execution.config.TableConfigTaskVisitor.validateDatabaseName;
+
+public class LoadTsFileAnalyzer implements AutoCloseable {
private static final Logger LOGGER =
LoggerFactory.getLogger(LoadTsFileAnalyzer.class);
- // These are only used when constructed from tree model SQL
- private final LoadTsFileStatement loadTsFileTreeStatement;
- // These are only used when constructed from table model SQL
- private final LoadTsFile loadTsFileTableStatement;
+ final IPartitionFetcher partitionFetcher =
ClusterPartitionFetcher.getInstance();
+ final ISchemaFetcher schemaFetcher = ClusterSchemaFetcher.getInstance();
+ private final Metadata metadata =
LocalExecutionPlanner.getInstance().metadata;
+ private final AccessControl accessControl =
Coordinator.getInstance().getAccessControl();
- private final boolean isTableModelStatement;
+ final MPPQueryContext context;
+ // Statement related
+ private final LoadTsFileStatement
+ loadTsFileTreeStatement; // only used when constructed from tree model SQL
+ private final LoadTsFile
+ loadTsFileTableStatement; // only used when constructed from table model SQL
+ private final boolean
+ isTableModelStatement; // Whether the statement itself is table model or not (not the TsFiles)
+ private final String statementString;
private final boolean isGeneratedByPipe;
- protected final List<File> tsFiles;
- protected final String statementString;
- protected final boolean isVerifySchema;
-
- protected final boolean isDeleteAfterLoad;
-
- protected final boolean isConvertOnTypeMismatch;
-
- protected final boolean isAutoCreateDatabase;
+ private final List<File> tsFiles;
+ private final List<Boolean> isTableModelTsFile;
+ private int isTableModelTsFileReliableIndex = -1;
- protected final int databaseLevel;
+ // User specified configs
+ private final int databaseLevel;
+ private String databaseForTableData;
+ private final boolean isVerifySchema;
+ private final boolean isDeleteAfterLoad;
+ private final boolean isConvertOnTypeMismatch;
+ private final boolean isAutoCreateDatabase;
- protected final String database;
+ // Schema creators for tree and table
+ private TreeSchemaAutoCreatorAndVerifier treeSchemaAutoCreatorAndVerifier;
+ private LoadTsFileTableSchemaCache tableSchemaCache;
- final MPPQueryContext context;
-
- final IPartitionFetcher partitionFetcher =
ClusterPartitionFetcher.getInstance();
- final ISchemaFetcher schemaFetcher = ClusterSchemaFetcher.getInstance();
-
- LoadTsFileAnalyzer(
+ public LoadTsFileAnalyzer(
LoadTsFileStatement loadTsFileStatement, boolean isGeneratedByPipe,
MPPQueryContext context) {
+ this.context = context;
+
this.loadTsFileTreeStatement = loadTsFileStatement;
- this.tsFiles = loadTsFileStatement.getTsFiles();
+ this.loadTsFileTableStatement = null;
+ this.isTableModelStatement = false;
this.statementString = loadTsFileStatement.toString();
+ this.isGeneratedByPipe = isGeneratedByPipe;
+
+ this.tsFiles = loadTsFileStatement.getTsFiles();
+ this.isTableModelTsFile = new
ArrayList<>(Collections.nCopies(this.tsFiles.size(), false));
+
+ this.databaseLevel = loadTsFileStatement.getDatabaseLevel();
+ this.databaseForTableData = loadTsFileStatement.getDatabase();
this.isVerifySchema = loadTsFileStatement.isVerifySchema();
this.isDeleteAfterLoad = loadTsFileStatement.isDeleteAfterLoad();
this.isConvertOnTypeMismatch =
loadTsFileStatement.isConvertOnTypeMismatch();
this.isAutoCreateDatabase = loadTsFileStatement.isAutoCreateDatabase();
- this.databaseLevel = loadTsFileStatement.getDatabaseLevel();
- this.database = loadTsFileStatement.getDatabase();
-
- this.loadTsFileTableStatement = null;
- this.isTableModelStatement = false;
- this.isGeneratedByPipe = isGeneratedByPipe;
- this.context = context;
}
- LoadTsFileAnalyzer(
+ public LoadTsFileAnalyzer(
LoadTsFile loadTsFileTableStatement, boolean isGeneratedByPipe,
MPPQueryContext context) {
+ this.context = context;
+
+ this.loadTsFileTreeStatement = null;
this.loadTsFileTableStatement = loadTsFileTableStatement;
- this.tsFiles = loadTsFileTableStatement.getTsFiles();
+ this.isTableModelStatement = true;
this.statementString = loadTsFileTableStatement.toString();
+ this.isGeneratedByPipe = isGeneratedByPipe;
+
+ this.tsFiles = loadTsFileTableStatement.getTsFiles();
+ this.isTableModelTsFile = new
ArrayList<>(Collections.nCopies(this.tsFiles.size(), false));
+
+ this.databaseLevel = loadTsFileTableStatement.getDatabaseLevel();
+ this.databaseForTableData = loadTsFileTableStatement.getDatabase();
this.isVerifySchema = loadTsFileTableStatement.isVerifySchema();
this.isDeleteAfterLoad = loadTsFileTableStatement.isDeleteAfterLoad();
this.isConvertOnTypeMismatch =
loadTsFileTableStatement.isConvertOnTypeMismatch();
this.isAutoCreateDatabase =
loadTsFileTableStatement.isAutoCreateDatabase();
- this.databaseLevel = loadTsFileTableStatement.getDatabaseLevel();
- this.database = loadTsFileTableStatement.getDatabase();
+ }
- this.loadTsFileTreeStatement = null;
- this.isTableModelStatement = true;
- this.isGeneratedByPipe = isGeneratedByPipe;
- this.context = context;
+ protected String getStatementString() {
+ return statementString;
}
- public abstract IAnalysis analyzeFileByFile(IAnalysis analysis);
+ protected boolean isVerifySchema() {
+ return isVerifySchema;
+ }
+
+ protected boolean isConvertOnTypeMismatch() {
+ return isConvertOnTypeMismatch;
+ }
- protected boolean doAnalyzeFileByFile(IAnalysis analysis) {
+ protected boolean isAutoCreateDatabase() {
+ return isAutoCreateDatabase;
+ }
+
+ protected int getDatabaseLevel() {
+ return databaseLevel;
+ }
+
+ public IAnalysis analyzeFileByFile(IAnalysis analysis) {
+ checkBeforeAnalyzeFileByFile(analysis);
+ if (analysis.isFinishQueryAfterAnalyze()) {
+ return analysis;
+ }
+
+ if (!doAnalyzeFileByFile(analysis)) {
+ // return false means the analysis is failed because of exception
+ return analysis;
+ }
+
+ try {
+ // flush remaining metadata of tree-model, currently no need for table-model
+ if (treeSchemaAutoCreatorAndVerifier != null) {
+ treeSchemaAutoCreatorAndVerifier.flush();
+ }
+ } catch (AuthException e) {
+ setFailAnalysisForAuthException(analysis, e);
+ return analysis;
+ } catch (LoadAnalyzeException e) {
+ executeTabletConversion(analysis, e);
+ return analysis;
+ } catch (Exception e) {
+ final String exceptionMessage =
+ String.format(
+ "Auto create or verify schema error when executing statement %s.
Detail: %s.",
+ getStatementString(),
+ e.getMessage() == null ? e.getClass().getName() :
e.getMessage());
+ LOGGER.warn(exceptionMessage, e);
+ analysis.setFinishQueryAfterAnalyze(true);
+ analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.LOAD_FILE_ERROR,
exceptionMessage));
+ return analysis;
+ }
+
+ LOGGER.info("Load - Analysis Stage: all tsfiles have been analyzed.");
+
+ // data partition will be queried in the scheduler
+ setRealStatement(analysis);
+ setTsFileModelInfoToStatement();
+ return analysis;
+ }
+
+ private void checkBeforeAnalyzeFileByFile(IAnalysis analysis) {
+ if (TSFileDescriptor.getInstance().getConfig().getEncryptFlag()) {
+ analysis.setFinishQueryAfterAnalyze(true);
+ analysis.setFailStatus(
+ RpcUtils.getStatus(
+ TSStatusCode.LOAD_FILE_ERROR,
+ "TSFile encryption is enabled, and the Load TSFile function is
disabled"));
+ return;
+ }
+
+ // check if the system is read only
+ if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
+ analysis.setFinishQueryAfterAnalyze(true);
+ analysis.setFailStatus(
+ RpcUtils.getStatus(TSStatusCode.SYSTEM_READ_ONLY,
LoadReadOnlyException.MESSAGE));
+ }
+ }
+
+ private boolean doAnalyzeFileByFile(IAnalysis analysis) {
// analyze tsfile metadata file by file
for (int i = 0, tsfileNum = tsFiles.size(); i < tsfileNum; i++) {
final File tsFile = tsFiles.get(i);
@@ -140,7 +258,7 @@ public abstract class LoadTsFileAnalyzer implements AutoCloseable {
}
try {
- analyzeSingleTsFile(tsFile);
+ analyzeSingleTsFile(tsFile, i);
if (LOGGER.isInfoEnabled()) {
LOGGER.info(
"Load - Analysis Stage: {}/{} tsfiles have been analyzed,
progress: {}%",
@@ -175,54 +293,163 @@ public abstract class LoadTsFileAnalyzer implements AutoCloseable {
return true;
}
- protected abstract void analyzeSingleTsFile(final File tsFile)
- throws IOException, AuthException, LoadAnalyzeException;
+ private void analyzeSingleTsFile(final File tsFile, int i)
+ throws IOException, AuthException, LoadAnalyzeException {
+ try (final TsFileSequenceReader reader = new
TsFileSequenceReader(tsFile.getAbsolutePath())) {
+
+ // check whether the tsfile is tree-model or not
+ final Map<String, TableSchema> tableSchemaMap =
reader.getTableSchemaMap();
+ final boolean isTableModelFile = Objects.nonNull(tableSchemaMap) &&
!tableSchemaMap.isEmpty();
+ LOGGER.info(
+ "TsFile {} is a {}-model file.", tsFile.getPath(), isTableModelFile
? "table" : "tree");
+
+ // can be reused when constructing tsfile resource
+ final TsFileSequenceReaderTimeseriesMetadataIterator
timeseriesMetadataIterator =
+ new TsFileSequenceReaderTimeseriesMetadataIterator(
+ reader,
+ !isTableModelFile, // currently we only need chunk metadata for tree model files
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+
.getLoadTsFileAnalyzeSchemaBatchReadTimeSeriesMetadataCount());
+
+ // check if the tsfile is empty
+ if (!timeseriesMetadataIterator.hasNext()) {
+ throw new LoadEmptyFileException(tsFile.getAbsolutePath());
+ }
- protected void executeTabletConversion(final IAnalysis analysis, final
LoadAnalyzeException e) {
- final LoadTsFileDataTypeConverter loadTsFileDataTypeConverter =
- new LoadTsFileDataTypeConverter(isGeneratedByPipe);
+ // check whether the encrypt type of the tsfile is supported
+ final EncryptParameter param = reader.getEncryptParam();
+ if (!Objects.equals(param.getType(), EncryptUtils.encryptParam.getType())
+ || !Arrays.equals(param.getKey(),
EncryptUtils.encryptParam.getKey())) {
+ throw new SemanticException("The encryption way of the TsFile is not
supported.");
+ }
- final TSStatus status =
- (!(e instanceof LoadAnalyzeTypeMismatchException) ||
isConvertOnTypeMismatch)
- ? (isTableModelStatement
- ? loadTsFileDataTypeConverter
- .convertForTableModel(loadTsFileTableStatement)
- .orElse(null)
- : loadTsFileDataTypeConverter
- .convertForTreeModel(loadTsFileTreeStatement)
- .orElse(null))
- : null;
-
- if (status == null) {
- LOGGER.warn(
- "Load: Failed to convert to tablets from statement {}. Status is
null.",
- isTableModelStatement ? loadTsFileTableStatement :
loadTsFileTreeStatement);
- analysis.setFailStatus(
- new
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
- } else if (!loadTsFileDataTypeConverter.isSuccessful(status)) {
- LOGGER.warn(
- "Load: Failed to convert to tablets from statement {}. Status: {}",
- isTableModelStatement ? loadTsFileTableStatement :
loadTsFileTreeStatement,
- status);
- analysis.setFailStatus(status);
+ this.isTableModelTsFile.set(i, isTableModelFile);
+ this.isTableModelTsFileReliableIndex = i;
+
+ if (isTableModelFile) {
+ doAnalyzeSingleTableFile(tsFile, reader, timeseriesMetadataIterator,
tableSchemaMap);
+ } else {
+ doAnalyzeSingleTreeFile(tsFile, reader, timeseriesMetadataIterator);
+ }
+ } catch (final LoadEmptyFileException loadEmptyFileException) {
+ LOGGER.warn("Empty file detected, will skip loading this file: {}",
tsFile.getAbsolutePath());
+ if (isDeleteAfterLoad) {
+ FileUtils.deleteQuietly(tsFile);
+ }
}
- analysis.setFinishQueryAfterAnalyze(true);
- setRealStatement(analysis);
}
- protected void setRealStatement(IAnalysis analysis) {
- if (isTableModelStatement) {
- // Do nothing by now.
- } else {
- analysis.setRealStatement(loadTsFileTreeStatement);
+ private void doAnalyzeSingleTreeFile(
+ final File tsFile,
+ final TsFileSequenceReader reader,
+ final TsFileSequenceReaderTimeseriesMetadataIterator
timeseriesMetadataIterator)
+ throws IOException, LoadAnalyzeException, AuthException {
+ // construct tsfile resource
+ final TsFileResource tsFileResource = constructTsFileResource(reader,
tsFile);
+
+ long writePointCount = 0;
+
+
getOrCreateTreeSchemaVerifier().setCurrentModificationsAndTimeIndex(tsFileResource,
reader);
+
+ final boolean isAutoCreateSchemaOrVerifySchemaEnabled =
+ IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()
|| isVerifySchema();
+ while (timeseriesMetadataIterator.hasNext()) {
+ final Map<IDeviceID, List<TimeseriesMetadata>> device2TimeseriesMetadata
=
+ timeseriesMetadataIterator.next();
+
+ if (isAutoCreateSchemaOrVerifySchemaEnabled) {
+ getOrCreateTreeSchemaVerifier().autoCreateAndVerify(reader,
device2TimeseriesMetadata);
+ }
+
+ if (!tsFileResource.resourceFileExists()) {
+ TsFileResourceUtils.updateTsFileResource(device2TimeseriesMetadata,
tsFileResource);
+ }
+
+ // TODO: how to get the correct write point count when
+ // !isAutoCreateSchemaOrVerifySchemaEnabled
+ writePointCount += getWritePointCount(device2TimeseriesMetadata);
}
+ if (isAutoCreateSchemaOrVerifySchemaEnabled) {
+
getOrCreateTreeSchemaVerifier().flushAndClearDeviceIsAlignedCacheIfNecessary();
+ }
+
+
TimestampPrecisionUtils.checkTimestampPrecision(tsFileResource.getFileEndTime());
+ tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
+
+ addTsFileResource(tsFileResource);
+ addWritePointCount(writePointCount);
}
- protected String getStatementString() {
- return statementString;
+ private void doAnalyzeSingleTableFile(
+ final File tsFile,
+ final TsFileSequenceReader reader,
+ final TsFileSequenceReaderTimeseriesMetadataIterator
timeseriesMetadataIterator,
+ final Map<String, TableSchema> tableSchemaMap)
+ throws IOException, LoadAnalyzeException {
+ // construct tsfile resource
+ final TsFileResource tsFileResource = constructTsFileResource(reader,
tsFile);
+
+ long writePointCount = 0;
+
+ if (Objects.isNull(databaseForTableData)) {
+ // If database is not specified, use the database from current session.
+ // If still not specified, throw an exception.
+ final Optional<String> dbName = context.getDatabaseName();
+ if (dbName.isPresent()) {
+ databaseForTableData = dbName.get();
+ if (isTableModelStatement) {
+ loadTsFileTableStatement.setDatabase(dbName.get());
+ } else {
+ loadTsFileTreeStatement.setDatabase(dbName.get());
+ }
+ } else {
+ throw new SemanticException(DATABASE_NOT_SPECIFIED);
+ }
+ }
+
+ autoCreateTableDatabaseIfAbsent(databaseForTableData);
+
+ getOrCreateTableSchemaCache().setDatabase(databaseForTableData);
+
getOrCreateTableSchemaCache().setCurrentModificationsAndTimeIndex(tsFileResource,
reader);
+
+ for (Map.Entry<String, org.apache.tsfile.file.metadata.TableSchema>
name2Schema :
+ tableSchemaMap.entrySet()) {
+ final
org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema fileSchema
=
+ org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema
+ .fromTsFileTableSchema(name2Schema.getKey(),
name2Schema.getValue());
+ getOrCreateTableSchemaCache().createTable(fileSchema, context, metadata);
+ accessControl.checkCanInsertIntoTable(
+ context.getSession().getUserName(),
+ new QualifiedObjectName(databaseForTableData, name2Schema.getKey()));
+ }
+
+ while (timeseriesMetadataIterator.hasNext()) {
+ final Map<IDeviceID, List<TimeseriesMetadata>> device2TimeseriesMetadata
=
+ timeseriesMetadataIterator.next();
+
+ for (IDeviceID deviceId : device2TimeseriesMetadata.keySet()) {
+ getOrCreateTableSchemaCache().autoCreateAndVerify(deviceId);
+ }
+
+ if (!tsFileResource.resourceFileExists()) {
+ TsFileResourceUtils.updateTsFileResource(device2TimeseriesMetadata,
tsFileResource);
+ }
+
+ writePointCount += getWritePointCount(device2TimeseriesMetadata);
+ }
+
+ getOrCreateTableSchemaCache().flush();
+ getOrCreateTableSchemaCache().clearIdColumnMapper();
+
+
TimestampPrecisionUtils.checkTimestampPrecision(tsFileResource.getFileEndTime());
+ tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
+
+ addTsFileResource(tsFileResource);
+ addWritePointCount(writePointCount);
}
- protected TsFileResource constructTsFileResource(
+ private TsFileResource constructTsFileResource(
final TsFileSequenceReader reader, final File tsFile) throws IOException
{
final TsFileResource tsFileResource = new TsFileResource(tsFile);
if (!tsFileResource.resourceFileExists()) {
@@ -235,7 +462,44 @@ public abstract class LoadTsFileAnalyzer implements
AutoCloseable {
return tsFileResource;
}
- protected void addTsFileResource(TsFileResource tsFileResource) {
+ /**
+  * Returns the tree-model schema creator/verifier of this analyzer, instantiating it on first
+  * use so that it is only built when it is actually requested.
+  */
+ private TreeSchemaAutoCreatorAndVerifier getOrCreateTreeSchemaVerifier() {
+   if (treeSchemaAutoCreatorAndVerifier != null) {
+     return treeSchemaAutoCreatorAndVerifier;
+   }
+   treeSchemaAutoCreatorAndVerifier = new TreeSchemaAutoCreatorAndVerifier(this);
+   return treeSchemaAutoCreatorAndVerifier;
+ }
+
+ /**
+  * Returns the table-model schema cache of this analyzer, instantiating it from {@code metadata}
+  * and {@code context} on first use.
+  */
+ private LoadTsFileTableSchemaCache getOrCreateTableSchemaCache() {
+   if (tableSchemaCache != null) {
+     return tableSchemaCache;
+   }
+   tableSchemaCache = new LoadTsFileTableSchemaCache(metadata, context);
+   return tableSchemaCache;
+ }
+
+ /**
+  * Creates the given table-model database if it is not already known to
+  * {@code DataNodeTableCache}.
+  *
+  * <p>The database name is validated first. When the database is absent, a {@code CreateDBTask}
+  * is executed and this method blocks until its result is available.
+  *
+  * @param database name of the table-model database the TsFile data is loaded into
+  * @throws LoadAnalyzeException if the task finishes with a non-success status, or if executing
+  *     or waiting for the task fails
+  */
+ private void autoCreateTableDatabaseIfAbsent(final String database) throws LoadAnalyzeException {
+   validateDatabaseName(database);
+   if (DataNodeTableCache.getInstance().isDatabaseExist(database)) {
+     return;
+   }
+
+   // NOTE(review): the removed LoadTsFileToTableModelAnalyzer called
+   // accessControl.checkCanCreateDatabase(...) at this point before creating the database;
+   // confirm that dropping the permission check here is intended.
+   final CreateDBTask task =
+       new CreateDBTask(new TDatabaseSchema(database).setIsTableModel(true), true);
+   try {
+     final ListenableFuture<ConfigTaskResult> future =
+         task.execute(ClusterConfigTaskExecutor.getInstance());
+     final ConfigTaskResult result = future.get();
+     if (result.getStatusCode().getStatusCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+       throw new LoadAnalyzeException(
+           String.format(
+               "Auto create database failed: %s, status code: %s",
+               database, result.getStatusCode()));
+     }
+   } catch (final LoadAnalyzeException e) {
+     // Already carries the precise failure; do not wrap it a second time.
+     throw e;
+   } catch (final InterruptedException e) {
+     // Restore the interrupt flag so callers further up the stack can still observe it.
+     Thread.currentThread().interrupt();
+     throw new LoadAnalyzeException("Auto create database failed because: " + e.getMessage());
+   } catch (final Exception e) {
+     throw new LoadAnalyzeException("Auto create database failed because: " + e.getMessage());
+   }
+ }
+
+ private void addTsFileResource(TsFileResource tsFileResource) {
if (isTableModelStatement) {
loadTsFileTableStatement.addTsFileResource(tsFileResource);
} else {
@@ -243,7 +507,15 @@ public abstract class LoadTsFileAnalyzer implements
AutoCloseable {
}
}
- protected void addWritePointCount(long writePointCount) {
+ /**
+  * Adds up the statistics count of every {@code TimeseriesMetadata} in the given map.
+  *
+  * @param device2TimeseriesMetadata timeseries metadata lists keyed by device id
+  * @return the sum of {@code getStatistics().getCount()} over all metadata entries
+  */
+ private static long getWritePointCount(
+     Map<IDeviceID, List<TimeseriesMetadata>> device2TimeseriesMetadata) {
+   long pointCount = 0;
+   for (final List<TimeseriesMetadata> metadataList : device2TimeseriesMetadata.values()) {
+     for (final TimeseriesMetadata metadata : metadataList) {
+       pointCount += metadata.getStatistics().getCount();
+     }
+   }
+   return pointCount;
+ }
+
+ private void addWritePointCount(long writePointCount) {
if (isTableModelStatement) {
loadTsFileTableStatement.addWritePointCount(writePointCount);
} else {
@@ -251,51 +523,111 @@ public abstract class LoadTsFileAnalyzer implements
AutoCloseable {
}
}
- protected boolean isVerifySchema() {
- return isVerifySchema;
+ private void setRealStatement(IAnalysis analysis) {
+ if (isTableModelStatement) {
+ // Do nothing by now.
+ } else {
+ analysis.setRealStatement(loadTsFileTreeStatement);
+ }
}
- protected boolean isConvertOnTypeMismatch() {
- return isConvertOnTypeMismatch;
+ private void setTsFileModelInfoToStatement() {
+ if (isTableModelStatement) {
+ this.loadTsFileTableStatement.setIsTableModel(this.isTableModelTsFile);
+ } else {
+ this.loadTsFileTreeStatement.setIsTableModel(this.isTableModelTsFile);
+ }
}
- protected boolean isAutoCreateDatabase() {
- return isAutoCreateDatabase;
- }
+ private void executeTabletConversion(final IAnalysis analysis, final
LoadAnalyzeException e) {
+ if (isTableModelTsFileReliableIndex < tsFiles.size() - 1) {
+ try {
+ getFileModelInfoBeforeTabletConversion();
+ } catch (Exception e1) {
+ LOGGER.warn(
+ "Load: Failed to convert to tablets from statement {} because
failed to read model info from file, message: {}.",
+ isTableModelStatement ? loadTsFileTableStatement :
loadTsFileTreeStatement,
+ e1.getMessage());
+ analysis.setFailStatus(
+ new
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
+ analysis.setFinishQueryAfterAnalyze(true);
+ setRealStatement(analysis);
+ return;
+ }
+ }
- protected int getDatabaseLevel() {
- return databaseLevel;
+ final LoadTsFileDataTypeConverter loadTsFileDataTypeConverter =
+ new LoadTsFileDataTypeConverter(isGeneratedByPipe);
+
+ for (int i = 0; i < tsFiles.size(); i++) {
+ try {
+ final TSStatus status =
+ (!(e instanceof LoadAnalyzeTypeMismatchException) ||
isConvertOnTypeMismatch)
+ ? (isTableModelTsFile.get(i)
+ ? loadTsFileDataTypeConverter
+ .convertForTableModel(
+ new LoadTsFile(null, tsFiles.get(i).getPath(),
Collections.emptyMap())
+ .setDatabase(databaseForTableData))
+ .orElse(null)
+ : loadTsFileDataTypeConverter
+ .convertForTreeModel(new
LoadTsFileStatement(tsFiles.get(i).getPath()))
+ .orElse(null))
+ : null;
+
+ if (status == null) {
+ LOGGER.warn(
+ "Load: Failed to convert to tablets from statement {}. Status is
null.",
+ isTableModelStatement ? loadTsFileTableStatement :
loadTsFileTreeStatement);
+ analysis.setFailStatus(
+ new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode())
+ .setMessage(e.getMessage()));
+ break;
+ } else if (!loadTsFileDataTypeConverter.isSuccessful(status)) {
+ LOGGER.warn(
+ "Load: Failed to convert to tablets from statement {}. Status:
{}",
+ isTableModelStatement ? loadTsFileTableStatement :
loadTsFileTreeStatement,
+ status);
+ analysis.setFailStatus(status);
+ break;
+ }
+ } catch (final Exception e2) {
+ LOGGER.warn(
+ "Load: Failed to convert to tablets from statement {} because
exception: {}",
+ isTableModelStatement ? loadTsFileTableStatement :
loadTsFileTreeStatement,
+ e2.getMessage());
+ analysis.setFailStatus(
+ new
TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()).setMessage(e.getMessage()));
+ break;
+ }
+ }
+
+ analysis.setFinishQueryAfterAnalyze(true);
+ setRealStatement(analysis);
}
- protected long getWritePointCount(
- Map<IDeviceID, List<TimeseriesMetadata>> device2TimeseriesMetadata) {
- return device2TimeseriesMetadata.values().stream()
- .flatMap(List::stream)
- .mapToLong(t -> t.getStatistics().getCount())
- .sum();
+ private void getFileModelInfoBeforeTabletConversion() throws IOException {
+ for (int i = isTableModelTsFileReliableIndex + 1; i < tsFiles.size(); i++)
{
+ try (final TsFileSequenceReader reader =
+ new TsFileSequenceReader(tsFiles.get(i).getAbsolutePath(), true)) {
+ final Map<String, TableSchema> tableSchemaMap =
reader.getTableSchemaMap();
+ isTableModelTsFile.set(i, Objects.nonNull(tableSchemaMap) &&
!tableSchemaMap.isEmpty());
+ isTableModelTsFileReliableIndex = i;
+ }
+ }
}
- protected void setFailAnalysisForAuthException(IAnalysis analysis,
AuthException e) {
+ private void setFailAnalysisForAuthException(IAnalysis analysis,
AuthException e) {
analysis.setFinishQueryAfterAnalyze(true);
analysis.setFailStatus(RpcUtils.getStatus(e.getCode(), e.getMessage()));
}
- protected void checkBeforeAnalyzeFileByFile(IAnalysis analysis) {
- if (TSFileDescriptor.getInstance().getConfig().getEncryptFlag()) {
- analysis.setFinishQueryAfterAnalyze(true);
- analysis.setFailStatus(
- RpcUtils.getStatus(
- TSStatusCode.LOAD_FILE_ERROR,
- "TSFile encryption is enabled, and the Load TSFile function is
disabled"));
- return;
+ @Override
+ public void close() throws Exception {
+ if (treeSchemaAutoCreatorAndVerifier != null) {
+ treeSchemaAutoCreatorAndVerifier.close();
}
-
- // check if the system is read only
- if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
- analysis.setFinishQueryAfterAnalyze(true);
- analysis.setFailStatus(
- RpcUtils.getStatus(TSStatusCode.SYSTEM_READ_ONLY,
LoadReadOnlyException.MESSAGE));
- return;
+ if (tableSchemaCache != null) {
+ tableSchemaCache.close();
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTableModelAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTableModelAnalyzer.java
deleted file mode 100644
index ce4dd784c9a..00000000000
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTableModelAnalyzer.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.queryengine.plan.analyze.load;
-
-import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
-import org.apache.iotdb.db.exception.load.LoadAnalyzeException;
-import org.apache.iotdb.db.exception.load.LoadEmptyFileException;
-import org.apache.iotdb.db.exception.sql.SemanticException;
-import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
-import org.apache.iotdb.db.queryengine.plan.Coordinator;
-import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
-import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
-import
org.apache.iotdb.db.queryengine.plan.execution.config.executor.ClusterConfigTaskExecutor;
-import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.CreateDBTask;
-import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
-import
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
-import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema;
-import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl;
-import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile;
-import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
-import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
-import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
-import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
-import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.commons.io.FileUtils;
-import org.apache.tsfile.encrypt.EncryptParameter;
-import org.apache.tsfile.encrypt.EncryptUtils;
-import org.apache.tsfile.file.metadata.IDeviceID;
-import org.apache.tsfile.file.metadata.TimeseriesMetadata;
-import org.apache.tsfile.read.TsFileSequenceReader;
-import org.apache.tsfile.read.TsFileSequenceReaderTimeseriesMetadataIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ExecutionException;
-
-import static
org.apache.iotdb.db.queryengine.plan.execution.config.TableConfigTaskVisitor.validateDatabaseName;
-
-public class LoadTsFileToTableModelAnalyzer extends LoadTsFileAnalyzer {
- private static final Logger LOGGER =
- LoggerFactory.getLogger(LoadTsFileToTableModelAnalyzer.class);
-
- private final Metadata metadata;
-
- private final LoadTsFileTableSchemaCache schemaCache;
- private final AccessControl accessControl =
Coordinator.getInstance().getAccessControl();
-
- public LoadTsFileToTableModelAnalyzer(
- LoadTsFileStatement loadTsFileStatement,
- boolean isGeneratedByPipe,
- Metadata metadata,
- MPPQueryContext context) {
- super(loadTsFileStatement, isGeneratedByPipe, context);
- this.metadata = metadata;
- this.schemaCache = new LoadTsFileTableSchemaCache(metadata, context);
- }
-
- public LoadTsFileToTableModelAnalyzer(
- LoadTsFile loadTsFileTableStatement,
- boolean isGeneratedByPipe,
- Metadata metadata,
- MPPQueryContext context) {
- super(loadTsFileTableStatement, isGeneratedByPipe, context);
- this.metadata = metadata;
- this.schemaCache = new LoadTsFileTableSchemaCache(metadata, context);
- }
-
- @Override
- public IAnalysis analyzeFileByFile(IAnalysis analysis) {
- checkBeforeAnalyzeFileByFile(analysis);
- if (analysis.isFinishQueryAfterAnalyze()) {
- return analysis;
- }
-
- try {
- autoCreateDatabaseIfAbsent(database);
- } catch (LoadAnalyzeException e) {
- LOGGER.warn("Auto create database failed: {}", database, e);
- analysis.setFinishQueryAfterAnalyze(true);
- analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.LOAD_FILE_ERROR,
e.getMessage()));
- return analysis;
- }
-
- if (!doAnalyzeFileByFile(analysis)) {
- // return false means the analysis is failed because of exception
- return analysis;
- }
-
- LOGGER.info("Load - Analysis Stage: all tsfiles have been analyzed.");
-
- // data partition will be queried in the scheduler
- setRealStatement(analysis);
- return analysis;
- }
-
- @Override
- protected void analyzeSingleTsFile(final File tsFile) throws IOException,
LoadAnalyzeException {
- try (final TsFileSequenceReader reader = new
TsFileSequenceReader(tsFile.getAbsolutePath())) {
- // can be reused when constructing tsfile resource
- final TsFileSequenceReaderTimeseriesMetadataIterator
timeseriesMetadataIterator =
- new TsFileSequenceReaderTimeseriesMetadataIterator(reader, true, 1);
- final Map<String, org.apache.tsfile.file.metadata.TableSchema>
tableSchemaMap =
- reader.getTableSchemaMap();
-
- // check if the tsfile is empty
- if (!timeseriesMetadataIterator.hasNext()) {
- throw new LoadEmptyFileException(tsFile.getAbsolutePath());
- }
-
- // check whether the tsfile is table-model or not
- if (Objects.isNull(tableSchemaMap) || tableSchemaMap.isEmpty()) {
- throw new SemanticException("Attempted to load a tree-model TsFile
into table-model.");
- }
-
- // check whether the encrypt type of the tsfile is supported
- EncryptParameter param = reader.getEncryptParam();
- if (!Objects.equals(param.getType(), EncryptUtils.encryptParam.getType())
- || !Arrays.equals(param.getKey(),
EncryptUtils.encryptParam.getKey())) {
- throw new SemanticException("The encryption way of the TsFile is not
supported.");
- }
-
- // construct tsfile resource
- final TsFileResource tsFileResource = constructTsFileResource(reader,
tsFile);
-
- schemaCache.setDatabase(database);
- schemaCache.setCurrentModificationsAndTimeIndex(tsFileResource, reader);
-
- for (Map.Entry<String, org.apache.tsfile.file.metadata.TableSchema>
name2Schema :
- tableSchemaMap.entrySet()) {
- final TableSchema fileSchema =
- TableSchema.fromTsFileTableSchema(name2Schema.getKey(),
name2Schema.getValue());
- accessControl.checkCanInsertIntoTable(
- context.getSession().getUserName(),
- new QualifiedObjectName(database, fileSchema.getTableName()));
- schemaCache.createTable(fileSchema, context, metadata);
- }
-
- long writePointCount = 0;
-
- while (timeseriesMetadataIterator.hasNext()) {
- final Map<IDeviceID, List<TimeseriesMetadata>>
device2TimeseriesMetadata =
- timeseriesMetadataIterator.next();
-
- for (IDeviceID deviceId : device2TimeseriesMetadata.keySet()) {
- schemaCache.autoCreateAndVerify(deviceId);
- }
-
- if (!tsFileResource.resourceFileExists()) {
- TsFileResourceUtils.updateTsFileResource(device2TimeseriesMetadata,
tsFileResource);
- }
-
- writePointCount += getWritePointCount(device2TimeseriesMetadata);
- }
-
-
TimestampPrecisionUtils.checkTimestampPrecision(tsFileResource.getFileEndTime());
- tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
-
- addTsFileResource(tsFileResource);
- addWritePointCount(writePointCount);
-
- schemaCache.flush();
- schemaCache.clearIdColumnMapper();
- } catch (final LoadEmptyFileException loadEmptyFileException) {
- LOGGER.warn("Failed to load empty file: {}", tsFile.getAbsolutePath());
- if (isDeleteAfterLoad) {
- FileUtils.deleteQuietly(tsFile);
- }
- }
- }
-
- private void autoCreateDatabaseIfAbsent(final String database) throws
LoadAnalyzeException {
- validateDatabaseName(database);
- if (DataNodeTableCache.getInstance().isDatabaseExist(database)) {
- return;
- }
- accessControl.checkCanCreateDatabase(context.getSession().getUserName(),
database);
- final CreateDBTask task =
- new CreateDBTask(new TDatabaseSchema(database).setIsTableModel(true),
true);
- try {
- final ListenableFuture<ConfigTaskResult> future =
- task.execute(ClusterConfigTaskExecutor.getInstance());
- final ConfigTaskResult result = future.get();
- if (result.getStatusCode().getStatusCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- throw new LoadAnalyzeException(
- String.format(
- "Auto create database failed: %s, status code: %s",
- database, result.getStatusCode()));
- }
- } catch (final ExecutionException | InterruptedException e) {
- throw new LoadAnalyzeException("Auto create database failed because: " +
e.getMessage());
- }
- }
-
- @Override
- public void close() throws Exception {
- schemaCache.close();
- }
-}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTreeModelAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTreeModelAnalyzer.java
deleted file mode 100644
index d212aec6ea5..00000000000
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTreeModelAnalyzer.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.queryengine.plan.analyze.load;
-
-import org.apache.iotdb.commons.auth.AuthException;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.load.LoadAnalyzeException;
-import org.apache.iotdb.db.exception.load.LoadAnalyzeTypeMismatchException;
-import org.apache.iotdb.db.exception.load.LoadEmptyFileException;
-import org.apache.iotdb.db.exception.sql.SemanticException;
-import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
-import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
-import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile;
-import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
-import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
-import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
-import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
-import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
-import org.apache.iotdb.rpc.RpcUtils;
-import org.apache.iotdb.rpc.TSStatusCode;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.tsfile.encrypt.EncryptParameter;
-import org.apache.tsfile.encrypt.EncryptUtils;
-import org.apache.tsfile.file.metadata.IDeviceID;
-import org.apache.tsfile.file.metadata.TableSchema;
-import org.apache.tsfile.file.metadata.TimeseriesMetadata;
-import org.apache.tsfile.read.TsFileSequenceReader;
-import org.apache.tsfile.read.TsFileSequenceReaderTimeseriesMetadataIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-public class LoadTsFileToTreeModelAnalyzer extends LoadTsFileAnalyzer {
- private static final Logger LOGGER =
LoggerFactory.getLogger(LoadTsFileToTreeModelAnalyzer.class);
-
- private final TreeSchemaAutoCreatorAndVerifier schemaAutoCreatorAndVerifier;
-
- public LoadTsFileToTreeModelAnalyzer(
- LoadTsFileStatement loadTsFileStatement, boolean isGeneratedByPipe,
MPPQueryContext context) {
- super(loadTsFileStatement, isGeneratedByPipe, context);
- this.schemaAutoCreatorAndVerifier = new
TreeSchemaAutoCreatorAndVerifier(this);
- }
-
- public LoadTsFileToTreeModelAnalyzer(
- LoadTsFile loadTsFileTableStatement, boolean isGeneratedByPipe,
MPPQueryContext context) {
- super(loadTsFileTableStatement, isGeneratedByPipe, context);
- this.schemaAutoCreatorAndVerifier = new
TreeSchemaAutoCreatorAndVerifier(this);
- }
-
- @Override
- public IAnalysis analyzeFileByFile(IAnalysis analysis) {
- checkBeforeAnalyzeFileByFile(analysis);
- if (analysis.isFinishQueryAfterAnalyze()) {
- return analysis;
- }
-
- if (!doAnalyzeFileByFile(analysis)) {
- // return false means the analysis is failed because of exception
- return analysis;
- }
-
- try {
- schemaAutoCreatorAndVerifier.flush();
- } catch (AuthException e) {
- setFailAnalysisForAuthException(analysis, e);
- return analysis;
- } catch (LoadAnalyzeException e) {
- executeTabletConversion(analysis, e);
- return analysis;
- } catch (Exception e) {
- final String exceptionMessage =
- String.format(
- "Auto create or verify schema error when executing statement %s.
Detail: %s.",
- getStatementString(),
- e.getMessage() == null ? e.getClass().getName() :
e.getMessage());
- LOGGER.warn(exceptionMessage, e);
- analysis.setFinishQueryAfterAnalyze(true);
- analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.LOAD_FILE_ERROR,
exceptionMessage));
- return analysis;
- }
-
- LOGGER.info("Load - Analysis Stage: all tsfiles have been analyzed.");
-
- // data partition will be queried in the scheduler
- setRealStatement(analysis);
- return analysis;
- }
-
- @Override
- protected void analyzeSingleTsFile(final File tsFile)
- throws IOException, AuthException, LoadAnalyzeTypeMismatchException {
- try (final TsFileSequenceReader reader = new
TsFileSequenceReader(tsFile.getAbsolutePath())) {
- // can be reused when constructing tsfile resource
- final TsFileSequenceReaderTimeseriesMetadataIterator
timeseriesMetadataIterator =
- new TsFileSequenceReaderTimeseriesMetadataIterator(reader, true, 1);
-
- // check if the tsfile is empty
- if (!timeseriesMetadataIterator.hasNext()) {
- throw new LoadEmptyFileException(tsFile.getAbsolutePath());
- }
-
- // check whether the encrypt type of the tsfile is supported
- EncryptParameter param = reader.getEncryptParam();
- if (!Objects.equals(param.getType(), EncryptUtils.encryptParam.getType())
- || !Arrays.equals(param.getKey(),
EncryptUtils.encryptParam.getKey())) {
- throw new SemanticException("The encryption way of the TsFile is not
supported.");
- }
-
- // check whether the tsfile is tree-model or not
- // TODO: currently, loading a file with both tree-model and table-model
data is not supported.
- // May need to support this and remove this check in the future.
- Map<String, TableSchema> tableSchemaMap = reader.getTableSchemaMap();
- if (Objects.nonNull(tableSchemaMap) && !tableSchemaMap.isEmpty()) {
- throw new SemanticException("Attempted to load a table-model TsFile
into tree-model.");
- }
-
- // construct tsfile resource
- final TsFileResource tsFileResource = constructTsFileResource(reader,
tsFile);
-
-
schemaAutoCreatorAndVerifier.setCurrentModificationsAndTimeIndex(tsFileResource,
reader);
-
- long writePointCount = 0;
-
- final boolean isAutoCreateSchemaOrVerifySchemaEnabled =
-
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled() ||
isVerifySchema();
- while (timeseriesMetadataIterator.hasNext()) {
- final Map<IDeviceID, List<TimeseriesMetadata>>
device2TimeseriesMetadata =
- timeseriesMetadataIterator.next();
-
- if (isAutoCreateSchemaOrVerifySchemaEnabled) {
- schemaAutoCreatorAndVerifier.autoCreateAndVerify(reader,
device2TimeseriesMetadata);
- }
-
- if (!tsFileResource.resourceFileExists()) {
- TsFileResourceUtils.updateTsFileResource(device2TimeseriesMetadata,
tsFileResource);
- }
-
- // TODO: how to get the correct write point count when
- // !isAutoCreateSchemaOrVerifySchemaEnabled
- writePointCount += getWritePointCount(device2TimeseriesMetadata);
- }
- if (isAutoCreateSchemaOrVerifySchemaEnabled) {
-
schemaAutoCreatorAndVerifier.flushAndClearDeviceIsAlignedCacheIfNecessary();
- }
-
-
TimestampPrecisionUtils.checkTimestampPrecision(tsFileResource.getFileEndTime());
- tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
-
- addTsFileResource(tsFileResource);
- addWritePointCount(writePointCount);
- } catch (final LoadEmptyFileException loadEmptyFileException) {
- LOGGER.warn("Failed to load empty file: {}", tsFile.getAbsolutePath());
- if (isDeleteAfterLoad) {
- FileUtils.deleteQuietly(tsFile);
- }
- }
- }
-
- @Override
- public void close() throws Exception {
- schemaAutoCreatorAndVerifier.close();
- }
-}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java
index 25604976c59..1929c23ee00 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java
@@ -88,10 +88,10 @@ public class TreeSchemaAutoCreatorAndVerifier {
private static final IClientManager<ConfigRegionId, ConfigNodeClient>
CONFIG_NODE_CLIENT_MANAGER =
ConfigNodeClientManager.getInstance();
- private final LoadTsFileToTreeModelAnalyzer loadTsFileAnalyzer;
+ private final LoadTsFileAnalyzer loadTsFileAnalyzer;
private final LoadTsFileTreeSchemaCache schemaCache;
- TreeSchemaAutoCreatorAndVerifier(LoadTsFileToTreeModelAnalyzer
loadTsFileAnalyzer)
+ TreeSchemaAutoCreatorAndVerifier(LoadTsFileAnalyzer loadTsFileAnalyzer)
throws LoadRuntimeOutOfMemoryException {
this.loadTsFileAnalyzer = loadTsFileAnalyzer;
this.schemaCache = new LoadTsFileTreeSchemaCache();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
index 1c61bda425c..d3544229303 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -87,7 +87,9 @@ class ClusterSchemaFetchExecutor {
return coordinator.executeForTreeModel(
statement,
queryId,
- context == null ? null : context.getSession(),
+ context == null
+ ? null
+ :
SessionManager.getInstance().copySessionInfoForTreeModel(context.getSession()),
sql,
ClusterPartitionFetcher.getInstance(),
schemaFetcher,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
index 3e654150252..376f582eedd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
@@ -87,7 +87,6 @@ import
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement
import
org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainAnalyzeStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement;
import org.apache.iotdb.db.schemaengine.template.Template;
-import org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.IDeviceID;
@@ -517,8 +516,7 @@ public class LogicalPlanVisitor extends
StatementVisitor<PlanNode, MPPQueryConte
final LoadTsFileStatement loadTsFileStatement, final MPPQueryContext
context) {
final List<Boolean> isTableModel = new ArrayList<>();
for (int i = 0; i < loadTsFileStatement.getResources().size(); i++) {
- isTableModel.add(
-
loadTsFileStatement.getModel().equals(LoadTsFileConfigurator.MODEL_TABLE_VALUE));
+ isTableModel.add(loadTsFileStatement.getIsTableModel().get(i));
}
return new LoadTsFileNode(
context.getQueryId().genPlanNodeId(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
index d4b34dd1eaa..edf7f9980c0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
@@ -30,8 +30,6 @@ import
org.apache.iotdb.db.queryengine.execution.warnings.WarningCollector;
import org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeUtils;
import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
import org.apache.iotdb.db.queryengine.plan.analyze.load.LoadTsFileAnalyzer;
-import
org.apache.iotdb.db.queryengine.plan.analyze.load.LoadTsFileToTableModelAnalyzer;
-import
org.apache.iotdb.db.queryengine.plan.analyze.load.LoadTsFileToTreeModelAnalyzer;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.SchemaValidator;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
@@ -145,7 +143,6 @@ import
org.apache.iotdb.db.queryengine.plan.statement.component.FillPolicy;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
import org.apache.iotdb.db.schemaengine.table.InformationSchemaUtils;
-import org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator;
import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -190,7 +187,6 @@ import static java.util.Objects.requireNonNull;
import static
org.apache.iotdb.commons.schema.table.TsTable.TABLE_ALLOWED_PROPERTIES;
import static
org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinScalarFunction.DATE_BIN;
import static
org.apache.iotdb.db.queryengine.execution.warnings.StandardWarningCode.REDUNDANT_ORDER_BY;
-import static
org.apache.iotdb.db.queryengine.plan.execution.config.TableConfigTaskVisitor.DATABASE_NOT_SPECIFIED;
import static
org.apache.iotdb.db.queryengine.plan.relational.analyzer.AggregationAnalyzer.verifyOrderByAggregations;
import static
org.apache.iotdb.db.queryengine.plan.relational.analyzer.AggregationAnalyzer.verifySourceAggregations;
import static
org.apache.iotdb.db.queryengine.plan.relational.analyzer.CanonicalizationAware.canonicalizationAwareKey;
@@ -587,7 +583,8 @@ public class StatementAnalyzer {
queryContext.setQueryType(QueryType.WRITE);
final long startTime = System.nanoTime();
- try (final LoadTsFileAnalyzer loadTsFileAnalyzer = getAnalyzer(node)) {
+ try (final LoadTsFileAnalyzer loadTsFileAnalyzer =
+ new LoadTsFileAnalyzer(node, node.isGeneratedByPipe(),
queryContext)) {
loadTsFileAnalyzer.analyzeFileByFile(analysis);
} catch (final Exception e) {
final String exceptionMessage =
@@ -604,26 +601,6 @@ public class StatementAnalyzer {
return createAndAssignScope(node, scope);
}
- private LoadTsFileAnalyzer getAnalyzer(final LoadTsFile loadTsFile) {
- if (Objects.equals(loadTsFile.getModel(), LoadTsFileConfigurator.MODEL_TABLE_VALUE)) {
- // Load to table-model
- if (Objects.isNull(loadTsFile.getDatabase())) {
- // If database is not specified, use the database from current session.
- // If still not specified, throw an exception.
- if (!queryContext.getDatabaseName().isPresent()) {
- throw new SemanticException(DATABASE_NOT_SPECIFIED);
- }
- loadTsFile.setDatabase(queryContext.getDatabaseName().get());
- }
- return new LoadTsFileToTableModelAnalyzer(
- loadTsFile, loadTsFile.isGeneratedByPipe(), metadata, queryContext);
- } else {
- // Load to tree-model
- return new LoadTsFileToTreeModelAnalyzer(
- loadTsFile, loadTsFile.isGeneratedByPipe(), queryContext);
- }
- }
-
@Override
protected Scope visitExplain(Explain node, Optional<Scope> context) {
analysis.setFinishQueryAfterAnalyze();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
index cec65c2d66a..aa346416bba 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
@@ -82,7 +82,6 @@ import
org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
-import org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -743,7 +742,7 @@ public class RelationPlanner extends
AstVisitor<RelationPlan, Void> {
protected RelationPlan visitLoadTsFile(final LoadTsFile node, final Void
context) {
final List<Boolean> isTableModel = new ArrayList<>();
for (int i = 0; i < node.getResources().size(); i++) {
- isTableModel.add(node.getModel().equals(LoadTsFileConfigurator.MODEL_TABLE_VALUE));
+ isTableModel.add(node.getIsTableModel().get(i));
}
return new RelationPlan(
new LoadTsFileNode(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
index 4d7312a31b5..7414f1bee2b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java
@@ -46,11 +46,11 @@ public class LoadTsFile extends Statement {
private boolean autoCreateDatabase = true;
private boolean verify;
private boolean isGeneratedByPipe = false;
- private String model = LoadTsFileConfigurator.MODEL_TABLE_VALUE;
private final Map<String, String> loadAttributes;
private final List<File> tsFiles;
+ private List<Boolean> isTableModel;
private final List<TsFileResource> resources;
private final List<Long> writePointCountList;
@@ -73,6 +73,7 @@ public class LoadTsFile extends Statement {
this.tsFiles =
org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement.processTsFile(
file);
+ this.isTableModel = new ArrayList<>(Collections.nCopies(this.tsFiles.size(), true));
} catch (FileNotFoundException e) {
throw new SemanticException(e);
}
@@ -127,8 +128,12 @@ public class LoadTsFile extends Statement {
return isGeneratedByPipe;
}
- public String getModel() {
- return model;
+ public List<Boolean> getIsTableModel() {
+ return isTableModel;
+ }
+
+ public void setIsTableModel(List<Boolean> isTableModel) {
+ this.isTableModel = isTableModel;
}
public List<File> getTsFiles() {
@@ -158,9 +163,6 @@ public class LoadTsFile extends Statement {
this.convertOnTypeMismatch =
LoadTsFileConfigurator.parseOrGetDefaultConvertOnTypeMismatch(loadAttributes);
this.verify =
LoadTsFileConfigurator.parseOrGetDefaultVerify(loadAttributes);
- this.model =
- LoadTsFileConfigurator.parseOrGetDefaultModel(
- loadAttributes, LoadTsFileConfigurator.MODEL_TABLE_VALUE);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
index a28e7d03779..0c9c0eaaf48 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
@@ -46,7 +46,6 @@ import java.util.Map;
import static
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.CONVERT_ON_TYPE_MISMATCH_KEY;
import static
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.DATABASE_LEVEL_KEY;
import static
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.DATABASE_NAME_KEY;
-import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.MODEL_KEY;
import static
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ON_SUCCESS_DELETE_VALUE;
import static
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ON_SUCCESS_KEY;
import static
org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ON_SUCCESS_NONE_VALUE;
@@ -61,11 +60,11 @@ public class LoadTsFileStatement extends Statement {
private boolean convertOnTypeMismatch = true;
private boolean autoCreateDatabase = true;
private boolean isGeneratedByPipe = false;
- private String model = LoadTsFileConfigurator.MODEL_TREE_VALUE;
private Map<String, String> loadAttributes;
private final List<File> tsFiles;
+ private List<Boolean> isTableModel;
private final List<TsFileResource> resources;
private final List<Long> writePointCountList;
@@ -81,6 +80,7 @@ public class LoadTsFileStatement extends Statement {
this.statementType = StatementType.MULTI_BATCH_INSERT;
this.tsFiles = processTsFile(file);
+ this.isTableModel = new ArrayList<>(Collections.nCopies(this.tsFiles.size(), false));
}
public static List<File> processTsFile(final File file) throws
FileNotFoundException {
@@ -191,12 +191,12 @@ public class LoadTsFileStatement extends Statement {
return autoCreateDatabase;
}
- public void setModel(String model) {
- this.model = model;
+ public List<Boolean> getIsTableModel() {
+ return isTableModel;
}
- public String getModel() {
- return model;
+ public void setIsTableModel(List<Boolean> isTableModel) {
+ this.isTableModel = isTableModel;
}
public void markIsGeneratedByPipe() {
@@ -238,9 +238,6 @@ public class LoadTsFileStatement extends Statement {
this.deleteAfterLoad =
LoadTsFileConfigurator.parseOrGetDefaultOnSuccess(loadAttributes);
this.convertOnTypeMismatch =
LoadTsFileConfigurator.parseOrGetDefaultConvertOnTypeMismatch(loadAttributes);
- this.model =
- LoadTsFileConfigurator.parseOrGetDefaultModel(
- loadAttributes, LoadTsFileConfigurator.MODEL_TREE_VALUE);
}
@Override
@@ -267,9 +264,6 @@ public class LoadTsFileStatement extends Statement {
loadAttributes.put(
ON_SUCCESS_KEY, deleteAfterLoad ? ON_SUCCESS_DELETE_VALUE :
ON_SUCCESS_NONE_VALUE);
loadAttributes.put(CONVERT_ON_TYPE_MISMATCH_KEY,
String.valueOf(convertOnTypeMismatch));
- if (model != null) {
- loadAttributes.put(MODEL_KEY, model);
- }
return new LoadTsFile(null, file.getAbsolutePath(), loadAttributes);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
index b1736f0f1af..9a5eef5ba0a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java
@@ -44,9 +44,6 @@ public class LoadTsFileConfigurator {
case ON_SUCCESS_KEY:
validateOnSuccessParam(value);
break;
- case MODEL_KEY:
- validateModelParam(value);
- break;
case DATABASE_NAME_KEY:
break;
case CONVERT_ON_TYPE_MISMATCH_KEY:
@@ -153,27 +150,6 @@ public class LoadTsFileConfigurator {
loadAttributes.getOrDefault(VERIFY_KEY,
String.valueOf(VERIFY_DEFAULT_VALUE)));
}
- public static final String MODEL_KEY = "model";
- public static final String MODEL_TREE_VALUE = "tree";
- public static final String MODEL_TABLE_VALUE = "table";
- public static final Set<String> MODEL_VALUE_SET =
- Collections.unmodifiableSet(
- new HashSet<>(Arrays.asList(MODEL_TREE_VALUE, MODEL_TABLE_VALUE)));
-
- public static void validateModelParam(final String model) {
- if (!MODEL_VALUE_SET.contains(model)) {
- throw new SemanticException(
- String.format(
- "Given %s value '%s' is not supported, please input a valid value.",
- MODEL_KEY, model));
- }
- }
-
- public static @Nullable String parseOrGetDefaultModel(
- final Map<String, String> loadAttributes, final String defaultModel) {
- return loadAttributes.getOrDefault(MODEL_KEY, defaultModel);
- }
-
private LoadTsFileConfigurator() {
throw new IllegalStateException("Utility class");
}