This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9fd9d7e818a Fixed the bugs related to device auto-create alignment
ignorance (#16780)
9fd9d7e818a is described below
commit 9fd9d7e818a8fd6049b9caf3a417f698070bf429
Author: Caideyipi <[email protected]>
AuthorDate: Thu Nov 20 17:28:25 2025 +0800
Fixed the bugs related to device auto-create alignment ignorance (#16780)
* plan-first
* refact
* refa
* fux
* fix
* ref
* partial-fix
* fix
* may-final
* fix
* PBTree
* fix
* gras
* err
* bugfix
---
.../it/schema/IoTDBCreateAlignedTimeseriesIT.java | 22 +++++++++++
.../db/it/schema/IoTDBCreateTimeseriesIT.java | 8 ++++
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 -
.../schemaregion/SchemaExecutionVisitor.java | 44 +++++++++++++--------
.../metadata/AlignedTimeSeriesException.java | 35 ++++++++++++++++
.../execution/executor/RegionWriteExecutor.java | 4 +-
.../analyze/schema/AutoCreateSchemaExecutor.java | 46 ++++++++++++++++------
.../schemaregion/impl/SchemaRegionMemoryImpl.java | 10 ++++-
.../schemaregion/impl/SchemaRegionPBTreeImpl.java | 10 ++++-
.../mtree/impl/mem/MTreeBelowSGMemoryImpl.java | 18 ++++++++-
.../mtree/impl/pbtree/MTreeBelowSGCachedImpl.java | 21 ++++++++--
.../req/impl/CreateAlignedTimeSeriesPlanImpl.java | 10 +++++
.../write/req/impl/CreateTimeSeriesPlanImpl.java | 10 +++++
.../db/metadata/path/MeasurementPathTest.java | 4 +-
.../apache/iotdb/commons/path/MeasurementPath.java | 26 ------------
.../org/apache/iotdb/commons/path/PartialPath.java | 26 ++++++++++++
16 files changed, 227 insertions(+), 68 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateAlignedTimeseriesIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateAlignedTimeseriesIT.java
index bd470cfdb4d..7b784c034ab 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateAlignedTimeseriesIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateAlignedTimeseriesIT.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.it.schema;
import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
+import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
@@ -34,6 +35,9 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Collections;
+
+import static org.junit.Assert.fail;
/**
* Notice that, all test begins with "IoTDB" is integration test. All test
which will start the
@@ -141,4 +145,22 @@ public class IoTDBCreateAlignedTimeseriesIT extends
AbstractSchemaIT {
}
Assert.assertEquals(timeSeriesArray.length, count);
}
+
+ @Test
+ public void testDifferentDeviceAlignment() {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ // Should ignore the alignment difference
+ statement.execute("create aligned timeseries root.sg2.d (s2 int64, s3
int64)");
+ // Should use the existing alignment
+ statement.execute("create timeseries root.sg2.d.s1 with datatype=INT64");
+ statement.execute("insert into root.sg2.d (time, s4) values (-1, 1)");
+ TestUtils.assertResultSetEqual(
+ statement.executeQuery("select * from root.sg2.d"),
+ "Time,root.sg2.d.s3,root.sg2.d.s4,root.sg2.d.s1,root.sg2.d.s2,",
+ Collections.singleton("-1,null,1.0,null,null,"));
+ } catch (SQLException ignored) {
+ fail();
+ }
+ }
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateTimeseriesIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateTimeseriesIT.java
index ca309274b63..b2b6f1c1078 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateTimeseriesIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateTimeseriesIT.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.it.schema;
import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
+import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
@@ -35,6 +36,7 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@@ -308,6 +310,12 @@ public class IoTDBCreateTimeseriesIT extends
AbstractSchemaIT {
statement.execute("create timeseries root.sg2.d.s1 with datatype=INT64");
// Should ignore the alignment difference
statement.execute("create aligned timeseries root.sg2.d (s2 int64, s3
int64)");
+ // Should use the existing alignment
+ statement.execute("insert into root.sg2.d (time, s4) aligned values (-1,
1)");
+ TestUtils.assertResultSetEqual(
+ statement.executeQuery("select * from root.sg2.d"),
+ "Time,root.sg2.d.s3,root.sg2.d.s4,root.sg2.d.s1,root.sg2.d.s2,",
+ Collections.singleton("-1,null,1.0,null,null,"));
} catch (SQLException ignored) {
fail();
}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index a4efbf99870..7f94dd696ac 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -106,7 +106,6 @@ public enum TSStatusCode {
WRITE_PROCESS_REJECT(606),
OUT_OF_TTL(607),
COMPACTION_ERROR(608),
- @Deprecated
ALIGNED_TIMESERIES_ERROR(609),
WAL_ERROR(610),
DISK_SPACE_INSUFFICIENT(611),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
index 8d3c2155838..28ccff2f20d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
@@ -189,7 +189,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
final PartialPath devicePath = node.getDevicePath();
final MeasurementGroup measurementGroup = node.getMeasurementGroup();
- final List<TSStatus> alreadyExistingTimeSeries = new ArrayList<>();
+ final List<TSStatus> existingTimeSeriesAndAlignmentMismatch = new
ArrayList<>();
final List<TSStatus> failingStatus = new ArrayList<>();
if (node.isAligned()) {
@@ -197,7 +197,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
devicePath,
measurementGroup,
schemaRegion,
- alreadyExistingTimeSeries,
+ existingTimeSeriesAndAlignmentMismatch,
failingStatus,
node.isGeneratedByPipe());
} else {
@@ -205,7 +205,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
devicePath,
measurementGroup,
schemaRegion,
- alreadyExistingTimeSeries,
+ existingTimeSeriesAndAlignmentMismatch,
failingStatus,
node.isGeneratedByPipe());
}
@@ -214,8 +214,8 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
return RpcUtils.getStatus(failingStatus);
}
- if (!alreadyExistingTimeSeries.isEmpty()) {
- return RpcUtils.getStatus(alreadyExistingTimeSeries);
+ if (!existingTimeSeriesAndAlignmentMismatch.isEmpty()) {
+ return RpcUtils.getStatus(existingTimeSeriesAndAlignmentMismatch);
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute
successfully");
@@ -227,7 +227,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
PartialPath devicePath;
MeasurementGroup measurementGroup;
- final List<TSStatus> alreadyExistingTimeSeries = new ArrayList<>();
+ final List<TSStatus> existingTimeSeriesAndAlignmentMismatch = new
ArrayList<>();
final List<TSStatus> failingStatus = new ArrayList<>();
for (final Map.Entry<PartialPath, Pair<Boolean, MeasurementGroup>>
deviceEntry :
@@ -239,7 +239,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
devicePath,
measurementGroup,
schemaRegion,
- alreadyExistingTimeSeries,
+ existingTimeSeriesAndAlignmentMismatch,
failingStatus,
node.isGeneratedByPipe());
} else {
@@ -247,7 +247,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
devicePath,
measurementGroup,
schemaRegion,
- alreadyExistingTimeSeries,
+ existingTimeSeriesAndAlignmentMismatch,
failingStatus,
node.isGeneratedByPipe());
}
@@ -257,8 +257,8 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
return RpcUtils.getStatus(failingStatus);
}
- if (!alreadyExistingTimeSeries.isEmpty()) {
- return RpcUtils.getStatus(alreadyExistingTimeSeries);
+ if (!existingTimeSeriesAndAlignmentMismatch.isEmpty()) {
+ return RpcUtils.getStatus(existingTimeSeriesAndAlignmentMismatch);
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute
successfully");
@@ -268,11 +268,12 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
final PartialPath devicePath,
final MeasurementGroup measurementGroup,
final ISchemaRegion schemaRegion,
- final List<TSStatus> alreadyExistingTimeSeries,
+ final List<TSStatus> existingTimeSeriesAndAlignmentMismatch,
final List<TSStatus> failingStatus,
final boolean withMerge) {
final int size = measurementGroup.getMeasurements().size();
// todo implement batch creation of one device in SchemaRegion
+ boolean alignedIsSet = false;
for (int i = 0; i < size; i++) {
try {
final ICreateTimeSeriesPlan createTimeSeriesPlan =
@@ -283,11 +284,17 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
// Thus the original ones are not altered
((CreateTimeSeriesPlanImpl)
createTimeSeriesPlan).setWithMerge(withMerge);
schemaRegion.createTimeSeries(createTimeSeriesPlan, -1);
+ if (((CreateTimeSeriesPlanImpl)
createTimeSeriesPlan).getAligned().get() && !alignedIsSet) {
+ existingTimeSeriesAndAlignmentMismatch.add(
+ new
TSStatus(TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode())
+ .setMessage(PartialPath.transformDataToString(devicePath)));
+ alignedIsSet = true;
+ }
} catch (final MeasurementAlreadyExistException e) {
// There's no need to internal create time series.
- alreadyExistingTimeSeries.add(
+ existingTimeSeriesAndAlignmentMismatch.add(
RpcUtils.getStatus(
- e.getErrorCode(),
MeasurementPath.transformDataToString(e.getMeasurementPath())));
+ e.getErrorCode(),
PartialPath.transformDataToString(e.getMeasurementPath())));
} catch (final MetadataException e) {
logger.warn("{}: MetaData error: ", e.getMessage(), e);
failingStatus.add(RpcUtils.getStatus(e.getErrorCode(),
e.getMessage()));
@@ -299,7 +306,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
final PartialPath devicePath,
final MeasurementGroup measurementGroup,
final ISchemaRegion schemaRegion,
- final List<TSStatus> alreadyExistingTimeSeries,
+ final List<TSStatus> existingTimeSeriesAndAlignmentMismatch,
final List<TSStatus> failingStatus,
final boolean withMerge) {
final List<String> measurementList = measurementGroup.getMeasurements();
@@ -336,9 +343,9 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
// The existence check will be executed before truly creation
// There's no need to internal create time series.
final MeasurementPath measurementPath = e.getMeasurementPath();
- alreadyExistingTimeSeries.add(
+ existingTimeSeriesAndAlignmentMismatch.add(
RpcUtils.getStatus(
- e.getErrorCode(),
MeasurementPath.transformDataToString(e.getMeasurementPath())));
+ e.getErrorCode(),
PartialPath.transformDataToString(e.getMeasurementPath())));
// remove the existing time series from plan
final int index =
measurementList.indexOf(measurementPath.getMeasurement());
@@ -389,6 +396,11 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
shouldRetry = false;
}
}
+ if (!((CreateAlignedTimeSeriesPlanImpl)
createAlignedTimeSeriesPlan).getAligned().get()) {
+ existingTimeSeriesAndAlignmentMismatch.add(
+ new TSStatus(TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode())
+ .setMessage(PartialPath.transformDataToString(devicePath)));
+ }
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AlignedTimeSeriesException.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AlignedTimeSeriesException.java
new file mode 100644
index 00000000000..f159376dfae
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AlignedTimeSeriesException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.exception.metadata;
+
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+public class AlignedTimeSeriesException extends MetadataException {
+
+ public AlignedTimeSeriesException(final boolean aligned, final String path) {
+ super(
+ String.format(
+ "TimeSeries under this device is%s aligned, please use
createTimeSeries or change device. (Path: %s)",
+ aligned ? "" : " not", path),
+ TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode(),
+ true);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
index f4aca5e157e..f45e71110d5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
@@ -662,7 +662,7 @@ public class RegionWriteExecutor {
alreadyExistingStatus.add(
RpcUtils.getStatus(
metadataException.getErrorCode(),
- MeasurementPath.transformDataToString(
+ PartialPath.transformDataToString(
((MeasurementAlreadyExistException)
metadataException)
.getMeasurementPath())));
} else {
@@ -760,7 +760,7 @@ public class RegionWriteExecutor {
alreadyExistingStatus.add(
RpcUtils.getStatus(
metadataException.getErrorCode(),
- MeasurementPath.transformDataToString(
+ PartialPath.transformDataToString(
((MeasurementAlreadyExistException)
metadataException)
.getMeasurementPath())));
} else {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java
index 358614c9a58..a6a3301aea0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.AlignedTimeSeriesException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
@@ -188,7 +189,7 @@ class AutoCreateSchemaExecutor {
}
if (!devicesNeedAutoCreateTimeSeries.isEmpty()) {
- internalCreateTimeSeries(schemaTree, devicesNeedAutoCreateTimeSeries,
context);
+ internalCreateTimeSeries(schemaTree, devicesNeedAutoCreateTimeSeries,
context, false);
}
}
@@ -425,7 +426,7 @@ class AutoCreateSchemaExecutor {
}
if (!devicesNeedAutoCreateTimeSeries.isEmpty()) {
- internalCreateTimeSeries(schemaTree, devicesNeedAutoCreateTimeSeries,
context);
+ internalCreateTimeSeries(schemaTree, devicesNeedAutoCreateTimeSeries,
context, true);
}
}
@@ -463,11 +464,15 @@ class AutoCreateSchemaExecutor {
List<CompressionType> compressors,
boolean isAligned,
MPPQueryContext context) {
+ final Map<PartialPath, Pair<Boolean, MeasurementGroup>> input =
+ Collections.singletonMap(devicePath, new Pair<>(isAligned, null));
List<MeasurementPath> measurementPathList =
- executeInternalCreateTimeseriesStatement(
+ executeInternalCreateTimeSeriesStatement(
+ input,
new InternalCreateTimeSeriesStatement(
devicePath, measurements, tsDataTypes, encodings, compressors,
isAligned),
- context);
+ context,
+ false);
Set<Integer> alreadyExistingMeasurementIndexSet =
measurementPathList.stream()
@@ -488,13 +493,16 @@ class AutoCreateSchemaExecutor {
null,
null,
null,
- isAligned);
+ input.get(devicePath).getLeft());
}
}
- // Auto create timeseries and return the existing timeseries info
- private List<MeasurementPath> executeInternalCreateTimeseriesStatement(
- final Statement statement, final MPPQueryContext context) {
+ // Auto create timeSeries and return the existing timeSeries info
+ private List<MeasurementPath> executeInternalCreateTimeSeriesStatement(
+ final Map<PartialPath, Pair<Boolean, MeasurementGroup>>
devicesNeedAutoCreateTimeSeries,
+ final Statement statement,
+ final MPPQueryContext context,
+ final boolean isLoad) {
final TSStatus status = AuthorityChecker.checkAuthority(statement,
context);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new IoTDBRuntimeException(status.getMessage(), status.getCode());
@@ -516,7 +524,18 @@ class AutoCreateSchemaExecutor {
for (final TSStatus subStatus : executionResult.status.subStatus) {
if (subStatus.code ==
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
alreadyExistingMeasurements.add(
- MeasurementPath.parseDataFromString(subStatus.getMessage()));
+ (MeasurementPath)
PartialPath.parseDataFromString(subStatus.getMessage()));
+ } else if (subStatus.code ==
TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode()) {
+ final PartialPath devicePath =
PartialPath.parseDataFromString(subStatus.getMessage());
+ final Pair<Boolean, MeasurementGroup> pair =
+ devicesNeedAutoCreateTimeSeries.get(devicePath);
+ if (!isLoad) {
+ pair.setLeft(!pair.getLeft());
+ } else {
+ // Load does not tolerate the device alignment mismatch
+ throw new SemanticException(
+ new AlignedTimeSeriesException(!pair.getLeft(),
devicePath.getFullPath()));
+ }
} else {
failedCreationSet.add(subStatus);
}
@@ -583,11 +602,13 @@ class AutoCreateSchemaExecutor {
private void internalCreateTimeSeries(
ClusterSchemaTree schemaTree,
Map<PartialPath, Pair<Boolean, MeasurementGroup>>
devicesNeedAutoCreateTimeSeries,
- MPPQueryContext context) {
+ MPPQueryContext context,
+ final boolean isLoad) {
// Deep copy to avoid changes to the original map
final List<MeasurementPath> measurementPathList =
- executeInternalCreateTimeseriesStatement(
+ executeInternalCreateTimeSeriesStatement(
+ devicesNeedAutoCreateTimeSeries,
new InternalCreateMultiTimeSeriesStatement(
devicesNeedAutoCreateTimeSeries.entrySet().stream()
.collect(
@@ -597,7 +618,8 @@ class AutoCreateSchemaExecutor {
new Pair<>(
entry.getValue().getLeft(),
entry.getValue().getRight().deepCopy())))),
- context);
+ context,
+ isLoad);
schemaTree.appendMeasurementPaths(measurementPathList);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
index 3ef9190ab44..9b125f4f111 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
@@ -702,7 +702,10 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
plan instanceof CreateTimeSeriesPlanImpl
&& ((CreateTimeSeriesPlanImpl) plan).isWithMerge()
|| plan instanceof CreateTimeSeriesNode
- && ((CreateTimeSeriesNode) plan).isGeneratedByPipe());
+ && ((CreateTimeSeriesNode) plan).isGeneratedByPipe(),
+ plan instanceof CreateTimeSeriesPlanImpl
+ ? ((CreateTimeSeriesPlanImpl) plan).getAligned()
+ : null);
// Should merge
if (Objects.isNull(leafMNode)) {
@@ -786,7 +789,10 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
aliasList,
(plan instanceof CreateAlignedTimeSeriesPlanImpl
&& ((CreateAlignedTimeSeriesPlanImpl) plan).isWithMerge()),
- existingMeasurementIndexes);
+ existingMeasurementIndexes,
+ (plan instanceof CreateAlignedTimeSeriesPlanImpl
+ ? ((CreateAlignedTimeSeriesPlanImpl) plan).getAligned()
+ : null));
// update statistics and schemaDataTypeNumMap
regionStatistics.addMeasurement(measurementMNodeList.size());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
index 944ffe0ea53..a39b881fe1f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
@@ -645,7 +645,10 @@ public class SchemaRegionPBTreeImpl implements
ISchemaRegion {
plan.getProps(),
plan.getAlias(),
(plan instanceof CreateTimeSeriesPlanImpl
- && ((CreateTimeSeriesPlanImpl) plan).isWithMerge()));
+ && ((CreateTimeSeriesPlanImpl) plan).isWithMerge()),
+ plan instanceof CreateTimeSeriesPlanImpl
+ ? ((CreateTimeSeriesPlanImpl) plan).getAligned()
+ : null);
try {
// Should merge
@@ -756,7 +759,10 @@ public class SchemaRegionPBTreeImpl implements
ISchemaRegion {
aliasList,
(plan instanceof CreateAlignedTimeSeriesPlanImpl
&& ((CreateAlignedTimeSeriesPlanImpl) plan).isWithMerge()),
- existingMeasurementIndexes);
+ existingMeasurementIndexes,
+ (plan instanceof CreateAlignedTimeSeriesPlanImpl
+ ? ((CreateAlignedTimeSeriesPlanImpl) plan).getAligned()
+ : null));
try {
// Update statistics and schemaDataTypeNumMap
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java
index 15d51928d64..5931023b3cf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java
@@ -238,7 +238,8 @@ public class MTreeBelowSGMemoryImpl {
final CompressionType compressor,
final Map<String, String> props,
final String alias,
- final boolean withMerge)
+ final boolean withMerge,
+ final AtomicBoolean isAligned)
throws MetadataException {
final String[] nodeNames = path.getNodes();
if (nodeNames.length <= 2) {
@@ -253,6 +254,12 @@ public class MTreeBelowSGMemoryImpl {
synchronized (this) {
final IMemMNode device =
checkAndAutoCreateDeviceNode(devicePath.getTailNode(), deviceParent);
+ if (device.isDevice()
+ && device.getAsDeviceMNode().isAlignedNullable() != null
+ && isAligned != null) {
+ isAligned.set(device.getAsDeviceMNode().isAligned());
+ }
+
MetaFormatUtils.checkTimeseriesProps(path.getFullPath(), props);
final String leafName = path.getMeasurement();
@@ -328,7 +335,8 @@ public class MTreeBelowSGMemoryImpl {
final List<CompressionType> compressors,
final List<String> aliasList,
final boolean withMerge,
- final Set<Integer> existingMeasurementIndexes)
+ final Set<Integer> existingMeasurementIndexes,
+ final AtomicBoolean isAligned)
throws MetadataException {
final List<IMeasurementMNode<IMemMNode>> measurementMNodeList = new
ArrayList<>();
MetaFormatUtils.checkSchemaMeasurementNames(measurements);
@@ -339,6 +347,12 @@ public class MTreeBelowSGMemoryImpl {
synchronized (this) {
final IMemMNode device =
checkAndAutoCreateDeviceNode(devicePath.getTailNode(), deviceParent);
+ if (device.isDevice()
+ && device.getAsDeviceMNode().isAlignedNullable() != null
+ && isAligned != null) {
+ isAligned.set(device.getAsDeviceMNode().isAligned());
+ }
+
for (int i = 0; i < measurements.size(); i++) {
if (device.hasChild(measurements.get(i))) {
final IMemMNode node = device.getChild(measurements.get(i));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/MTreeBelowSGCachedImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/MTreeBelowSGCachedImpl.java
index d5187e14c91..5127cd9e655 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/MTreeBelowSGCachedImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/MTreeBelowSGCachedImpl.java
@@ -266,7 +266,8 @@ public class MTreeBelowSGCachedImpl {
String alias)
throws MetadataException {
IMeasurementMNode<ICachedMNode> measurementMNode =
- createTimeSeriesWithPinnedReturn(path, dataType, encoding, compressor,
props, alias, false);
+ createTimeSeriesWithPinnedReturn(
+ path, dataType, encoding, compressor, props, alias, false, null);
unPinMNode(measurementMNode.getAsMNode());
return measurementMNode;
}
@@ -289,7 +290,8 @@ public class MTreeBelowSGCachedImpl {
final CompressionType compressor,
final Map<String, String> props,
final String alias,
- final boolean withMerge)
+ final boolean withMerge,
+ final AtomicBoolean isAligned)
throws MetadataException {
final String[] nodeNames = path.getNodes();
if (nodeNames.length <= 2) {
@@ -305,6 +307,12 @@ public class MTreeBelowSGCachedImpl {
synchronized (this) {
ICachedMNode device =
checkAndAutoCreateDeviceNode(devicePath.getTailNode(), deviceParent);
+ if (device.isDevice()
+ && device.getAsDeviceMNode().isAlignedNullable() != null
+ && isAligned != null) {
+ isAligned.set(device.getAsDeviceMNode().isAligned());
+ }
+
try {
MetaFormatUtils.checkTimeseriesProps(path.getFullPath(), props);
@@ -387,7 +395,8 @@ public class MTreeBelowSGCachedImpl {
final List<CompressionType> compressors,
final List<String> aliasList,
final boolean withMerge,
- final Set<Integer> existingMeasurementIndexes)
+ final Set<Integer> existingMeasurementIndexes,
+ final AtomicBoolean isAligned)
throws MetadataException {
final List<IMeasurementMNode<ICachedMNode>> measurementMNodeList = new
ArrayList<>();
MetaFormatUtils.checkSchemaMeasurementNames(measurements);
@@ -399,6 +408,12 @@ public class MTreeBelowSGCachedImpl {
synchronized (this) {
ICachedMNode device =
checkAndAutoCreateDeviceNode(devicePath.getTailNode(), deviceParent);
+ if (device.isDevice()
+ && device.getAsDeviceMNode().isAlignedNullable() != null
+ && isAligned != null) {
+ isAligned.set(device.getAsDeviceMNode().isAligned());
+ }
+
try {
for (int i = 0; i < measurements.size(); i++) {
if (store.hasChild(device, measurements.get(i))) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateAlignedTimeSeriesPlanImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateAlignedTimeSeriesPlanImpl.java
index 4ffa8aca5b2..8a2c4c12e8c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateAlignedTimeSeriesPlanImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateAlignedTimeSeriesPlanImpl.java
@@ -32,6 +32,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
public class CreateAlignedTimeSeriesPlanImpl implements
ICreateAlignedTimeSeriesPlan {
@@ -45,6 +46,7 @@ public class CreateAlignedTimeSeriesPlanImpl implements
ICreateAlignedTimeSeries
private List<Map<String, String>> attributesList;
private List<Long> tagOffsets = null;
private transient boolean withMerge;
+ private final transient AtomicBoolean aligned = new AtomicBoolean(true);
public CreateAlignedTimeSeriesPlanImpl() {}
@@ -194,4 +196,12 @@ public class CreateAlignedTimeSeriesPlanImpl implements
ICreateAlignedTimeSeries
tagOffsets = Objects.nonNull(tagOffsets) ? new ArrayList<>(tagOffsets) :
null;
}
}
+
+ public void setAligned(final boolean aligned) {
+ this.aligned.set(aligned);
+ }
+
+ public AtomicBoolean getAligned() {
+ return aligned;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateTimeSeriesPlanImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateTimeSeriesPlanImpl.java
index 6234cc41aff..73b47a4f516 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateTimeSeriesPlanImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateTimeSeriesPlanImpl.java
@@ -28,6 +28,7 @@ import org.apache.tsfile.write.schema.MeasurementSchema;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
public class CreateTimeSeriesPlanImpl implements ICreateTimeSeriesPlan {
@@ -41,6 +42,7 @@ public class CreateTimeSeriesPlanImpl implements
ICreateTimeSeriesPlan {
private Map<String, String> attributes = null;
private long tagOffset = -1;
private transient boolean withMerge;
+ private final transient AtomicBoolean aligned = new AtomicBoolean(false);
public CreateTimeSeriesPlanImpl() {}
@@ -170,4 +172,12 @@ public class CreateTimeSeriesPlanImpl implements
ICreateTimeSeriesPlan {
public void setWithMerge(final boolean withMerge) {
this.withMerge = withMerge;
}
+
+ public void setAligned(final boolean aligned) {
+ this.aligned.set(aligned);
+ }
+
+ public AtomicBoolean getAligned() {
+ return aligned;
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/path/MeasurementPathTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/path/MeasurementPathTest.java
index cf5f8ab57ba..148a3c56bd4 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/path/MeasurementPathTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/path/MeasurementPathTest.java
@@ -43,8 +43,8 @@ public class MeasurementPathTest {
new MeasurementPath(
new PartialPath("root.sg.d.s"), new MeasurementSchema("s",
TSDataType.INT32), true);
rawPath.setMeasurementAlias("alias");
- String string = MeasurementPath.transformDataToString(rawPath);
- MeasurementPath newPath = MeasurementPath.parseDataFromString(string);
+ String string = PartialPath.transformDataToString(rawPath);
+ MeasurementPath newPath = (MeasurementPath)
PartialPath.parseDataFromString(string);
Assert.assertEquals(rawPath.getFullPath(), newPath.getFullPath());
Assert.assertEquals(rawPath.getMeasurementAlias(),
newPath.getMeasurementAlias());
Assert.assertEquals(rawPath.getMeasurementSchema(),
newPath.getMeasurementSchema());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java
index f74a29f1815..0718366aabf 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java
@@ -33,12 +33,9 @@ import
org.apache.tsfile.write.schema.VectorMeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
import java.rmi.UnexpectedException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -339,23 +336,6 @@ public class MeasurementPath extends PartialPath {
return getDevicePath().concatNode(getTailNode());
}
- /**
- * In specific scenarios, like internal create timeseries, the message can
only be passed as
- * String format.
- */
- public static String transformDataToString(MeasurementPath measurementPath) {
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- DataOutputStream dataOutputStream = new
DataOutputStream(byteArrayOutputStream);
- try {
- measurementPath.serialize(dataOutputStream);
- } catch (IOException ignored) {
- // this exception won't happen.
- }
- byte[] bytes = byteArrayOutputStream.toByteArray();
- // must use single-byte char sets
- return new String(bytes, StandardCharsets.ISO_8859_1);
- }
-
@Override
protected IDeviceID toDeviceID(String[] nodes) {
// remove measurement
@@ -363,12 +343,6 @@ public class MeasurementPath extends PartialPath {
return super.toDeviceID(nodes);
}
- public static MeasurementPath parseDataFromString(String
measurementPathData) {
- return (MeasurementPath)
- PathDeserializeUtil.deserialize(
-
ByteBuffer.wrap(measurementPathData.getBytes(StandardCharsets.ISO_8859_1)));
- }
-
@Override
protected PartialPath createPartialPath(String[] newPathNodes) {
return new MeasurementPath(newPathNodes);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
index ae35769a1d0..b0f2c3fe051 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
@@ -38,9 +38,12 @@ import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -1040,6 +1043,29 @@ public class PartialPath extends Path implements
Comparable<Path>, Cloneable {
return this;
}
+ /**
+ * In specific scenarios, like internal create timeseries, the message can
only be passed as
+ * String format.
+ */
+ public static String transformDataToString(PartialPath partialPath) {
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ DataOutputStream dataOutputStream = new
DataOutputStream(byteArrayOutputStream);
+ try {
+ partialPath.serialize(dataOutputStream);
+ } catch (IOException ignored) {
+ // this exception won't happen.
+ }
+ byte[] bytes = byteArrayOutputStream.toByteArray();
+ // must use single-byte char sets
+ return new String(bytes, StandardCharsets.ISO_8859_1);
+ }
+
+ public static PartialPath parseDataFromString(String measurementPathData) {
+ return (PartialPath)
+ PathDeserializeUtil.deserialize(
+
ByteBuffer.wrap(measurementPathData.getBytes(StandardCharsets.ISO_8859_1)));
+ }
+
/** Return true if the path ends with ** and no other nodes contain *.
Otherwise, return false. */
public boolean isPrefixPath() {
if (nodes.length <= 0) {