This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch force_ci/object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 842c4a5ffc8e987633594cb57e49ebab43cc97a2
Author: Caideyipi <[email protected]>
AuthorDate: Thu Nov 20 17:28:25 2025 +0800

    Fixed the bugs related to device auto-create alignment ignorance (#16780)
    
    * plan-first
    
    * refact
    
    * refa
    
    * fux
    
    * fix
    
    * ref
    
    * partial-fix
    
    * fix
    
    * may-final
    
    * fix
    
    * PBTree
    
    * fix
    
    * gras
    
    * err
    
    * bugfix
    
    (cherry picked from commit 9fd9d7e818a8fd6049b9caf3a417f698070bf429)
---
 .../it/schema/IoTDBCreateAlignedTimeseriesIT.java  | 22 +++++++++++
 .../db/it/schema/IoTDBCreateTimeseriesIT.java      |  8 ++++
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |  1 -
 .../schemaregion/SchemaExecutionVisitor.java       | 44 +++++++++++++--------
 .../metadata/AlignedTimeSeriesException.java       | 35 ++++++++++++++++
 .../execution/executor/RegionWriteExecutor.java    |  4 +-
 .../analyze/schema/AutoCreateSchemaExecutor.java   | 46 ++++++++++++++++------
 .../schemaregion/impl/SchemaRegionMemoryImpl.java  | 10 ++++-
 .../schemaregion/impl/SchemaRegionPBTreeImpl.java  | 10 ++++-
 .../mtree/impl/mem/MTreeBelowSGMemoryImpl.java     | 18 ++++++++-
 .../mtree/impl/pbtree/MTreeBelowSGCachedImpl.java  | 21 ++++++++--
 .../req/impl/CreateAlignedTimeSeriesPlanImpl.java  | 10 +++++
 .../write/req/impl/CreateTimeSeriesPlanImpl.java   | 10 +++++
 .../db/metadata/path/MeasurementPathTest.java      |  4 +-
 .../apache/iotdb/commons/path/MeasurementPath.java | 26 ------------
 .../org/apache/iotdb/commons/path/PartialPath.java | 26 ++++++++++++
 16 files changed, 227 insertions(+), 68 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateAlignedTimeseriesIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateAlignedTimeseriesIT.java
index bd470cfdb4d..7b784c034ab 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateAlignedTimeseriesIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateAlignedTimeseriesIT.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.it.schema;
 
 import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
+import org.apache.iotdb.db.it.utils.TestUtils;
 import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.itbase.category.ClusterIT;
 import org.apache.iotdb.itbase.category.LocalStandaloneIT;
@@ -34,6 +35,9 @@ import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Collections;
+
+import static org.junit.Assert.fail;
 
 /**
  * Notice that, all test begins with "IoTDB" is integration test. All test 
which will start the
@@ -141,4 +145,22 @@ public class IoTDBCreateAlignedTimeseriesIT extends 
AbstractSchemaIT {
     }
     Assert.assertEquals(timeSeriesArray.length, count);
   }
+
+  @Test
+  public void testDifferentDeviceAlignment() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      // Should ignore the alignment difference
+      statement.execute("create aligned timeseries root.sg2.d (s2 int64, s3 
int64)");
+      // Should use the existing alignment
+      statement.execute("create timeseries root.sg2.d.s1 with datatype=INT64");
+      statement.execute("insert into root.sg2.d (time, s4) values (-1, 1)");
+      TestUtils.assertResultSetEqual(
+          statement.executeQuery("select * from root.sg2.d"),
+          "Time,root.sg2.d.s3,root.sg2.d.s4,root.sg2.d.s1,root.sg2.d.s2,",
+          Collections.singleton("-1,null,1.0,null,null,"));
+    } catch (SQLException ignored) {
+      fail();
+    }
+  }
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateTimeseriesIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateTimeseriesIT.java
index ca309274b63..b2b6f1c1078 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateTimeseriesIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateTimeseriesIT.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.it.schema;
 
 import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
+import org.apache.iotdb.db.it.utils.TestUtils;
 import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.itbase.category.ClusterIT;
 import org.apache.iotdb.itbase.category.LocalStandaloneIT;
@@ -35,6 +36,7 @@ import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -308,6 +310,12 @@ public class IoTDBCreateTimeseriesIT extends 
AbstractSchemaIT {
       statement.execute("create timeseries root.sg2.d.s1 with datatype=INT64");
       // Should ignore the alignment difference
       statement.execute("create aligned timeseries root.sg2.d (s2 int64, s3 
int64)");
+      // Should use the existing alignment
+      statement.execute("insert into root.sg2.d (time, s4) aligned values (-1, 
1)");
+      TestUtils.assertResultSetEqual(
+          statement.executeQuery("select * from root.sg2.d"),
+          "Time,root.sg2.d.s3,root.sg2.d.s4,root.sg2.d.s1,root.sg2.d.s2,",
+          Collections.singleton("-1,null,1.0,null,null,"));
     } catch (SQLException ignored) {
       fail();
     }
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 4fd90ca4b94..246a1f66982 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -106,7 +106,6 @@ public enum TSStatusCode {
   WRITE_PROCESS_REJECT(606),
   OUT_OF_TTL(607),
   COMPACTION_ERROR(608),
-  @Deprecated
   ALIGNED_TIMESERIES_ERROR(609),
   WAL_ERROR(610),
   DISK_SPACE_INSUFFICIENT(611),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
index 8d3c2155838..28ccff2f20d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
@@ -189,7 +189,7 @@ public class SchemaExecutionVisitor extends 
PlanVisitor<TSStatus, ISchemaRegion>
     final PartialPath devicePath = node.getDevicePath();
     final MeasurementGroup measurementGroup = node.getMeasurementGroup();
 
-    final List<TSStatus> alreadyExistingTimeSeries = new ArrayList<>();
+    final List<TSStatus> existingTimeSeriesAndAlignmentMismatch = new 
ArrayList<>();
     final List<TSStatus> failingStatus = new ArrayList<>();
 
     if (node.isAligned()) {
@@ -197,7 +197,7 @@ public class SchemaExecutionVisitor extends 
PlanVisitor<TSStatus, ISchemaRegion>
           devicePath,
           measurementGroup,
           schemaRegion,
-          alreadyExistingTimeSeries,
+          existingTimeSeriesAndAlignmentMismatch,
           failingStatus,
           node.isGeneratedByPipe());
     } else {
@@ -205,7 +205,7 @@ public class SchemaExecutionVisitor extends 
PlanVisitor<TSStatus, ISchemaRegion>
           devicePath,
           measurementGroup,
           schemaRegion,
-          alreadyExistingTimeSeries,
+          existingTimeSeriesAndAlignmentMismatch,
           failingStatus,
           node.isGeneratedByPipe());
     }
@@ -214,8 +214,8 @@ public class SchemaExecutionVisitor extends 
PlanVisitor<TSStatus, ISchemaRegion>
       return RpcUtils.getStatus(failingStatus);
     }
 
-    if (!alreadyExistingTimeSeries.isEmpty()) {
-      return RpcUtils.getStatus(alreadyExistingTimeSeries);
+    if (!existingTimeSeriesAndAlignmentMismatch.isEmpty()) {
+      return RpcUtils.getStatus(existingTimeSeriesAndAlignmentMismatch);
     }
 
     return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute 
successfully");
@@ -227,7 +227,7 @@ public class SchemaExecutionVisitor extends 
PlanVisitor<TSStatus, ISchemaRegion>
     PartialPath devicePath;
     MeasurementGroup measurementGroup;
 
-    final List<TSStatus> alreadyExistingTimeSeries = new ArrayList<>();
+    final List<TSStatus> existingTimeSeriesAndAlignmentMismatch = new 
ArrayList<>();
     final List<TSStatus> failingStatus = new ArrayList<>();
 
     for (final Map.Entry<PartialPath, Pair<Boolean, MeasurementGroup>> 
deviceEntry :
@@ -239,7 +239,7 @@ public class SchemaExecutionVisitor extends 
PlanVisitor<TSStatus, ISchemaRegion>
             devicePath,
             measurementGroup,
             schemaRegion,
-            alreadyExistingTimeSeries,
+            existingTimeSeriesAndAlignmentMismatch,
             failingStatus,
             node.isGeneratedByPipe());
       } else {
@@ -247,7 +247,7 @@ public class SchemaExecutionVisitor extends 
PlanVisitor<TSStatus, ISchemaRegion>
             devicePath,
             measurementGroup,
             schemaRegion,
-            alreadyExistingTimeSeries,
+            existingTimeSeriesAndAlignmentMismatch,
             failingStatus,
             node.isGeneratedByPipe());
       }
@@ -257,8 +257,8 @@ public class SchemaExecutionVisitor extends 
PlanVisitor<TSStatus, ISchemaRegion>
       return RpcUtils.getStatus(failingStatus);
     }
 
-    if (!alreadyExistingTimeSeries.isEmpty()) {
-      return RpcUtils.getStatus(alreadyExistingTimeSeries);
+    if (!existingTimeSeriesAndAlignmentMismatch.isEmpty()) {
+      return RpcUtils.getStatus(existingTimeSeriesAndAlignmentMismatch);
     }
 
     return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute 
successfully");
@@ -268,11 +268,12 @@ public class SchemaExecutionVisitor extends 
PlanVisitor<TSStatus, ISchemaRegion>
       final PartialPath devicePath,
       final MeasurementGroup measurementGroup,
       final ISchemaRegion schemaRegion,
-      final List<TSStatus> alreadyExistingTimeSeries,
+      final List<TSStatus> existingTimeSeriesAndAlignmentMismatch,
       final List<TSStatus> failingStatus,
       final boolean withMerge) {
     final int size = measurementGroup.getMeasurements().size();
     // todo implement batch creation of one device in SchemaRegion
+    boolean alignedIsSet = false;
     for (int i = 0; i < size; i++) {
       try {
         final ICreateTimeSeriesPlan createTimeSeriesPlan =
@@ -283,11 +284,17 @@ public class SchemaExecutionVisitor extends 
PlanVisitor<TSStatus, ISchemaRegion>
         // Thus the original ones are not altered
         ((CreateTimeSeriesPlanImpl) 
createTimeSeriesPlan).setWithMerge(withMerge);
         schemaRegion.createTimeSeries(createTimeSeriesPlan, -1);
+        if (((CreateTimeSeriesPlanImpl) 
createTimeSeriesPlan).getAligned().get() && !alignedIsSet) {
+          existingTimeSeriesAndAlignmentMismatch.add(
+              new 
TSStatus(TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode())
+                  .setMessage(PartialPath.transformDataToString(devicePath)));
+          alignedIsSet = true;
+        }
       } catch (final MeasurementAlreadyExistException e) {
         // There's no need to internal create time series.
-        alreadyExistingTimeSeries.add(
+        existingTimeSeriesAndAlignmentMismatch.add(
             RpcUtils.getStatus(
-                e.getErrorCode(), 
MeasurementPath.transformDataToString(e.getMeasurementPath())));
+                e.getErrorCode(), 
PartialPath.transformDataToString(e.getMeasurementPath())));
       } catch (final MetadataException e) {
         logger.warn("{}: MetaData error: ", e.getMessage(), e);
         failingStatus.add(RpcUtils.getStatus(e.getErrorCode(), 
e.getMessage()));
@@ -299,7 +306,7 @@ public class SchemaExecutionVisitor extends 
PlanVisitor<TSStatus, ISchemaRegion>
       final PartialPath devicePath,
       final MeasurementGroup measurementGroup,
       final ISchemaRegion schemaRegion,
-      final List<TSStatus> alreadyExistingTimeSeries,
+      final List<TSStatus> existingTimeSeriesAndAlignmentMismatch,
       final List<TSStatus> failingStatus,
       final boolean withMerge) {
     final List<String> measurementList = measurementGroup.getMeasurements();
@@ -336,9 +343,9 @@ public class SchemaExecutionVisitor extends 
PlanVisitor<TSStatus, ISchemaRegion>
         // The existence check will be executed before truly creation
         // There's no need to internal create time series.
         final MeasurementPath measurementPath = e.getMeasurementPath();
-        alreadyExistingTimeSeries.add(
+        existingTimeSeriesAndAlignmentMismatch.add(
             RpcUtils.getStatus(
-                e.getErrorCode(), 
MeasurementPath.transformDataToString(e.getMeasurementPath())));
+                e.getErrorCode(), 
PartialPath.transformDataToString(e.getMeasurementPath())));
 
         // remove the existing time series from plan
         final int index = 
measurementList.indexOf(measurementPath.getMeasurement());
@@ -389,6 +396,11 @@ public class SchemaExecutionVisitor extends 
PlanVisitor<TSStatus, ISchemaRegion>
         shouldRetry = false;
       }
     }
+    if (!((CreateAlignedTimeSeriesPlanImpl) 
createAlignedTimeSeriesPlan).getAligned().get()) {
+      existingTimeSeriesAndAlignmentMismatch.add(
+          new TSStatus(TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode())
+              .setMessage(PartialPath.transformDataToString(devicePath)));
+    }
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AlignedTimeSeriesException.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AlignedTimeSeriesException.java
new file mode 100644
index 00000000000..f159376dfae
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AlignedTimeSeriesException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.exception.metadata;
+
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+public class AlignedTimeSeriesException extends MetadataException {
+
+  public AlignedTimeSeriesException(final boolean aligned, final String path) {
+    super(
+        String.format(
+            "TimeSeries under this device is%s aligned, please use 
createTimeSeries or change device. (Path: %s)",
+            aligned ? "" : " not", path),
+        TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode(),
+        true);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
index 63af0423d1a..b5a831b90b2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
@@ -676,7 +676,7 @@ public class RegionWriteExecutor {
               alreadyExistingStatus.add(
                   RpcUtils.getStatus(
                       metadataException.getErrorCode(),
-                      MeasurementPath.transformDataToString(
+                      PartialPath.transformDataToString(
                           ((MeasurementAlreadyExistException) 
metadataException)
                               .getMeasurementPath())));
             } else {
@@ -774,7 +774,7 @@ public class RegionWriteExecutor {
                 alreadyExistingStatus.add(
                     RpcUtils.getStatus(
                         metadataException.getErrorCode(),
-                        MeasurementPath.transformDataToString(
+                        PartialPath.transformDataToString(
                             ((MeasurementAlreadyExistException) 
metadataException)
                                 .getMeasurementPath())));
               } else {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java
index 358614c9a58..a6a3301aea0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java
@@ -29,6 +29,7 @@ import 
org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
 import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.AlignedTimeSeriesException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.protocol.session.SessionManager;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
@@ -188,7 +189,7 @@ class AutoCreateSchemaExecutor {
     }
 
     if (!devicesNeedAutoCreateTimeSeries.isEmpty()) {
-      internalCreateTimeSeries(schemaTree, devicesNeedAutoCreateTimeSeries, 
context);
+      internalCreateTimeSeries(schemaTree, devicesNeedAutoCreateTimeSeries, 
context, false);
     }
   }
 
@@ -425,7 +426,7 @@ class AutoCreateSchemaExecutor {
     }
 
     if (!devicesNeedAutoCreateTimeSeries.isEmpty()) {
-      internalCreateTimeSeries(schemaTree, devicesNeedAutoCreateTimeSeries, 
context);
+      internalCreateTimeSeries(schemaTree, devicesNeedAutoCreateTimeSeries, 
context, true);
     }
   }
 
@@ -463,11 +464,15 @@ class AutoCreateSchemaExecutor {
       List<CompressionType> compressors,
       boolean isAligned,
       MPPQueryContext context) {
+    final Map<PartialPath, Pair<Boolean, MeasurementGroup>> input =
+        Collections.singletonMap(devicePath, new Pair<>(isAligned, null));
     List<MeasurementPath> measurementPathList =
-        executeInternalCreateTimeseriesStatement(
+        executeInternalCreateTimeSeriesStatement(
+            input,
             new InternalCreateTimeSeriesStatement(
                 devicePath, measurements, tsDataTypes, encodings, compressors, 
isAligned),
-            context);
+            context,
+            false);
 
     Set<Integer> alreadyExistingMeasurementIndexSet =
         measurementPathList.stream()
@@ -488,13 +493,16 @@ class AutoCreateSchemaExecutor {
           null,
           null,
           null,
-          isAligned);
+          input.get(devicePath).getLeft());
     }
   }
 
-  // Auto create timeseries and return the existing timeseries info
-  private List<MeasurementPath> executeInternalCreateTimeseriesStatement(
-      final Statement statement, final MPPQueryContext context) {
+  // Auto create timeSeries and return the existing timeSeries info
+  private List<MeasurementPath> executeInternalCreateTimeSeriesStatement(
+      final Map<PartialPath, Pair<Boolean, MeasurementGroup>> 
devicesNeedAutoCreateTimeSeries,
+      final Statement statement,
+      final MPPQueryContext context,
+      final boolean isLoad) {
     final TSStatus status = AuthorityChecker.checkAuthority(statement, 
context);
     if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       throw new IoTDBRuntimeException(status.getMessage(), status.getCode());
@@ -516,7 +524,18 @@ class AutoCreateSchemaExecutor {
     for (final TSStatus subStatus : executionResult.status.subStatus) {
       if (subStatus.code == 
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
         alreadyExistingMeasurements.add(
-            MeasurementPath.parseDataFromString(subStatus.getMessage()));
+            (MeasurementPath) 
PartialPath.parseDataFromString(subStatus.getMessage()));
+      } else if (subStatus.code == 
TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode()) {
+        final PartialPath devicePath = 
PartialPath.parseDataFromString(subStatus.getMessage());
+        final Pair<Boolean, MeasurementGroup> pair =
+            devicesNeedAutoCreateTimeSeries.get(devicePath);
+        if (!isLoad) {
+          pair.setLeft(!pair.getLeft());
+        } else {
+          // Load does not tolerate the device alignment mismatch
+          throw new SemanticException(
+              new AlignedTimeSeriesException(!pair.getLeft(), 
devicePath.getFullPath()));
+        }
       } else {
         failedCreationSet.add(subStatus);
       }
@@ -583,11 +602,13 @@ class AutoCreateSchemaExecutor {
   private void internalCreateTimeSeries(
       ClusterSchemaTree schemaTree,
       Map<PartialPath, Pair<Boolean, MeasurementGroup>> 
devicesNeedAutoCreateTimeSeries,
-      MPPQueryContext context) {
+      MPPQueryContext context,
+      final boolean isLoad) {
 
     // Deep copy to avoid changes to the original map
     final List<MeasurementPath> measurementPathList =
-        executeInternalCreateTimeseriesStatement(
+        executeInternalCreateTimeSeriesStatement(
+            devicesNeedAutoCreateTimeSeries,
             new InternalCreateMultiTimeSeriesStatement(
                 devicesNeedAutoCreateTimeSeries.entrySet().stream()
                     .collect(
@@ -597,7 +618,8 @@ class AutoCreateSchemaExecutor {
                                 new Pair<>(
                                     entry.getValue().getLeft(),
                                     entry.getValue().getRight().deepCopy())))),
-            context);
+            context,
+            isLoad);
 
     schemaTree.appendMeasurementPaths(measurementPathList);
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
index 39be89f0938..5f6486f8be4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
@@ -702,7 +702,10 @@ public class SchemaRegionMemoryImpl implements 
ISchemaRegion {
               plan instanceof CreateTimeSeriesPlanImpl
                       && ((CreateTimeSeriesPlanImpl) plan).isWithMerge()
                   || plan instanceof CreateTimeSeriesNode
-                      && ((CreateTimeSeriesNode) plan).isGeneratedByPipe());
+                      && ((CreateTimeSeriesNode) plan).isGeneratedByPipe(),
+              plan instanceof CreateTimeSeriesPlanImpl
+                  ? ((CreateTimeSeriesPlanImpl) plan).getAligned()
+                  : null);
 
       // Should merge
       if (Objects.isNull(leafMNode)) {
@@ -786,7 +789,10 @@ public class SchemaRegionMemoryImpl implements 
ISchemaRegion {
               aliasList,
               (plan instanceof CreateAlignedTimeSeriesPlanImpl
                   && ((CreateAlignedTimeSeriesPlanImpl) plan).isWithMerge()),
-              existingMeasurementIndexes);
+              existingMeasurementIndexes,
+              (plan instanceof CreateAlignedTimeSeriesPlanImpl
+                  ? ((CreateAlignedTimeSeriesPlanImpl) plan).getAligned()
+                  : null));
 
       // update statistics and schemaDataTypeNumMap
       regionStatistics.addMeasurement(measurementMNodeList.size());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
index 944ffe0ea53..a39b881fe1f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
@@ -645,7 +645,10 @@ public class SchemaRegionPBTreeImpl implements 
ISchemaRegion {
               plan.getProps(),
               plan.getAlias(),
               (plan instanceof CreateTimeSeriesPlanImpl
-                  && ((CreateTimeSeriesPlanImpl) plan).isWithMerge()));
+                  && ((CreateTimeSeriesPlanImpl) plan).isWithMerge()),
+              plan instanceof CreateTimeSeriesPlanImpl
+                  ? ((CreateTimeSeriesPlanImpl) plan).getAligned()
+                  : null);
 
       try {
         // Should merge
@@ -756,7 +759,10 @@ public class SchemaRegionPBTreeImpl implements 
ISchemaRegion {
               aliasList,
               (plan instanceof CreateAlignedTimeSeriesPlanImpl
                   && ((CreateAlignedTimeSeriesPlanImpl) plan).isWithMerge()),
-              existingMeasurementIndexes);
+              existingMeasurementIndexes,
+              (plan instanceof CreateAlignedTimeSeriesPlanImpl
+                  ? ((CreateAlignedTimeSeriesPlanImpl) plan).getAligned()
+                  : null));
 
       try {
         // Update statistics and schemaDataTypeNumMap
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java
index 15d51928d64..5931023b3cf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java
@@ -238,7 +238,8 @@ public class MTreeBelowSGMemoryImpl {
       final CompressionType compressor,
       final Map<String, String> props,
       final String alias,
-      final boolean withMerge)
+      final boolean withMerge,
+      final AtomicBoolean isAligned)
       throws MetadataException {
     final String[] nodeNames = path.getNodes();
     if (nodeNames.length <= 2) {
@@ -253,6 +254,12 @@ public class MTreeBelowSGMemoryImpl {
     synchronized (this) {
       final IMemMNode device = 
checkAndAutoCreateDeviceNode(devicePath.getTailNode(), deviceParent);
 
+      if (device.isDevice()
+          && device.getAsDeviceMNode().isAlignedNullable() != null
+          && isAligned != null) {
+        isAligned.set(device.getAsDeviceMNode().isAligned());
+      }
+
       MetaFormatUtils.checkTimeseriesProps(path.getFullPath(), props);
 
       final String leafName = path.getMeasurement();
@@ -328,7 +335,8 @@ public class MTreeBelowSGMemoryImpl {
       final List<CompressionType> compressors,
       final List<String> aliasList,
       final boolean withMerge,
-      final Set<Integer> existingMeasurementIndexes)
+      final Set<Integer> existingMeasurementIndexes,
+      final AtomicBoolean isAligned)
       throws MetadataException {
     final List<IMeasurementMNode<IMemMNode>> measurementMNodeList = new 
ArrayList<>();
     MetaFormatUtils.checkSchemaMeasurementNames(measurements);
@@ -339,6 +347,12 @@ public class MTreeBelowSGMemoryImpl {
     synchronized (this) {
       final IMemMNode device = 
checkAndAutoCreateDeviceNode(devicePath.getTailNode(), deviceParent);
 
+      if (device.isDevice()
+          && device.getAsDeviceMNode().isAlignedNullable() != null
+          && isAligned != null) {
+        isAligned.set(device.getAsDeviceMNode().isAligned());
+      }
+
       for (int i = 0; i < measurements.size(); i++) {
         if (device.hasChild(measurements.get(i))) {
           final IMemMNode node = device.getChild(measurements.get(i));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/MTreeBelowSGCachedImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/MTreeBelowSGCachedImpl.java
index d5187e14c91..5127cd9e655 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/MTreeBelowSGCachedImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/MTreeBelowSGCachedImpl.java
@@ -266,7 +266,8 @@ public class MTreeBelowSGCachedImpl {
       String alias)
       throws MetadataException {
     IMeasurementMNode<ICachedMNode> measurementMNode =
-        createTimeSeriesWithPinnedReturn(path, dataType, encoding, compressor, 
props, alias, false);
+        createTimeSeriesWithPinnedReturn(
+            path, dataType, encoding, compressor, props, alias, false, null);
     unPinMNode(measurementMNode.getAsMNode());
     return measurementMNode;
   }
@@ -289,7 +290,8 @@ public class MTreeBelowSGCachedImpl {
       final CompressionType compressor,
       final Map<String, String> props,
       final String alias,
-      final boolean withMerge)
+      final boolean withMerge,
+      final AtomicBoolean isAligned)
       throws MetadataException {
     final String[] nodeNames = path.getNodes();
     if (nodeNames.length <= 2) {
@@ -305,6 +307,12 @@ public class MTreeBelowSGCachedImpl {
       synchronized (this) {
         ICachedMNode device = 
checkAndAutoCreateDeviceNode(devicePath.getTailNode(), deviceParent);
 
+        if (device.isDevice()
+            && device.getAsDeviceMNode().isAlignedNullable() != null
+            && isAligned != null) {
+          isAligned.set(device.getAsDeviceMNode().isAligned());
+        }
+
         try {
           MetaFormatUtils.checkTimeseriesProps(path.getFullPath(), props);
 
@@ -387,7 +395,8 @@ public class MTreeBelowSGCachedImpl {
       final List<CompressionType> compressors,
       final List<String> aliasList,
       final boolean withMerge,
-      final Set<Integer> existingMeasurementIndexes)
+      final Set<Integer> existingMeasurementIndexes,
+      final AtomicBoolean isAligned)
       throws MetadataException {
     final List<IMeasurementMNode<ICachedMNode>> measurementMNodeList = new 
ArrayList<>();
     MetaFormatUtils.checkSchemaMeasurementNames(measurements);
@@ -399,6 +408,12 @@ public class MTreeBelowSGCachedImpl {
       synchronized (this) {
         ICachedMNode device = 
checkAndAutoCreateDeviceNode(devicePath.getTailNode(), deviceParent);
 
+        if (device.isDevice()
+            && device.getAsDeviceMNode().isAlignedNullable() != null
+            && isAligned != null) {
+          isAligned.set(device.getAsDeviceMNode().isAligned());
+        }
+
         try {
           for (int i = 0; i < measurements.size(); i++) {
             if (store.hasChild(device, measurements.get(i))) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateAlignedTimeSeriesPlanImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateAlignedTimeSeriesPlanImpl.java
index 4ffa8aca5b2..8a2c4c12e8c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateAlignedTimeSeriesPlanImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateAlignedTimeSeriesPlanImpl.java
@@ -32,6 +32,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public class CreateAlignedTimeSeriesPlanImpl implements 
ICreateAlignedTimeSeriesPlan {
 
@@ -45,6 +46,7 @@ public class CreateAlignedTimeSeriesPlanImpl implements 
ICreateAlignedTimeSeries
   private List<Map<String, String>> attributesList;
   private List<Long> tagOffsets = null;
   private transient boolean withMerge;
+  private final transient AtomicBoolean aligned = new AtomicBoolean(true);
 
   public CreateAlignedTimeSeriesPlanImpl() {}
 
@@ -194,4 +196,12 @@ public class CreateAlignedTimeSeriesPlanImpl implements 
ICreateAlignedTimeSeries
       tagOffsets = Objects.nonNull(tagOffsets) ? new ArrayList<>(tagOffsets) : 
null;
     }
   }
+
+  public void setAligned(final boolean aligned) {
+    this.aligned.set(aligned);
+  }
+
+  public AtomicBoolean getAligned() {
+    return aligned;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateTimeSeriesPlanImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateTimeSeriesPlanImpl.java
index 6234cc41aff..73b47a4f516 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateTimeSeriesPlanImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateTimeSeriesPlanImpl.java
@@ -28,6 +28,7 @@ import org.apache.tsfile.write.schema.MeasurementSchema;
 
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public class CreateTimeSeriesPlanImpl implements ICreateTimeSeriesPlan {
 
@@ -41,6 +42,7 @@ public class CreateTimeSeriesPlanImpl implements 
ICreateTimeSeriesPlan {
   private Map<String, String> attributes = null;
   private long tagOffset = -1;
   private transient boolean withMerge;
+  private final transient AtomicBoolean aligned = new AtomicBoolean(false);
 
   public CreateTimeSeriesPlanImpl() {}
 
@@ -170,4 +172,12 @@ public class CreateTimeSeriesPlanImpl implements 
ICreateTimeSeriesPlan {
   public void setWithMerge(final boolean withMerge) {
     this.withMerge = withMerge;
   }
+
+  public void setAligned(final boolean aligned) {
+    this.aligned.set(aligned);
+  }
+
+  public AtomicBoolean getAligned() {
+    return aligned;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/path/MeasurementPathTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/path/MeasurementPathTest.java
index cf5f8ab57ba..148a3c56bd4 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/path/MeasurementPathTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/path/MeasurementPathTest.java
@@ -43,8 +43,8 @@ public class MeasurementPathTest {
         new MeasurementPath(
             new PartialPath("root.sg.d.s"), new MeasurementSchema("s", 
TSDataType.INT32), true);
     rawPath.setMeasurementAlias("alias");
-    String string = MeasurementPath.transformDataToString(rawPath);
-    MeasurementPath newPath = MeasurementPath.parseDataFromString(string);
+    String string = PartialPath.transformDataToString(rawPath);
+    MeasurementPath newPath = (MeasurementPath) 
PartialPath.parseDataFromString(string);
     Assert.assertEquals(rawPath.getFullPath(), newPath.getFullPath());
     Assert.assertEquals(rawPath.getMeasurementAlias(), 
newPath.getMeasurementAlias());
     Assert.assertEquals(rawPath.getMeasurementSchema(), 
newPath.getMeasurementSchema());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java
index f74a29f1815..0718366aabf 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java
@@ -33,12 +33,9 @@ import 
org.apache.tsfile.write.schema.VectorMeasurementSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
 import java.rmi.UnexpectedException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -339,23 +336,6 @@ public class MeasurementPath extends PartialPath {
     return getDevicePath().concatNode(getTailNode());
   }
 
-  /**
-   * In specific scenarios, like internal create timeseries, the message can 
only be passed as
-   * String format.
-   */
-  public static String transformDataToString(MeasurementPath measurementPath) {
-    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
-    DataOutputStream dataOutputStream = new 
DataOutputStream(byteArrayOutputStream);
-    try {
-      measurementPath.serialize(dataOutputStream);
-    } catch (IOException ignored) {
-      // this exception won't happen.
-    }
-    byte[] bytes = byteArrayOutputStream.toByteArray();
-    // must use single-byte char sets
-    return new String(bytes, StandardCharsets.ISO_8859_1);
-  }
-
   @Override
   protected IDeviceID toDeviceID(String[] nodes) {
     // remove measurement
@@ -363,12 +343,6 @@ public class MeasurementPath extends PartialPath {
     return super.toDeviceID(nodes);
   }
 
-  public static MeasurementPath parseDataFromString(String 
measurementPathData) {
-    return (MeasurementPath)
-        PathDeserializeUtil.deserialize(
-            
ByteBuffer.wrap(measurementPathData.getBytes(StandardCharsets.ISO_8859_1)));
-  }
-
   @Override
   protected PartialPath createPartialPath(String[] newPathNodes) {
     return new MeasurementPath(newPathNodes);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
index ae35769a1d0..b0f2c3fe051 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
@@ -38,9 +38,12 @@ import org.apache.tsfile.write.schema.IMeasurementSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -1040,6 +1043,29 @@ public class PartialPath extends Path implements 
Comparable<Path>, Cloneable {
     return this;
   }
 
+  /**
+   * In specific scenarios, like internal create timeseries, the message can 
only be passed as
+   * String format.
+   */
+  public static String transformDataToString(PartialPath partialPath) {
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    DataOutputStream dataOutputStream = new 
DataOutputStream(byteArrayOutputStream);
+    try {
+      partialPath.serialize(dataOutputStream);
+    } catch (IOException ignored) {
+      // this exception won't happen.
+    }
+    byte[] bytes = byteArrayOutputStream.toByteArray();
+    // must use single-byte char sets
+    return new String(bytes, StandardCharsets.ISO_8859_1);
+  }
+
+  public static PartialPath parseDataFromString(String measurementPathData) {
+    return (PartialPath)
+        PathDeserializeUtil.deserialize(
+            
ByteBuffer.wrap(measurementPathData.getBytes(StandardCharsets.ISO_8859_1)));
+  }
+
   /** Return true if the path ends with ** and no other nodes contain *. 
Otherwise, return false. */
   public boolean isPrefixPath() {
     if (nodes.length <= 0) {


Reply via email to