This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch force_ci/object_type in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 842c4a5ffc8e987633594cb57e49ebab43cc97a2 Author: Caideyipi <[email protected]> AuthorDate: Thu Nov 20 17:28:25 2025 +0800 Fixed the bugs related to device auto-create alignment ignorance (#16780) * plan-first * refact * refa * fux * fix * ref * partial-fix * fix * may-final * fix * PBTree * fix * gras * err * bugfix (cherry picked from commit 9fd9d7e818a8fd6049b9caf3a417f698070bf429) --- .../it/schema/IoTDBCreateAlignedTimeseriesIT.java | 22 +++++++++++ .../db/it/schema/IoTDBCreateTimeseriesIT.java | 8 ++++ .../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 - .../schemaregion/SchemaExecutionVisitor.java | 44 +++++++++++++-------- .../metadata/AlignedTimeSeriesException.java | 35 ++++++++++++++++ .../execution/executor/RegionWriteExecutor.java | 4 +- .../analyze/schema/AutoCreateSchemaExecutor.java | 46 ++++++++++++++++------ .../schemaregion/impl/SchemaRegionMemoryImpl.java | 10 ++++- .../schemaregion/impl/SchemaRegionPBTreeImpl.java | 10 ++++- .../mtree/impl/mem/MTreeBelowSGMemoryImpl.java | 18 ++++++++- .../mtree/impl/pbtree/MTreeBelowSGCachedImpl.java | 21 ++++++++-- .../req/impl/CreateAlignedTimeSeriesPlanImpl.java | 10 +++++ .../write/req/impl/CreateTimeSeriesPlanImpl.java | 10 +++++ .../db/metadata/path/MeasurementPathTest.java | 4 +- .../apache/iotdb/commons/path/MeasurementPath.java | 26 ------------ .../org/apache/iotdb/commons/path/PartialPath.java | 26 ++++++++++++ 16 files changed, 227 insertions(+), 68 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateAlignedTimeseriesIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateAlignedTimeseriesIT.java index bd470cfdb4d..7b784c034ab 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateAlignedTimeseriesIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateAlignedTimeseriesIT.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.it.schema; import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; +import org.apache.iotdb.db.it.utils.TestUtils; import org.apache.iotdb.it.env.EnvFactory; import org.apache.iotdb.itbase.category.ClusterIT; import org.apache.iotdb.itbase.category.LocalStandaloneIT; @@ -34,6 +35,9 @@ import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.Collections; + +import static org.junit.Assert.fail; /** * Notice that, all test begins with "IoTDB" is integration test. All test which will start the @@ -141,4 +145,22 @@ public class IoTDBCreateAlignedTimeseriesIT extends AbstractSchemaIT { } Assert.assertEquals(timeSeriesArray.length, count); } + + @Test + public void testDifferentDeviceAlignment() { + try (Connection connection = EnvFactory.getEnv().getConnection(); + Statement statement = connection.createStatement()) { + // Should ignore the alignment difference + statement.execute("create aligned timeseries root.sg2.d (s2 int64, s3 int64)"); + // Should use the existing alignment + statement.execute("create timeseries root.sg2.d.s1 with datatype=INT64"); + statement.execute("insert into root.sg2.d (time, s4) values (-1, 1)"); + TestUtils.assertResultSetEqual( + statement.executeQuery("select * from root.sg2.d"), + "Time,root.sg2.d.s3,root.sg2.d.s4,root.sg2.d.s1,root.sg2.d.s2,", + Collections.singleton("-1,null,1.0,null,null,")); + } catch (SQLException ignored) { + fail(); + } + } } diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateTimeseriesIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateTimeseriesIT.java index ca309274b63..b2b6f1c1078 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateTimeseriesIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateTimeseriesIT.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.it.schema; import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; +import org.apache.iotdb.db.it.utils.TestUtils; import org.apache.iotdb.it.env.EnvFactory; import org.apache.iotdb.itbase.category.ClusterIT; import org.apache.iotdb.itbase.category.LocalStandaloneIT; @@ -35,6 +36,7 @@ import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.Collections; import java.util.HashSet; import java.util.Set; @@ -308,6 +310,12 @@ public class IoTDBCreateTimeseriesIT extends AbstractSchemaIT { statement.execute("create timeseries root.sg2.d.s1 with datatype=INT64"); // Should ignore the alignment difference statement.execute("create aligned timeseries root.sg2.d (s2 int64, s3 int64)"); + // Should use the existing alignment + statement.execute("insert into root.sg2.d (time, s4) aligned values (-1, 1)"); + TestUtils.assertResultSetEqual( + statement.executeQuery("select * from root.sg2.d"), + "Time,root.sg2.d.s3,root.sg2.d.s4,root.sg2.d.s1,root.sg2.d.s2,", + Collections.singleton("-1,null,1.0,null,null,")); } catch (SQLException ignored) { fail(); } diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index 4fd90ca4b94..246a1f66982 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -106,7 +106,6 @@ public enum TSStatusCode { WRITE_PROCESS_REJECT(606), OUT_OF_TTL(607), COMPACTION_ERROR(608), - @Deprecated ALIGNED_TIMESERIES_ERROR(609), WAL_ERROR(610), DISK_SPACE_INSUFFICIENT(611), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java index 8d3c2155838..28ccff2f20d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java @@ -189,7 +189,7 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion> final PartialPath devicePath = node.getDevicePath(); final MeasurementGroup measurementGroup = node.getMeasurementGroup(); - final List<TSStatus> alreadyExistingTimeSeries = new ArrayList<>(); + final List<TSStatus> existingTimeSeriesAndAlignmentMismatch = new ArrayList<>(); final List<TSStatus> failingStatus = new ArrayList<>(); if (node.isAligned()) { @@ -197,7 +197,7 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion> devicePath, measurementGroup, schemaRegion, - alreadyExistingTimeSeries, + existingTimeSeriesAndAlignmentMismatch, failingStatus, node.isGeneratedByPipe()); } else { @@ -205,7 +205,7 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion> devicePath, measurementGroup, schemaRegion, - alreadyExistingTimeSeries, + existingTimeSeriesAndAlignmentMismatch, failingStatus, node.isGeneratedByPipe()); } @@ -214,8 +214,8 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion> return RpcUtils.getStatus(failingStatus); } - if (!alreadyExistingTimeSeries.isEmpty()) { - return RpcUtils.getStatus(alreadyExistingTimeSeries); + if (!existingTimeSeriesAndAlignmentMismatch.isEmpty()) { + return RpcUtils.getStatus(existingTimeSeriesAndAlignmentMismatch); } return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully"); @@ -227,7 +227,7 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion> PartialPath devicePath; MeasurementGroup measurementGroup; - final List<TSStatus> alreadyExistingTimeSeries = new ArrayList<>(); + final List<TSStatus> existingTimeSeriesAndAlignmentMismatch = new ArrayList<>(); final List<TSStatus> failingStatus = new ArrayList<>(); for (final Map.Entry<PartialPath, Pair<Boolean, MeasurementGroup>> deviceEntry : @@ -239,7 +239,7 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion> devicePath, measurementGroup, schemaRegion, - alreadyExistingTimeSeries, + existingTimeSeriesAndAlignmentMismatch, failingStatus, node.isGeneratedByPipe()); } else { @@ -247,7 +247,7 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion> devicePath, measurementGroup, schemaRegion, - alreadyExistingTimeSeries, + existingTimeSeriesAndAlignmentMismatch, failingStatus, node.isGeneratedByPipe()); } @@ -257,8 +257,8 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion> return RpcUtils.getStatus(failingStatus); } - if (!alreadyExistingTimeSeries.isEmpty()) { - return RpcUtils.getStatus(alreadyExistingTimeSeries); + if (!existingTimeSeriesAndAlignmentMismatch.isEmpty()) { + return RpcUtils.getStatus(existingTimeSeriesAndAlignmentMismatch); } return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully"); @@ -268,11 +268,12 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion> final PartialPath devicePath, final MeasurementGroup measurementGroup, final ISchemaRegion schemaRegion, - final List<TSStatus> alreadyExistingTimeSeries, + final List<TSStatus> existingTimeSeriesAndAlignmentMismatch, final List<TSStatus> failingStatus, final boolean withMerge) { final int size = measurementGroup.getMeasurements().size(); // todo implement batch creation of one device in SchemaRegion + boolean alignedIsSet = false; for (int i = 0; i < size; i++) { try { final ICreateTimeSeriesPlan createTimeSeriesPlan = @@ -283,11 +284,17 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion> // Thus the original ones are not altered ((CreateTimeSeriesPlanImpl) createTimeSeriesPlan).setWithMerge(withMerge); schemaRegion.createTimeSeries(createTimeSeriesPlan, -1); + if (((CreateTimeSeriesPlanImpl) createTimeSeriesPlan).getAligned().get() && !alignedIsSet) { + existingTimeSeriesAndAlignmentMismatch.add( + new TSStatus(TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode()) + .setMessage(PartialPath.transformDataToString(devicePath))); + alignedIsSet = true; + } } catch (final MeasurementAlreadyExistException e) { // There's no need to internal create time series. - alreadyExistingTimeSeries.add( + existingTimeSeriesAndAlignmentMismatch.add( RpcUtils.getStatus( - e.getErrorCode(), MeasurementPath.transformDataToString(e.getMeasurementPath()))); + e.getErrorCode(), PartialPath.transformDataToString(e.getMeasurementPath()))); } catch (final MetadataException e) { logger.warn("{}: MetaData error: ", e.getMessage(), e); failingStatus.add(RpcUtils.getStatus(e.getErrorCode(), e.getMessage())); @@ -299,7 +306,7 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion> final PartialPath devicePath, final MeasurementGroup measurementGroup, final ISchemaRegion schemaRegion, - final List<TSStatus> alreadyExistingTimeSeries, + final List<TSStatus> existingTimeSeriesAndAlignmentMismatch, final List<TSStatus> failingStatus, final boolean withMerge) { final List<String> measurementList = measurementGroup.getMeasurements(); @@ -336,9 +343,9 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion> // The existence check will be executed before truly creation // There's no need to internal create time series. final MeasurementPath measurementPath = e.getMeasurementPath(); - alreadyExistingTimeSeries.add( + existingTimeSeriesAndAlignmentMismatch.add( RpcUtils.getStatus( - e.getErrorCode(), MeasurementPath.transformDataToString(e.getMeasurementPath()))); + e.getErrorCode(), PartialPath.transformDataToString(e.getMeasurementPath()))); // remove the existing time series from plan final int index = measurementList.indexOf(measurementPath.getMeasurement()); @@ -389,6 +396,11 @@ public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion> shouldRetry = false; } } + if (!((CreateAlignedTimeSeriesPlanImpl) createAlignedTimeSeriesPlan).getAligned().get()) { + existingTimeSeriesAndAlignmentMismatch.add( + new TSStatus(TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode()) + .setMessage(PartialPath.transformDataToString(devicePath))); + } } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AlignedTimeSeriesException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AlignedTimeSeriesException.java new file mode 100644 index 00000000000..f159376dfae --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/metadata/AlignedTimeSeriesException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.exception.metadata; + +import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.rpc.TSStatusCode; + +public class AlignedTimeSeriesException extends MetadataException { + + public AlignedTimeSeriesException(final boolean aligned, final String path) { + super( + String.format( + "TimeSeries under this device is%s aligned, please use createTimeSeries or change device. (Path: %s)", + aligned ? "" : " not", path), + TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode(), + true); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java index 63af0423d1a..b5a831b90b2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java @@ -676,7 +676,7 @@ public class RegionWriteExecutor { alreadyExistingStatus.add( RpcUtils.getStatus( metadataException.getErrorCode(), - MeasurementPath.transformDataToString( + PartialPath.transformDataToString( ((MeasurementAlreadyExistException) metadataException) .getMeasurementPath()))); } else { @@ -774,7 +774,7 @@ public class RegionWriteExecutor { alreadyExistingStatus.add( RpcUtils.getStatus( metadataException.getErrorCode(), - MeasurementPath.transformDataToString( + PartialPath.transformDataToString( ((MeasurementAlreadyExistException) metadataException) .getMeasurementPath()))); } else { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java index 358614c9a58..a6a3301aea0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java @@ -29,6 +29,7 @@ import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics; import org.apache.iotdb.db.auth.AuthorityChecker; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.exception.metadata.AlignedTimeSeriesException; import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.protocol.session.SessionManager; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; @@ -188,7 +189,7 @@ class AutoCreateSchemaExecutor { } if (!devicesNeedAutoCreateTimeSeries.isEmpty()) { - internalCreateTimeSeries(schemaTree, devicesNeedAutoCreateTimeSeries, context); + internalCreateTimeSeries(schemaTree, devicesNeedAutoCreateTimeSeries, context, false); } } @@ -425,7 +426,7 @@ class AutoCreateSchemaExecutor { } if (!devicesNeedAutoCreateTimeSeries.isEmpty()) { - internalCreateTimeSeries(schemaTree, devicesNeedAutoCreateTimeSeries, context); + internalCreateTimeSeries(schemaTree, devicesNeedAutoCreateTimeSeries, context, true); } } @@ -463,11 +464,15 @@ class AutoCreateSchemaExecutor { List<CompressionType> compressors, boolean isAligned, MPPQueryContext context) { + final Map<PartialPath, Pair<Boolean, MeasurementGroup>> input = + Collections.singletonMap(devicePath, new Pair<>(isAligned, null)); List<MeasurementPath> measurementPathList = - executeInternalCreateTimeseriesStatement( + executeInternalCreateTimeSeriesStatement( + input, new InternalCreateTimeSeriesStatement( devicePath, measurements, tsDataTypes, encodings, compressors, isAligned), - context); + context, + false); Set<Integer> alreadyExistingMeasurementIndexSet = measurementPathList.stream() @@ -488,13 +493,16 @@ class AutoCreateSchemaExecutor { null, null, null, - isAligned); + input.get(devicePath).getLeft()); } } - // Auto create timeseries and return the existing timeseries info - private List<MeasurementPath> executeInternalCreateTimeseriesStatement( - final Statement statement, final MPPQueryContext context) { + // Auto create timeSeries and return the existing timeSeries info + private List<MeasurementPath> executeInternalCreateTimeSeriesStatement( + final Map<PartialPath, Pair<Boolean, MeasurementGroup>> devicesNeedAutoCreateTimeSeries, + final Statement statement, + final MPPQueryContext context, + final boolean isLoad) { final TSStatus status = AuthorityChecker.checkAuthority(statement, context); if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { throw new IoTDBRuntimeException(status.getMessage(), status.getCode()); @@ -516,7 +524,18 @@ class AutoCreateSchemaExecutor { for (final TSStatus subStatus : executionResult.status.subStatus) { if (subStatus.code == TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) { alreadyExistingMeasurements.add( - MeasurementPath.parseDataFromString(subStatus.getMessage())); + (MeasurementPath) PartialPath.parseDataFromString(subStatus.getMessage())); + } else if (subStatus.code == TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode()) { + final PartialPath devicePath = PartialPath.parseDataFromString(subStatus.getMessage()); + final Pair<Boolean, MeasurementGroup> pair = + devicesNeedAutoCreateTimeSeries.get(devicePath); + if (!isLoad) { + pair.setLeft(!pair.getLeft()); + } else { + // Load does not tolerate the device alignment mismatch + throw new SemanticException( + new AlignedTimeSeriesException(!pair.getLeft(), devicePath.getFullPath())); + } } else { failedCreationSet.add(subStatus); } @@ -583,11 +602,13 @@ class AutoCreateSchemaExecutor { private void internalCreateTimeSeries( ClusterSchemaTree schemaTree, Map<PartialPath, Pair<Boolean, MeasurementGroup>> devicesNeedAutoCreateTimeSeries, - MPPQueryContext context) { + MPPQueryContext context, + final boolean isLoad) { // Deep copy to avoid changes to the original map final List<MeasurementPath> measurementPathList = - executeInternalCreateTimeseriesStatement( + executeInternalCreateTimeSeriesStatement( + devicesNeedAutoCreateTimeSeries, new InternalCreateMultiTimeSeriesStatement( devicesNeedAutoCreateTimeSeries.entrySet().stream() .collect( @@ -597,7 +618,8 @@ class AutoCreateSchemaExecutor { new Pair<>( entry.getValue().getLeft(), entry.getValue().getRight().deepCopy())))), - context); + context, + isLoad); schemaTree.appendMeasurementPaths(measurementPathList); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java index 39be89f0938..5f6486f8be4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java @@ -702,7 +702,10 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion { plan instanceof CreateTimeSeriesPlanImpl && ((CreateTimeSeriesPlanImpl) plan).isWithMerge() || plan instanceof CreateTimeSeriesNode - && ((CreateTimeSeriesNode) plan).isGeneratedByPipe()); + && ((CreateTimeSeriesNode) plan).isGeneratedByPipe(), + plan instanceof CreateTimeSeriesPlanImpl + ? ((CreateTimeSeriesPlanImpl) plan).getAligned() + : null); // Should merge if (Objects.isNull(leafMNode)) { @@ -786,7 +789,10 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion { aliasList, (plan instanceof CreateAlignedTimeSeriesPlanImpl && ((CreateAlignedTimeSeriesPlanImpl) plan).isWithMerge()), - existingMeasurementIndexes); + existingMeasurementIndexes, + (plan instanceof CreateAlignedTimeSeriesPlanImpl + ? ((CreateAlignedTimeSeriesPlanImpl) plan).getAligned() + : null)); // update statistics and schemaDataTypeNumMap regionStatistics.addMeasurement(measurementMNodeList.size()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java index 944ffe0ea53..a39b881fe1f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java @@ -645,7 +645,10 @@ public class SchemaRegionPBTreeImpl implements ISchemaRegion { plan.getProps(), plan.getAlias(), (plan instanceof CreateTimeSeriesPlanImpl - && ((CreateTimeSeriesPlanImpl) plan).isWithMerge())); + && ((CreateTimeSeriesPlanImpl) plan).isWithMerge()), + plan instanceof CreateTimeSeriesPlanImpl + ? ((CreateTimeSeriesPlanImpl) plan).getAligned() + : null); try { // Should merge @@ -756,7 +759,10 @@ public class SchemaRegionPBTreeImpl implements ISchemaRegion { aliasList, (plan instanceof CreateAlignedTimeSeriesPlanImpl && ((CreateAlignedTimeSeriesPlanImpl) plan).isWithMerge()), - existingMeasurementIndexes); + existingMeasurementIndexes, + (plan instanceof CreateAlignedTimeSeriesPlanImpl + ? ((CreateAlignedTimeSeriesPlanImpl) plan).getAligned() + : null)); try { // Update statistics and schemaDataTypeNumMap diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java index 15d51928d64..5931023b3cf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java @@ -238,7 +238,8 @@ public class MTreeBelowSGMemoryImpl { final CompressionType compressor, final Map<String, String> props, final String alias, - final boolean withMerge) + final boolean withMerge, + final AtomicBoolean isAligned) throws MetadataException { final String[] nodeNames = path.getNodes(); if (nodeNames.length <= 2) { @@ -253,6 +254,12 @@ public class MTreeBelowSGMemoryImpl { synchronized (this) { final IMemMNode device = checkAndAutoCreateDeviceNode(devicePath.getTailNode(), deviceParent); + if (device.isDevice() + && device.getAsDeviceMNode().isAlignedNullable() != null + && isAligned != null) { + isAligned.set(device.getAsDeviceMNode().isAligned()); + } + MetaFormatUtils.checkTimeseriesProps(path.getFullPath(), props); final String leafName = path.getMeasurement(); @@ -328,7 +335,8 @@ public class MTreeBelowSGMemoryImpl { final List<CompressionType> compressors, final List<String> aliasList, final boolean withMerge, - final Set<Integer> existingMeasurementIndexes) + final Set<Integer> existingMeasurementIndexes, + final AtomicBoolean isAligned) throws MetadataException { final List<IMeasurementMNode<IMemMNode>> measurementMNodeList = new ArrayList<>(); MetaFormatUtils.checkSchemaMeasurementNames(measurements); @@ -339,6 +347,12 @@ public class MTreeBelowSGMemoryImpl { synchronized (this) { final IMemMNode device = checkAndAutoCreateDeviceNode(devicePath.getTailNode(), deviceParent); + if (device.isDevice() + && device.getAsDeviceMNode().isAlignedNullable() != null + && isAligned != null) { + isAligned.set(device.getAsDeviceMNode().isAligned()); + } + for (int i = 0; i < measurements.size(); i++) { if (device.hasChild(measurements.get(i))) { final IMemMNode node = device.getChild(measurements.get(i)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/MTreeBelowSGCachedImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/MTreeBelowSGCachedImpl.java index d5187e14c91..5127cd9e655 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/MTreeBelowSGCachedImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/MTreeBelowSGCachedImpl.java @@ -266,7 +266,8 @@ public class MTreeBelowSGCachedImpl { String alias) throws MetadataException { IMeasurementMNode<ICachedMNode> measurementMNode = - createTimeSeriesWithPinnedReturn(path, dataType, encoding, compressor, props, alias, false); + createTimeSeriesWithPinnedReturn( + path, dataType, encoding, compressor, props, alias, false, null); unPinMNode(measurementMNode.getAsMNode()); return measurementMNode; } @@ -289,7 +290,8 @@ public class MTreeBelowSGCachedImpl { final CompressionType compressor, final Map<String, String> props, final String alias, - final boolean withMerge) + final boolean withMerge, + final AtomicBoolean isAligned) throws MetadataException { final String[] nodeNames = path.getNodes(); if (nodeNames.length <= 2) { @@ -305,6 +307,12 @@ public class MTreeBelowSGCachedImpl { synchronized (this) { ICachedMNode device = checkAndAutoCreateDeviceNode(devicePath.getTailNode(), deviceParent); + if (device.isDevice() + && device.getAsDeviceMNode().isAlignedNullable() != null + && isAligned != null) { + isAligned.set(device.getAsDeviceMNode().isAligned()); + } + try { MetaFormatUtils.checkTimeseriesProps(path.getFullPath(), props); @@ -387,7 +395,8 @@ public class MTreeBelowSGCachedImpl { final List<CompressionType> compressors, final List<String> aliasList, final boolean withMerge, - final Set<Integer> existingMeasurementIndexes) + final Set<Integer> existingMeasurementIndexes, + final AtomicBoolean isAligned) throws MetadataException { final List<IMeasurementMNode<ICachedMNode>> measurementMNodeList = new ArrayList<>(); MetaFormatUtils.checkSchemaMeasurementNames(measurements); @@ -399,6 +408,12 @@ public class MTreeBelowSGCachedImpl { synchronized (this) { ICachedMNode device = checkAndAutoCreateDeviceNode(devicePath.getTailNode(), deviceParent); + if (device.isDevice() + && device.getAsDeviceMNode().isAlignedNullable() != null + && isAligned != null) { + isAligned.set(device.getAsDeviceMNode().isAligned()); + } + try { for (int i = 0; i < measurements.size(); i++) { if (store.hasChild(device, measurements.get(i))) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateAlignedTimeSeriesPlanImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateAlignedTimeSeriesPlanImpl.java index 4ffa8aca5b2..8a2c4c12e8c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateAlignedTimeSeriesPlanImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateAlignedTimeSeriesPlanImpl.java @@ -32,6 +32,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; public class CreateAlignedTimeSeriesPlanImpl implements ICreateAlignedTimeSeriesPlan { @@ -45,6 +46,7 @@ public class CreateAlignedTimeSeriesPlanImpl implements ICreateAlignedTimeSeries private List<Map<String, String>> attributesList; private List<Long> tagOffsets = null; private transient boolean withMerge; + private final transient AtomicBoolean aligned = new AtomicBoolean(true); public CreateAlignedTimeSeriesPlanImpl() {} @@ -194,4 +196,12 @@ public class CreateAlignedTimeSeriesPlanImpl implements ICreateAlignedTimeSeries tagOffsets = Objects.nonNull(tagOffsets) ? new ArrayList<>(tagOffsets) : null; } } + + public void setAligned(final boolean aligned) { + this.aligned.set(aligned); + } + + public AtomicBoolean getAligned() { + return aligned; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateTimeSeriesPlanImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateTimeSeriesPlanImpl.java index 6234cc41aff..73b47a4f516 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateTimeSeriesPlanImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateTimeSeriesPlanImpl.java @@ -28,6 +28,7 @@ import org.apache.tsfile.write.schema.MeasurementSchema; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicBoolean; public class CreateTimeSeriesPlanImpl implements ICreateTimeSeriesPlan { @@ -41,6 +42,7 @@ public class CreateTimeSeriesPlanImpl implements ICreateTimeSeriesPlan { private Map<String, String> attributes = null; private long tagOffset = -1; private transient boolean withMerge; + private final transient AtomicBoolean aligned = new AtomicBoolean(false); public CreateTimeSeriesPlanImpl() {} @@ -170,4 +172,12 @@ public class CreateTimeSeriesPlanImpl implements ICreateTimeSeriesPlan { public void setWithMerge(final boolean withMerge) { this.withMerge = withMerge; } + + public void setAligned(final boolean aligned) { + this.aligned.set(aligned); + } + + public AtomicBoolean getAligned() { + return aligned; + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/path/MeasurementPathTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/path/MeasurementPathTest.java index cf5f8ab57ba..148a3c56bd4 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/path/MeasurementPathTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/path/MeasurementPathTest.java @@ -43,8 +43,8 @@ public class MeasurementPathTest { new MeasurementPath( new PartialPath("root.sg.d.s"), new MeasurementSchema("s", TSDataType.INT32), true); rawPath.setMeasurementAlias("alias"); - String string = MeasurementPath.transformDataToString(rawPath); - MeasurementPath newPath = MeasurementPath.parseDataFromString(string); + String string = PartialPath.transformDataToString(rawPath); + MeasurementPath newPath = (MeasurementPath) PartialPath.parseDataFromString(string); Assert.assertEquals(rawPath.getFullPath(), newPath.getFullPath()); Assert.assertEquals(rawPath.getMeasurementAlias(), newPath.getMeasurementAlias()); Assert.assertEquals(rawPath.getMeasurementSchema(), newPath.getMeasurementSchema()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java index f74a29f1815..0718366aabf 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java @@ -33,12 +33,9 @@ import org.apache.tsfile.write.schema.VectorMeasurementSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.rmi.UnexpectedException; import java.util.ArrayList; import java.util.Arrays; @@ -339,23 +336,6 @@ public class MeasurementPath extends PartialPath { return getDevicePath().concatNode(getTailNode()); } - /** - * In specific scenarios, like internal create timeseries, the message can only be passed as - * String format. - */ - public static String transformDataToString(MeasurementPath measurementPath) { - ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream); - try { - measurementPath.serialize(dataOutputStream); - } catch (IOException ignored) { - // this exception won't happen. - } - byte[] bytes = byteArrayOutputStream.toByteArray(); - // must use single-byte char sets - return new String(bytes, StandardCharsets.ISO_8859_1); - } - @Override protected IDeviceID toDeviceID(String[] nodes) { // remove measurement @@ -363,12 +343,6 @@ public class MeasurementPath extends PartialPath { return super.toDeviceID(nodes); } - public static MeasurementPath parseDataFromString(String measurementPathData) { - return (MeasurementPath) - PathDeserializeUtil.deserialize( - ByteBuffer.wrap(measurementPathData.getBytes(StandardCharsets.ISO_8859_1))); - } - @Override protected PartialPath createPartialPath(String[] newPathNodes) { return new MeasurementPath(newPathNodes); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java index ae35769a1d0..b0f2c3fe051 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java @@ -38,9 +38,12 @@ import org.apache.tsfile.write.schema.IMeasurementSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -1040,6 +1043,29 @@ public class PartialPath extends Path implements Comparable<Path>, Cloneable { return this; } + /** + * In specific scenarios, like internal create timeseries, the message can only be passed as + * String format. + */ + public static String transformDataToString(PartialPath partialPath) { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream); + try { + partialPath.serialize(dataOutputStream); + } catch (IOException ignored) { + // this exception won't happen. + } + byte[] bytes = byteArrayOutputStream.toByteArray(); + // must use single-byte char sets + return new String(bytes, StandardCharsets.ISO_8859_1); + } + + public static PartialPath parseDataFromString(String measurementPathData) { + return (PartialPath) + PathDeserializeUtil.deserialize( + ByteBuffer.wrap(measurementPathData.getBytes(StandardCharsets.ISO_8859_1))); + } + /** Return true if the path ends with ** and no other nodes contain *. Otherwise, return false. */ public boolean isPrefixPath() { if (nodes.length <= 0) {
