This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ce8ffe5068d Try Opt Agg query in table model & correct the
DataPartition Fetch & remove sync in validateSchema
ce8ffe5068d is described below
commit ce8ffe5068d24778b1dc4bf2faa824943a8d2d76
Author: Jackie Tien <[email protected]>
AuthorDate: Tue Oct 29 16:37:27 2024 +0800
Try Opt Agg query in table model & correct the DataPartition Fetch & remove
sync in validateSchema
---
.../execution/operator/source/FileLoaderUtils.java | 4 +--
.../execution/operator/source/SeriesScanUtil.java | 19 ++++++++------
.../TableAggregationTableScanOperator.java | 6 ++++-
.../source/relational/TableScanOperator.java | 12 ++++++---
.../db/queryengine/plan/analyze/AnalyzeUtils.java | 30 +++++++++++++++++++---
.../load/LoadTsFileToTableModelAnalyzer.java | 18 +++++--------
.../plan/planner/TableOperatorGenerator.java | 20 ++++++++++++---
.../planner/plan/parameter/SeriesScanOptions.java | 11 +++++++-
.../plan/relational/sql/ast/InsertRows.java | 19 ++++++--------
.../relational/sql/ast/WrappedInsertStatement.java | 11 +++-----
.../db/storageengine/dataregion/DataRegion.java | 2 +-
.../dataregion/read/QueryDataSource.java | 23 ++++++++++++++---
.../plan/relational/analyzer/AnalyzerTest.java | 2 +-
.../apache/iotdb/commons/path/AlignedFullPath.java | 28 ++++++++++++++++++++
14 files changed, 148 insertions(+), 57 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/FileLoaderUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/FileLoaderUtils.java
index e4f4459f41d..1941f3c739f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/FileLoaderUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/FileLoaderUtils.java
@@ -51,7 +51,6 @@ import org.apache.tsfile.read.reader.IPageReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -264,8 +263,7 @@ public class FileLoaderUtils {
// the order of timeSeriesMetadata list is same as subSensorList's order
TimeSeriesMetadataCache cache = TimeSeriesMetadataCache.getInstance();
List<String> valueMeasurementList = alignedPath.getMeasurementList();
- Set<String> allSensors = new HashSet<>(valueMeasurementList);
- allSensors.add("");
+ Set<String> allSensors = alignedPath.getAllSensors();
boolean isDebug = context.isDebug();
String filePath = resource.getTsFilePath();
IDeviceID deviceId = alignedPath.getDeviceId();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
index 1389666f0b2..15fe07161a6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
@@ -183,15 +183,18 @@ public class SeriesScanUtil implements Accountable {
// differentiate the data of tree model and table model.
if (context.isIgnoreAllNullRows()) {
ttl = DataNodeTTLCache.getInstance().getTTLForTree(deviceID);
+ scanOptions.setTTL(ttl);
} else {
- String databaseName = dataSource.getDatabaseName();
- ttl =
- databaseName == null
- ? Long.MAX_VALUE
- : DataNodeTTLCache.getInstance()
- .getTTLForTable(databaseName, deviceID.getTableName());
- }
- scanOptions.setTTL(ttl);
+ if (scanOptions.timeFilterNeedUpdatedByTll()) {
+ String databaseName = dataSource.getDatabaseName();
+ ttl =
+ databaseName == null
+ ? Long.MAX_VALUE
+ : DataNodeTTLCache.getInstance()
+ .getTTLForTable(databaseName, deviceID.getTableName());
+ scanOptions.setTTL(ttl);
+ }
+ }
// init file index
orderUtils.setCurSeqFileIndex(dataSource);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
index e5b3e2b1b3f..96678c0752a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
@@ -59,6 +59,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -92,6 +93,7 @@ public class TableAggregationTableScanOperator extends
AbstractSeriesAggregation
private final SeriesScanOptions seriesScanOptions;
private final List<String> measurementColumnNames;
+ private final Set<String> allSensors;
private final List<IMeasurementSchema> measurementSchemas;
@@ -120,6 +122,7 @@ public class TableAggregationTableScanOperator extends
AbstractSeriesAggregation
Ordering scanOrder,
SeriesScanOptions seriesScanOptions,
List<String> measurementColumnNames,
+ Set<String> allSensors,
List<IMeasurementSchema> measurementSchemas,
int maxTsBlockLineNum,
int measurementCount,
@@ -158,6 +161,7 @@ public class TableAggregationTableScanOperator extends
AbstractSeriesAggregation
this.scanOrder = scanOrder;
this.seriesScanOptions = seriesScanOptions;
this.measurementColumnNames = measurementColumnNames;
+ this.allSensors = allSensors;
this.measurementSchemas = measurementSchemas;
this.measurementColumnTSDataTypes =
measurementSchemas.stream().map(IMeasurementSchema::getType).collect(Collectors.toList());
@@ -266,7 +270,7 @@ public class TableAggregationTableScanOperator extends
AbstractSeriesAggregation
}
AlignedFullPath alignedPath =
- constructAlignedPath(deviceEntry, measurementColumnNames,
measurementSchemas);
+ constructAlignedPath(deviceEntry, measurementColumnNames,
measurementSchemas, allSensors);
this.seriesScanUtil =
new AlignedSeriesScanUtil(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
index e282f1b3e63..dc04af63a96 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
@@ -49,6 +49,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -75,6 +76,8 @@ public class TableScanOperator extends
AbstractSeriesScanOperator {
private final List<String> measurementColumnNames;
+ private final Set<String> allSensors;
+
private final List<IMeasurementSchema> measurementSchemas;
private final List<TSDataType> measurementColumnTSDataTypes;
@@ -98,6 +101,7 @@ public class TableScanOperator extends
AbstractSeriesScanOperator {
Ordering scanOrder,
SeriesScanOptions seriesScanOptions,
List<String> measurementColumnNames,
+ Set<String> allSensors,
List<IMeasurementSchema> measurementSchemas,
int maxTsBlockLineNum) {
this.sourceId = sourceId;
@@ -109,6 +113,7 @@ public class TableScanOperator extends
AbstractSeriesScanOperator {
this.scanOrder = scanOrder;
this.seriesScanOptions = seriesScanOptions;
this.measurementColumnNames = measurementColumnNames;
+ this.allSensors = allSensors;
this.measurementSchemas = measurementSchemas;
this.measurementColumnTSDataTypes =
measurementSchemas.stream().map(IMeasurementSchema::getType).collect(Collectors.toList());
@@ -294,7 +299,7 @@ public class TableScanOperator extends
AbstractSeriesScanOperator {
private AlignedSeriesScanUtil constructAlignedSeriesScanUtil(DeviceEntry
deviceEntry) {
AlignedFullPath alignedPath =
- constructAlignedPath(deviceEntry, measurementColumnNames,
measurementSchemas);
+ constructAlignedPath(deviceEntry, measurementColumnNames,
measurementSchemas, allSensors);
return new AlignedSeriesScanUtil(
alignedPath,
@@ -308,9 +313,10 @@ public class TableScanOperator extends
AbstractSeriesScanOperator {
public static AlignedFullPath constructAlignedPath(
DeviceEntry deviceEntry,
List<String> measurementColumnNames,
- List<IMeasurementSchema> measurementSchemas) {
+ List<IMeasurementSchema> measurementSchemas,
+ Set<String> allSensors) {
return new AlignedFullPath(
- deviceEntry.getDeviceID(), measurementColumnNames, measurementSchemas);
+ deviceEntry.getDeviceID(), measurementColumnNames, measurementSchemas,
allSensors);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java
index 9d6556b501f..bd024dd1434 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java
@@ -46,6 +46,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_ROOT;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_SEPARATOR;
+
public class AnalyzeUtils {
private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS
=
@@ -93,6 +96,17 @@ public class AnalyzeUtils {
return null;
}
+ public static String getDatabaseNameForTableWithRoot(
+ InsertBaseStatement statement, MPPQueryContext context) {
+ if (statement.getDatabaseName().isPresent()) {
+ return PATH_ROOT + PATH_SEPARATOR + statement.getDatabaseName().get();
+ }
+ if (context != null && context.getDatabaseName().isPresent()) {
+ return PATH_ROOT + PATH_SEPARATOR + context.getDatabaseName().get();
+ }
+ return null;
+ }
+
public static List<DataPartitionQueryParam> computeTableDataPartitionParams(
InsertBaseStatement statement, MPPQueryContext context) {
if (statement instanceof InsertTabletStatement) {
@@ -103,7 +117,8 @@ public class AnalyzeUtils {
.computeIfAbsent(insertTabletStatement.getTableDeviceID(i), id ->
new HashSet<>())
.add(insertTabletStatement.getTimePartitionSlot(i));
}
- return computeDataPartitionParams(timePartitionSlotMap,
getDatabaseName(statement, context));
+ return computeDataPartitionParams(
+ timePartitionSlotMap, getDatabaseNameForTableWithRoot(statement,
context));
} else if (statement instanceof InsertMultiTabletsStatement) {
InsertMultiTabletsStatement insertMultiTabletsStatement =
(InsertMultiTabletsStatement) statement;
@@ -116,14 +131,15 @@ public class AnalyzeUtils {
.add(insertTabletStatement.getTimePartitionSlot(i));
}
}
- return computeDataPartitionParams(timePartitionSlotMap,
getDatabaseName(statement, context));
+ return computeDataPartitionParams(
+ timePartitionSlotMap, getDatabaseNameForTableWithRoot(statement,
context));
} else if (statement instanceof InsertRowStatement) {
InsertRowStatement insertRowStatement = (InsertRowStatement) statement;
return computeDataPartitionParams(
Collections.singletonMap(
insertRowStatement.getTableDeviceID(),
Collections.singleton(insertRowStatement.getTimePartitionSlot())),
- getDatabaseName(statement, context));
+ getDatabaseNameForTableWithRoot(statement, context));
} else if (statement instanceof InsertRowsStatement) {
InsertRowsStatement insertRowsStatement = (InsertRowsStatement)
statement;
Map<IDeviceID, Set<TTimePartitionSlot>> timePartitionSlotMap = new
HashMap<>();
@@ -133,7 +149,8 @@ public class AnalyzeUtils {
.computeIfAbsent(insertRowStatement.getTableDeviceID(), id -> new
HashSet<>())
.add(insertRowStatement.getTimePartitionSlot());
}
- return computeDataPartitionParams(timePartitionSlotMap,
getDatabaseName(statement, context));
+ return computeDataPartitionParams(
+ timePartitionSlotMap, getDatabaseNameForTableWithRoot(statement,
context));
}
throw new UnsupportedOperationException("computeDataPartitionParams for "
+ statement);
}
@@ -184,6 +201,11 @@ public class AnalyzeUtils {
return dataPartitionQueryParam;
}
+ /**
+ * @param dataPartitionQueryParamMap IDeviceID's first segment should be
tableName without
+ * databaseName.
+ * @param databaseName must start with root.
+ */
public static List<DataPartitionQueryParam> computeDataPartitionParams(
Map<IDeviceID, Set<TTimePartitionSlot>> dataPartitionQueryParamMap,
String databaseName) {
List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTableModelAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTableModelAnalyzer.java
index 7b50770a7f9..f74c6e08515 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTableModelAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTableModelAnalyzer.java
@@ -149,17 +149,13 @@ public class LoadTsFileToTableModelAnalyzer extends
LoadTsFileAnalyzer {
reader.readFileMetadata().getTableSchemaMap().entrySet()) {
final TableSchema fileSchema =
TableSchema.fromTsFileTableSchema(name2Schema.getKey(),
name2Schema.getValue());
- final TableSchema realSchema;
- // TODO: remove this synchronized block after the metadata is
thread-safe
- synchronized (metadata) {
- realSchema =
- metadata.validateTableHeaderSchema(database, fileSchema,
context, true).orElse(null);
- if (Objects.isNull(realSchema)) {
- throw new VerifyMetadataException(
- String.format(
- "Failed to validate schema for table {%s, %s}",
- name2Schema.getKey(), name2Schema.getValue()));
- }
+ final TableSchema realSchema =
+ metadata.validateTableHeaderSchema(database, fileSchema, context,
true).orElse(null);
+ if (Objects.isNull(realSchema)) {
+ throw new VerifyMetadataException(
+ String.format(
+ "Failed to validate schema for table {%s, %s}",
+ name2Schema.getKey(), name2Schema.getValue()));
}
tableIdColumnMapper.clear();
verifyTableDataTypeAndGenerateIdColumnMapper(fileSchema, realSchema);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 56d6868a24b..1e4e4c7410d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -382,6 +382,9 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
context.getTypeProvider().getTemplatedInfo().getLimitValue(),
maxTsBlockLineNum);
}
+ Set<String> allSensors = new HashSet<>(measurementColumnNames);
+ // for time column
+ allSensors.add("");
TableScanOperator tableScanOperator =
new TableScanOperator(
operatorContext,
@@ -392,6 +395,7 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
node.getScanOrder(),
scanOptionsBuilder.build(),
measurementColumnNames,
+ allSensors,
measurementSchemas,
maxTsBlockLineNum);
@@ -400,7 +404,10 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
for (int i = 0, size = node.getDeviceEntries().size(); i < size; i++) {
AlignedFullPath alignedPath =
constructAlignedPath(
- node.getDeviceEntries().get(i), measurementColumnNames,
measurementSchemas);
+ node.getDeviceEntries().get(i),
+ measurementColumnNames,
+ measurementSchemas,
+ allSensors);
((DataDriverContext) context.getDriverContext()).addPath(alignedPath);
}
@@ -1427,7 +1434,7 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
groupByChannels,
aggregatorBuilder.build(),
node.getStep(),
- 10_000,
+ 64,
Long.MAX_VALUE,
false,
Long.MAX_VALUE);
@@ -1632,6 +1639,9 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
convertPredicateToFilter(pushDownPredicate, measurementColumnNames,
columnSchemaMap));
}
+ Set<String> allSensors = new HashSet<>(measurementColumnNames);
+ // for time column
+ allSensors.add("");
TableAggregationTableScanOperator aggTableScanOperator =
new TableAggregationTableScanOperator(
node.getPlanNodeId(),
@@ -1642,6 +1652,7 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
scanAscending ? Ordering.ASC : Ordering.DESC,
scanOptionsBuilder.build(),
measurementColumnNames,
+ allSensors,
measurementSchemas,
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber(),
measurementColumnCount,
@@ -1659,7 +1670,10 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
for (int i = 0, size = node.getDeviceEntries().size(); i < size; i++) {
AlignedFullPath alignedPath =
constructAlignedPath(
- node.getDeviceEntries().get(i), measurementColumnNames,
measurementSchemas);
+ node.getDeviceEntries().get(i),
+ measurementColumnNames,
+ measurementSchemas,
+ allSensors);
((DataDriverContext) context.getDriverContext()).addPath(alignedPath);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java
index dea62b9cf05..25853c9c547 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java
@@ -32,11 +32,14 @@ import
org.apache.tsfile.read.reader.series.PaginationController;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
public class SeriesScanOptions {
private Filter globalTimeFilter;
+ private final AtomicBoolean timeFilterUpdatedByTll = new
AtomicBoolean(false);
+
private final Filter pushDownFilter;
private final long pushDownLimit;
@@ -97,8 +100,14 @@ public class SeriesScanOptions {
}
}
+ public boolean timeFilterNeedUpdatedByTll() {
+ return !timeFilterUpdatedByTll.get();
+ }
+
public void setTTL(long dataTTL) {
- this.globalTimeFilter = updateFilterUsingTTL(globalTimeFilter, dataTTL);
+ if (timeFilterUpdatedByTll.compareAndSet(false, true)) {
+ this.globalTimeFilter = updateFilterUsingTTL(globalTimeFilter, dataTTL);
+ }
}
/**
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertRows.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertRows.java
index 75c228fb0bc..f88457cdfea 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertRows.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertRows.java
@@ -91,17 +91,14 @@ public class InsertRows extends WrappedInsertStatement {
for (InsertRowStatement insertRowStatement :
getInnerTreeStatement().getInsertRowStatementList()) {
final TableSchema incomingTableSchema =
toTableSchema(insertRowStatement);
- final TableSchema realSchema;
- synchronized (metadata) {
- realSchema =
- metadata
- .validateTableHeaderSchema(
- AnalyzeUtils.getDatabaseName(insertRowStatement, context),
- incomingTableSchema,
- context,
- allowCreateTable)
- .orElse(null);
- }
+ final TableSchema realSchema =
+ metadata
+ .validateTableHeaderSchema(
+ AnalyzeUtils.getDatabaseName(insertRowStatement, context),
+ incomingTableSchema,
+ context,
+ allowCreateTable)
+ .orElse(null);
if (realSchema == null) {
throw new SemanticException(
"Schema validation failed, table cannot be created: " +
incomingTableSchema);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
index b50cae15f84..b951c02cc49 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
@@ -101,13 +101,10 @@ public abstract class WrappedInsertStatement extends
WrappedStatement
public void validateTableSchema(Metadata metadata, MPPQueryContext context) {
String databaseName = getDatabase();
final TableSchema incomingSchema = getTableSchema();
- final TableSchema realSchema;
- synchronized (metadata) {
- realSchema =
- metadata
- .validateTableHeaderSchema(databaseName, incomingSchema,
context, true)
- .orElse(null);
- }
+ final TableSchema realSchema =
+ metadata
+ .validateTableHeaderSchema(databaseName, incomingSchema, context,
true)
+ .orElse(null);
if (realSchema == null) {
throw new SemanticException(
"Schema validation failed, table cannot be created: " +
incomingSchema);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 6b91addc3e1..c73d93de5da 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -2028,7 +2028,7 @@ public class DataRegion implements IDataRegionForQuery {
QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(SEQUENCE_TSFILE,
seqResources.size());
QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(UNSEQUENCE_TSFILE,
unseqResources.size());
- return new QueryDataSource(seqResources, unseqResources);
+ return new QueryDataSource(seqResources, unseqResources, databaseName);
} catch (MetadataException e) {
throw new QueryProcessException(e);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java
index dc56ad11a12..704bdeb6902 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java
@@ -64,6 +64,8 @@ public class QueryDataSource implements IQueryDataSource {
/* The traversal order of unseqResources (different for each device) */
private int[] unSeqFileOrderIndex;
+ private String databaseName = null;
+
private static final Comparator<Long> descendingComparator = (o1, o2) ->
Long.compare(o2, o1);
public QueryDataSource(List<TsFileResource> seqResources,
List<TsFileResource> unseqResources) {
@@ -71,12 +73,20 @@ public class QueryDataSource implements IQueryDataSource {
this.unseqResources = unseqResources;
}
+ public QueryDataSource(
+ List<TsFileResource> seqResources, List<TsFileResource> unseqResources,
String databaseName) {
+ this.seqResources = seqResources;
+ this.unseqResources = unseqResources;
+ this.databaseName = databaseName;
+ }
+
// used for compaction, because in compaction task(unlike query, each
QueryDataSource only serve
// for one series), we will reuse this object for multi series
public QueryDataSource(QueryDataSource other) {
this.seqResources = other.seqResources;
this.unseqResources = other.unseqResources;
this.unSeqFileOrderIndex = other.unSeqFileOrderIndex;
+ this.databaseName = other.databaseName;
}
public List<TsFileResource> getSeqResources() {
@@ -89,7 +99,8 @@ public class QueryDataSource implements IQueryDataSource {
@Override
public IQueryDataSource clone() {
- QueryDataSource queryDataSource = new QueryDataSource(getSeqResources(),
getUnseqResources());
+ QueryDataSource queryDataSource =
+ new QueryDataSource(getSeqResources(), getUnseqResources(),
databaseName);
queryDataSource.setSingleDevice(isSingleDevice());
return queryDataSource;
}
@@ -189,6 +200,9 @@ public class QueryDataSource implements IQueryDataSource {
}
public void fillOrderIndexes(IDeviceID deviceId, boolean ascending) {
+ if (unseqResources == null || unseqResources.isEmpty()) {
+ return;
+ }
TreeMap<Long, List<Integer>> orderTimeToIndexMap =
ascending ? new TreeMap<>() : new TreeMap<>(descendingComparator);
int index = 0;
@@ -226,7 +240,10 @@ public class QueryDataSource implements IQueryDataSource {
}
public String getDatabaseName() {
- List<TsFileResource> resources = !seqResources.isEmpty() ? seqResources :
unseqResources;
- return resources.isEmpty() ? null : resources.get(0).getDatabaseName();
+ if (databaseName == null) {
+ List<TsFileResource> resources = !seqResources.isEmpty() ? seqResources
: unseqResources;
+ databaseName = resources.isEmpty() ? null :
resources.get(0).getDatabaseName();
+ }
+ return databaseName;
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
index 433ebe797a4..89fa4848b0c 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
@@ -1005,7 +1005,7 @@ public class AnalyzerTest {
for (DataPartitionQueryParam dataPartitionQueryParam :
dataPartitionQueryParams) {
String databaseName = dataPartitionQueryParam.getDatabaseName();
- assertEquals(sessionInfo.getDatabaseName().get(), databaseName);
+ assertEquals("root." + sessionInfo.getDatabaseName().get(),
databaseName);
databaseName = PathUtils.qualifyDatabaseName(databaseName);
String tableName =
dataPartitionQueryParam.getDeviceID().getTableName();
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedFullPath.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedFullPath.java
index 0ed64897438..581e448c253 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedFullPath.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedFullPath.java
@@ -24,8 +24,12 @@ import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.write.schema.IMeasurementSchema;
+import javax.annotation.Nullable;
+
+import java.util.HashSet;
import java.util.List;
import java.util.Objects;
+import java.util.Set;
public class AlignedFullPath implements IFullPath {
@@ -38,12 +42,25 @@ public class AlignedFullPath implements IFullPath {
private final List<String> measurementList;
private final List<IMeasurementSchema> schemaList;
+ @Nullable private final Set<String> allSensors;
public AlignedFullPath(
IDeviceID deviceID, List<String> measurementList,
List<IMeasurementSchema> schemaList) {
this.deviceID = deviceID;
this.measurementList = measurementList;
this.schemaList = schemaList;
+ this.allSensors = null;
+ }
+
+ public AlignedFullPath(
+ IDeviceID deviceID,
+ List<String> measurementList,
+ List<IMeasurementSchema> schemaList,
+ Set<String> allSensors) {
+ this.deviceID = deviceID;
+ this.measurementList = measurementList;
+ this.schemaList = schemaList;
+ this.allSensors = allSensors;
}
@Override
@@ -68,6 +85,17 @@ public class AlignedFullPath implements IFullPath {
return measurementList.size();
}
+ public Set<String> getAllSensors() {
+ if (allSensors != null) {
+ return allSensors;
+ } else {
+ Set<String> res = new HashSet<>(measurementList);
+ // for time column
+ res.add("");
+ return res;
+ }
+ }
+
@Override
public long ramBytesUsed() {
return INSTANCE_SIZE