This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ce8ffe5068d Try Opt Agg query in table model & correct the 
DataPartition Fetch & remove sync in validateSchema
ce8ffe5068d is described below

commit ce8ffe5068d24778b1dc4bf2faa824943a8d2d76
Author: Jackie Tien <[email protected]>
AuthorDate: Tue Oct 29 16:37:27 2024 +0800

    Try Opt Agg query in table model & correct the DataPartition Fetch & remove 
sync in validateSchema
---
 .../execution/operator/source/FileLoaderUtils.java |  4 +--
 .../execution/operator/source/SeriesScanUtil.java  | 19 ++++++++------
 .../TableAggregationTableScanOperator.java         |  6 ++++-
 .../source/relational/TableScanOperator.java       | 12 ++++++---
 .../db/queryengine/plan/analyze/AnalyzeUtils.java  | 30 +++++++++++++++++++---
 .../load/LoadTsFileToTableModelAnalyzer.java       | 18 +++++--------
 .../plan/planner/TableOperatorGenerator.java       | 20 ++++++++++++---
 .../planner/plan/parameter/SeriesScanOptions.java  | 11 +++++++-
 .../plan/relational/sql/ast/InsertRows.java        | 19 ++++++--------
 .../relational/sql/ast/WrappedInsertStatement.java | 11 +++-----
 .../db/storageengine/dataregion/DataRegion.java    |  2 +-
 .../dataregion/read/QueryDataSource.java           | 23 ++++++++++++++---
 .../plan/relational/analyzer/AnalyzerTest.java     |  2 +-
 .../apache/iotdb/commons/path/AlignedFullPath.java | 28 ++++++++++++++++++++
 14 files changed, 148 insertions(+), 57 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/FileLoaderUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/FileLoaderUtils.java
index e4f4459f41d..1941f3c739f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/FileLoaderUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/FileLoaderUtils.java
@@ -51,7 +51,6 @@ import org.apache.tsfile.read.reader.IPageReader;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -264,8 +263,7 @@ public class FileLoaderUtils {
     // the order of timeSeriesMetadata list is same as subSensorList's order
     TimeSeriesMetadataCache cache = TimeSeriesMetadataCache.getInstance();
     List<String> valueMeasurementList = alignedPath.getMeasurementList();
-    Set<String> allSensors = new HashSet<>(valueMeasurementList);
-    allSensors.add("");
+    Set<String> allSensors = alignedPath.getAllSensors();
     boolean isDebug = context.isDebug();
     String filePath = resource.getTsFilePath();
     IDeviceID deviceId = alignedPath.getDeviceId();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
index 1389666f0b2..15fe07161a6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
@@ -183,15 +183,18 @@ public class SeriesScanUtil implements Accountable {
     // differentiate the data of tree model and table model.
     if (context.isIgnoreAllNullRows()) {
       ttl = DataNodeTTLCache.getInstance().getTTLForTree(deviceID);
+      scanOptions.setTTL(ttl);
     } else {
-      String databaseName = dataSource.getDatabaseName();
-      ttl =
-          databaseName == null
-              ? Long.MAX_VALUE
-              : DataNodeTTLCache.getInstance()
-                  .getTTLForTable(databaseName, deviceID.getTableName());
-    }
-    scanOptions.setTTL(ttl);
+      if (scanOptions.timeFilterNeedUpdatedByTll()) {
+        String databaseName = dataSource.getDatabaseName();
+        ttl =
+            databaseName == null
+                ? Long.MAX_VALUE
+                : DataNodeTTLCache.getInstance()
+                    .getTTLForTable(databaseName, deviceID.getTableName());
+        scanOptions.setTTL(ttl);
+      }
+    }
 
     // init file index
     orderUtils.setCurSeqFileIndex(dataSource);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
index e5b3e2b1b3f..96678c0752a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java
@@ -59,6 +59,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -92,6 +93,7 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
   private final SeriesScanOptions seriesScanOptions;
 
   private final List<String> measurementColumnNames;
+  private final Set<String> allSensors;
 
   private final List<IMeasurementSchema> measurementSchemas;
 
@@ -120,6 +122,7 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
       Ordering scanOrder,
       SeriesScanOptions seriesScanOptions,
       List<String> measurementColumnNames,
+      Set<String> allSensors,
       List<IMeasurementSchema> measurementSchemas,
       int maxTsBlockLineNum,
       int measurementCount,
@@ -158,6 +161,7 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
     this.scanOrder = scanOrder;
     this.seriesScanOptions = seriesScanOptions;
     this.measurementColumnNames = measurementColumnNames;
+    this.allSensors = allSensors;
     this.measurementSchemas = measurementSchemas;
     this.measurementColumnTSDataTypes =
         
measurementSchemas.stream().map(IMeasurementSchema::getType).collect(Collectors.toList());
@@ -266,7 +270,7 @@ public class TableAggregationTableScanOperator extends 
AbstractSeriesAggregation
     }
 
     AlignedFullPath alignedPath =
-        constructAlignedPath(deviceEntry, measurementColumnNames, 
measurementSchemas);
+        constructAlignedPath(deviceEntry, measurementColumnNames, 
measurementSchemas, allSensors);
 
     this.seriesScanUtil =
         new AlignedSeriesScanUtil(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
index e282f1b3e63..dc04af63a96 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
@@ -49,6 +49,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -75,6 +76,8 @@ public class TableScanOperator extends 
AbstractSeriesScanOperator {
 
   private final List<String> measurementColumnNames;
 
+  private final Set<String> allSensors;
+
   private final List<IMeasurementSchema> measurementSchemas;
 
   private final List<TSDataType> measurementColumnTSDataTypes;
@@ -98,6 +101,7 @@ public class TableScanOperator extends 
AbstractSeriesScanOperator {
       Ordering scanOrder,
       SeriesScanOptions seriesScanOptions,
       List<String> measurementColumnNames,
+      Set<String> allSensors,
       List<IMeasurementSchema> measurementSchemas,
       int maxTsBlockLineNum) {
     this.sourceId = sourceId;
@@ -109,6 +113,7 @@ public class TableScanOperator extends 
AbstractSeriesScanOperator {
     this.scanOrder = scanOrder;
     this.seriesScanOptions = seriesScanOptions;
     this.measurementColumnNames = measurementColumnNames;
+    this.allSensors = allSensors;
     this.measurementSchemas = measurementSchemas;
     this.measurementColumnTSDataTypes =
         
measurementSchemas.stream().map(IMeasurementSchema::getType).collect(Collectors.toList());
@@ -294,7 +299,7 @@ public class TableScanOperator extends 
AbstractSeriesScanOperator {
 
   private AlignedSeriesScanUtil constructAlignedSeriesScanUtil(DeviceEntry 
deviceEntry) {
     AlignedFullPath alignedPath =
-        constructAlignedPath(deviceEntry, measurementColumnNames, 
measurementSchemas);
+        constructAlignedPath(deviceEntry, measurementColumnNames, 
measurementSchemas, allSensors);
 
     return new AlignedSeriesScanUtil(
         alignedPath,
@@ -308,9 +313,10 @@ public class TableScanOperator extends 
AbstractSeriesScanOperator {
   public static AlignedFullPath constructAlignedPath(
       DeviceEntry deviceEntry,
       List<String> measurementColumnNames,
-      List<IMeasurementSchema> measurementSchemas) {
+      List<IMeasurementSchema> measurementSchemas,
+      Set<String> allSensors) {
     return new AlignedFullPath(
-        deviceEntry.getDeviceID(), measurementColumnNames, measurementSchemas);
+        deviceEntry.getDeviceID(), measurementColumnNames, measurementSchemas, 
allSensors);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java
index 9d6556b501f..bd024dd1434 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java
@@ -46,6 +46,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_ROOT;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_SEPARATOR;
+
 public class AnalyzeUtils {
 
   private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS 
=
@@ -93,6 +96,17 @@ public class AnalyzeUtils {
     return null;
   }
 
+  public static String getDatabaseNameForTableWithRoot(
+      InsertBaseStatement statement, MPPQueryContext context) {
+    if (statement.getDatabaseName().isPresent()) {
+      return PATH_ROOT + PATH_SEPARATOR + statement.getDatabaseName().get();
+    }
+    if (context != null && context.getDatabaseName().isPresent()) {
+      return PATH_ROOT + PATH_SEPARATOR + context.getDatabaseName().get();
+    }
+    return null;
+  }
+
   public static List<DataPartitionQueryParam> computeTableDataPartitionParams(
       InsertBaseStatement statement, MPPQueryContext context) {
     if (statement instanceof InsertTabletStatement) {
@@ -103,7 +117,8 @@ public class AnalyzeUtils {
             .computeIfAbsent(insertTabletStatement.getTableDeviceID(i), id -> 
new HashSet<>())
             .add(insertTabletStatement.getTimePartitionSlot(i));
       }
-      return computeDataPartitionParams(timePartitionSlotMap, 
getDatabaseName(statement, context));
+      return computeDataPartitionParams(
+          timePartitionSlotMap, getDatabaseNameForTableWithRoot(statement, 
context));
     } else if (statement instanceof InsertMultiTabletsStatement) {
       InsertMultiTabletsStatement insertMultiTabletsStatement =
           (InsertMultiTabletsStatement) statement;
@@ -116,14 +131,15 @@ public class AnalyzeUtils {
               .add(insertTabletStatement.getTimePartitionSlot(i));
         }
       }
-      return computeDataPartitionParams(timePartitionSlotMap, 
getDatabaseName(statement, context));
+      return computeDataPartitionParams(
+          timePartitionSlotMap, getDatabaseNameForTableWithRoot(statement, 
context));
     } else if (statement instanceof InsertRowStatement) {
       InsertRowStatement insertRowStatement = (InsertRowStatement) statement;
       return computeDataPartitionParams(
           Collections.singletonMap(
               insertRowStatement.getTableDeviceID(),
               
Collections.singleton(insertRowStatement.getTimePartitionSlot())),
-          getDatabaseName(statement, context));
+          getDatabaseNameForTableWithRoot(statement, context));
     } else if (statement instanceof InsertRowsStatement) {
       InsertRowsStatement insertRowsStatement = (InsertRowsStatement) 
statement;
       Map<IDeviceID, Set<TTimePartitionSlot>> timePartitionSlotMap = new 
HashMap<>();
@@ -133,7 +149,8 @@ public class AnalyzeUtils {
             .computeIfAbsent(insertRowStatement.getTableDeviceID(), id -> new 
HashSet<>())
             .add(insertRowStatement.getTimePartitionSlot());
       }
-      return computeDataPartitionParams(timePartitionSlotMap, 
getDatabaseName(statement, context));
+      return computeDataPartitionParams(
+          timePartitionSlotMap, getDatabaseNameForTableWithRoot(statement, 
context));
     }
     throw new UnsupportedOperationException("computeDataPartitionParams for " 
+ statement);
   }
@@ -184,6 +201,11 @@ public class AnalyzeUtils {
     return dataPartitionQueryParam;
   }
 
+  /**
+   * @param dataPartitionQueryParamMap IDeviceID's first segment should be 
tableName without
+   *     databaseName.
+   * @param databaseName must start with root.
+   */
   public static List<DataPartitionQueryParam> computeDataPartitionParams(
       Map<IDeviceID, Set<TTimePartitionSlot>> dataPartitionQueryParamMap, 
String databaseName) {
     List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTableModelAnalyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTableModelAnalyzer.java
index 7b50770a7f9..f74c6e08515 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTableModelAnalyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTableModelAnalyzer.java
@@ -149,17 +149,13 @@ public class LoadTsFileToTableModelAnalyzer extends 
LoadTsFileAnalyzer {
           reader.readFileMetadata().getTableSchemaMap().entrySet()) {
         final TableSchema fileSchema =
             TableSchema.fromTsFileTableSchema(name2Schema.getKey(), 
name2Schema.getValue());
-        final TableSchema realSchema;
-        // TODO: remove this synchronized block after the metadata is 
thread-safe
-        synchronized (metadata) {
-          realSchema =
-              metadata.validateTableHeaderSchema(database, fileSchema, 
context, true).orElse(null);
-          if (Objects.isNull(realSchema)) {
-            throw new VerifyMetadataException(
-                String.format(
-                    "Failed to validate schema for table {%s, %s}",
-                    name2Schema.getKey(), name2Schema.getValue()));
-          }
+        final TableSchema realSchema =
+            metadata.validateTableHeaderSchema(database, fileSchema, context, 
true).orElse(null);
+        if (Objects.isNull(realSchema)) {
+          throw new VerifyMetadataException(
+              String.format(
+                  "Failed to validate schema for table {%s, %s}",
+                  name2Schema.getKey(), name2Schema.getValue()));
         }
         tableIdColumnMapper.clear();
         verifyTableDataTypeAndGenerateIdColumnMapper(fileSchema, realSchema);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 56d6868a24b..1e4e4c7410d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -382,6 +382,9 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
                   
context.getTypeProvider().getTemplatedInfo().getLimitValue(), 
maxTsBlockLineNum);
     }
 
+    Set<String> allSensors = new HashSet<>(measurementColumnNames);
+    // for time column
+    allSensors.add("");
     TableScanOperator tableScanOperator =
         new TableScanOperator(
             operatorContext,
@@ -392,6 +395,7 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
             node.getScanOrder(),
             scanOptionsBuilder.build(),
             measurementColumnNames,
+            allSensors,
             measurementSchemas,
             maxTsBlockLineNum);
 
@@ -400,7 +404,10 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
     for (int i = 0, size = node.getDeviceEntries().size(); i < size; i++) {
       AlignedFullPath alignedPath =
           constructAlignedPath(
-              node.getDeviceEntries().get(i), measurementColumnNames, 
measurementSchemas);
+              node.getDeviceEntries().get(i),
+              measurementColumnNames,
+              measurementSchemas,
+              allSensors);
       ((DataDriverContext) context.getDriverContext()).addPath(alignedPath);
     }
 
@@ -1427,7 +1434,7 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
         groupByChannels,
         aggregatorBuilder.build(),
         node.getStep(),
-        10_000,
+        64,
         Long.MAX_VALUE,
         false,
         Long.MAX_VALUE);
@@ -1632,6 +1639,9 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
           convertPredicateToFilter(pushDownPredicate, measurementColumnNames, 
columnSchemaMap));
     }
 
+    Set<String> allSensors = new HashSet<>(measurementColumnNames);
+    // for time column
+    allSensors.add("");
     TableAggregationTableScanOperator aggTableScanOperator =
         new TableAggregationTableScanOperator(
             node.getPlanNodeId(),
@@ -1642,6 +1652,7 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
             scanAscending ? Ordering.ASC : Ordering.DESC,
             scanOptionsBuilder.build(),
             measurementColumnNames,
+            allSensors,
             measurementSchemas,
             
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber(),
             measurementColumnCount,
@@ -1659,7 +1670,10 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
     for (int i = 0, size = node.getDeviceEntries().size(); i < size; i++) {
       AlignedFullPath alignedPath =
           constructAlignedPath(
-              node.getDeviceEntries().get(i), measurementColumnNames, 
measurementSchemas);
+              node.getDeviceEntries().get(i),
+              measurementColumnNames,
+              measurementSchemas,
+              allSensors);
       ((DataDriverContext) context.getDriverContext()).addPath(alignedPath);
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java
index dea62b9cf05..25853c9c547 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java
@@ -32,11 +32,14 @@ import 
org.apache.tsfile.read.reader.series.PaginationController;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public class SeriesScanOptions {
 
   private Filter globalTimeFilter;
 
+  private final AtomicBoolean timeFilterUpdatedByTll = new 
AtomicBoolean(false);
+
   private final Filter pushDownFilter;
 
   private final long pushDownLimit;
@@ -97,8 +100,14 @@ public class SeriesScanOptions {
     }
   }
 
+  public boolean timeFilterNeedUpdatedByTll() {
+    return !timeFilterUpdatedByTll.get();
+  }
+
   public void setTTL(long dataTTL) {
-    this.globalTimeFilter = updateFilterUsingTTL(globalTimeFilter, dataTTL);
+    if (timeFilterUpdatedByTll.compareAndSet(false, true)) {
+      this.globalTimeFilter = updateFilterUsingTTL(globalTimeFilter, dataTTL);
+    }
   }
 
   /**
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertRows.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertRows.java
index 75c228fb0bc..f88457cdfea 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertRows.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertRows.java
@@ -91,17 +91,14 @@ public class InsertRows extends WrappedInsertStatement {
     for (InsertRowStatement insertRowStatement :
         getInnerTreeStatement().getInsertRowStatementList()) {
       final TableSchema incomingTableSchema = 
toTableSchema(insertRowStatement);
-      final TableSchema realSchema;
-      synchronized (metadata) {
-        realSchema =
-            metadata
-                .validateTableHeaderSchema(
-                    AnalyzeUtils.getDatabaseName(insertRowStatement, context),
-                    incomingTableSchema,
-                    context,
-                    allowCreateTable)
-                .orElse(null);
-      }
+      final TableSchema realSchema =
+          metadata
+              .validateTableHeaderSchema(
+                  AnalyzeUtils.getDatabaseName(insertRowStatement, context),
+                  incomingTableSchema,
+                  context,
+                  allowCreateTable)
+              .orElse(null);
       if (realSchema == null) {
         throw new SemanticException(
             "Schema validation failed, table cannot be created: " + 
incomingTableSchema);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
index b50cae15f84..b951c02cc49 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
@@ -101,13 +101,10 @@ public abstract class WrappedInsertStatement extends 
WrappedStatement
   public void validateTableSchema(Metadata metadata, MPPQueryContext context) {
     String databaseName = getDatabase();
     final TableSchema incomingSchema = getTableSchema();
-    final TableSchema realSchema;
-    synchronized (metadata) {
-      realSchema =
-          metadata
-              .validateTableHeaderSchema(databaseName, incomingSchema, 
context, true)
-              .orElse(null);
-    }
+    final TableSchema realSchema =
+        metadata
+            .validateTableHeaderSchema(databaseName, incomingSchema, context, 
true)
+            .orElse(null);
     if (realSchema == null) {
       throw new SemanticException(
           "Schema validation failed, table cannot be created: " + 
incomingSchema);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 6b91addc3e1..c73d93de5da 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -2028,7 +2028,7 @@ public class DataRegion implements IDataRegionForQuery {
       QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(SEQUENCE_TSFILE, 
seqResources.size());
       QUERY_RESOURCE_METRIC_SET.recordQueryResourceNum(UNSEQUENCE_TSFILE, 
unseqResources.size());
 
-      return new QueryDataSource(seqResources, unseqResources);
+      return new QueryDataSource(seqResources, unseqResources, databaseName);
     } catch (MetadataException e) {
       throw new QueryProcessException(e);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java
index dc56ad11a12..704bdeb6902 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/QueryDataSource.java
@@ -64,6 +64,8 @@ public class QueryDataSource implements IQueryDataSource {
   /* The traversal order of unseqResources (different for each device) */
   private int[] unSeqFileOrderIndex;
 
+  private String databaseName = null;
+
   private static final Comparator<Long> descendingComparator = (o1, o2) -> 
Long.compare(o2, o1);
 
   public QueryDataSource(List<TsFileResource> seqResources, 
List<TsFileResource> unseqResources) {
@@ -71,12 +73,20 @@ public class QueryDataSource implements IQueryDataSource {
     this.unseqResources = unseqResources;
   }
 
+  public QueryDataSource(
+      List<TsFileResource> seqResources, List<TsFileResource> unseqResources, 
String databaseName) {
+    this.seqResources = seqResources;
+    this.unseqResources = unseqResources;
+    this.databaseName = databaseName;
+  }
+
   // used for compaction, because in compaction task(unlike query, each 
QueryDataSource only serve
   // for one series), we will reuse this object for multi series
   public QueryDataSource(QueryDataSource other) {
     this.seqResources = other.seqResources;
     this.unseqResources = other.unseqResources;
     this.unSeqFileOrderIndex = other.unSeqFileOrderIndex;
+    this.databaseName = other.databaseName;
   }
 
   public List<TsFileResource> getSeqResources() {
@@ -89,7 +99,8 @@ public class QueryDataSource implements IQueryDataSource {
 
   @Override
   public IQueryDataSource clone() {
-    QueryDataSource queryDataSource = new QueryDataSource(getSeqResources(), 
getUnseqResources());
+    QueryDataSource queryDataSource =
+        new QueryDataSource(getSeqResources(), getUnseqResources(), 
databaseName);
     queryDataSource.setSingleDevice(isSingleDevice());
     return queryDataSource;
   }
@@ -189,6 +200,9 @@ public class QueryDataSource implements IQueryDataSource {
   }
 
   public void fillOrderIndexes(IDeviceID deviceId, boolean ascending) {
+    if (unseqResources == null || unseqResources.isEmpty()) {
+      return;
+    }
     TreeMap<Long, List<Integer>> orderTimeToIndexMap =
         ascending ? new TreeMap<>() : new TreeMap<>(descendingComparator);
     int index = 0;
@@ -226,7 +240,10 @@ public class QueryDataSource implements IQueryDataSource {
   }
 
   public String getDatabaseName() {
-    List<TsFileResource> resources = !seqResources.isEmpty() ? seqResources : 
unseqResources;
-    return resources.isEmpty() ? null : resources.get(0).getDatabaseName();
+    if (databaseName == null) {
+      List<TsFileResource> resources = !seqResources.isEmpty() ? seqResources 
: unseqResources;
+      databaseName = resources.isEmpty() ? null : 
resources.get(0).getDatabaseName();
+    }
+    return databaseName;
   }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
index 433ebe797a4..89fa4848b0c 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
@@ -1005,7 +1005,7 @@ public class AnalyzerTest {
 
         for (DataPartitionQueryParam dataPartitionQueryParam : 
dataPartitionQueryParams) {
           String databaseName = dataPartitionQueryParam.getDatabaseName();
-          assertEquals(sessionInfo.getDatabaseName().get(), databaseName);
+          assertEquals("root." + sessionInfo.getDatabaseName().get(), 
databaseName);
           databaseName = PathUtils.qualifyDatabaseName(databaseName);
 
           String tableName = 
dataPartitionQueryParam.getDeviceID().getTableName();
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedFullPath.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedFullPath.java
index 0ed64897438..581e448c253 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedFullPath.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/AlignedFullPath.java
@@ -24,8 +24,12 @@ import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.utils.RamUsageEstimator;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 
+import javax.annotation.Nullable;
+
+import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 
 public class AlignedFullPath implements IFullPath {
 
@@ -38,12 +42,25 @@ public class AlignedFullPath implements IFullPath {
 
   private final List<String> measurementList;
   private final List<IMeasurementSchema> schemaList;
+  @Nullable private final Set<String> allSensors;
 
   public AlignedFullPath(
       IDeviceID deviceID, List<String> measurementList, 
List<IMeasurementSchema> schemaList) {
     this.deviceID = deviceID;
     this.measurementList = measurementList;
     this.schemaList = schemaList;
+    this.allSensors = null;
+  }
+
+  public AlignedFullPath(
+      IDeviceID deviceID,
+      List<String> measurementList,
+      List<IMeasurementSchema> schemaList,
+      Set<String> allSensors) {
+    this.deviceID = deviceID;
+    this.measurementList = measurementList;
+    this.schemaList = schemaList;
+    this.allSensors = allSensors;
   }
 
   @Override
@@ -68,6 +85,17 @@ public class AlignedFullPath implements IFullPath {
     return measurementList.size();
   }
 
+  public Set<String> getAllSensors() {
+    if (allSensors != null) {
+      return allSensors;
+    } else {
+      Set<String> res = new HashSet<>(measurementList);
+      // for time column
+      res.add("");
+      return res;
+    }
+  }
+
   @Override
   public long ramBytesUsed() {
     return INSTANCE_SIZE

Reply via email to