This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 90331aefdc7 [To dev/1.3] Fixed the bugs related to device auto-create
alignment ignorance (#16780) (#16782)
90331aefdc7 is described below
commit 90331aefdc767055a7b20e2ee7f0cdfaf70920b5
Author: Caideyipi <[email protected]>
AuthorDate: Mon Nov 24 15:08:55 2025 +0800
[To dev/1.3] Fixed the bugs related to device auto-create alignment
ignorance (#16780) (#16782)
* Fixed the bugs related to device auto-create alignment ignorance (#16780)
* plan-first
* refact
* refa
* fux
* fix
* ref
* partial-fix
* fix
* may-final
* fix
* PBTree
* fix
* gras
* err
* bugfix
(cherry picked from commit 9fd9d7e818a8fd6049b9caf3a417f698070bf429)
* fix
* Fixed the bug related to "Fixed the bugs related to device auto-create
alignment ignorance" (#16781)
* gsa
* ff
* ff
* fix
* fix
* fix
* f9ix
---
.../org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java | 102 +++++++++++++++++++++
.../it/schema/IoTDBCreateAlignedTimeseriesIT.java | 22 +++++
.../db/it/schema/IoTDBCreateTimeseriesIT.java | 8 ++
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 -
.../schemaregion/SchemaExecutionVisitor.java | 44 +++++----
.../execution/executor/RegionWriteExecutor.java | 4 +-
.../plan/analyze/load/LoadTsFileAnalyzer.java | 3 +
.../analyze/schema/AutoCreateSchemaExecutor.java | 52 ++++++++---
.../schemaregion/impl/SchemaRegionMemoryImpl.java | 10 +-
.../schemaregion/impl/SchemaRegionPBTreeImpl.java | 10 +-
.../mtree/impl/mem/MTreeBelowSGMemoryImpl.java | 18 +++-
.../mtree/impl/pbtree/MTreeBelowSGCachedImpl.java | 21 ++++-
.../req/impl/CreateAlignedTimeSeriesPlanImpl.java | 10 ++
.../write/req/impl/CreateTimeSeriesPlanImpl.java | 10 ++
.../db/metadata/path/MeasurementPathTest.java | 4 +-
.../apache/iotdb/commons/path/MeasurementPath.java | 26 ------
.../org/apache/iotdb/commons/path/PartialPath.java | 26 ++++++
17 files changed, 303 insertions(+), 68 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
index d8a51b53501..97e05f8768c 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
@@ -262,6 +262,108 @@ public class IoTDBLoadTsFileIT {
}
}
+ @Test
+ // Shall succeed with tablet conversion
+ public void testLoadWithAlignmentMismatch() throws Exception {
+ registerSchema();
+
+ final long writtenPoint1;
+ // device 0, sg 0
+ try (final TsFileGenerator generator =
+ new TsFileGenerator(new File(tmpDir, "1-0-0-0.tsfile"))) {
+ // Wrong alignment; measurements 04-07 do not exist yet
+ generator.registerAlignedTimeseries(
+ SchemaConfig.DEVICE_0,
+ Arrays.asList(
+ SchemaConfig.MEASUREMENT_00,
+ SchemaConfig.MEASUREMENT_01,
+ SchemaConfig.MEASUREMENT_02,
+ SchemaConfig.MEASUREMENT_03,
+ SchemaConfig.MEASUREMENT_04,
+ SchemaConfig.MEASUREMENT_05,
+ SchemaConfig.MEASUREMENT_06,
+ SchemaConfig.MEASUREMENT_07));
+ generator.generateData(SchemaConfig.DEVICE_0, 100000, PARTITION_INTERVAL
/ 10_000, true);
+ writtenPoint1 = generator.getTotalNumber();
+ }
+
+ final long writtenPoint2;
+ // device 2, device 3, device 4, sg 1
+ try (final TsFileGenerator generator =
+ new TsFileGenerator(new File(tmpDir, "2-0-0-0.tsfile"))) {
+ // right
+ generator.registerTimeseries(
+ SchemaConfig.DEVICE_2,
Collections.singletonList(SchemaConfig.MEASUREMENT_20));
+ // right
+ generator.registerTimeseries(
+ SchemaConfig.DEVICE_3,
Collections.singletonList(SchemaConfig.MEASUREMENT_30));
+ // Wrong alignment; measurement 06 does not exist yet
+ generator.registerTimeseries(
+ SchemaConfig.DEVICE_4,
+ Arrays.asList(SchemaConfig.MEASUREMENT_40,
SchemaConfig.MEASUREMENT_06));
+ generator.generateData(SchemaConfig.DEVICE_2, 10000, PARTITION_INTERVAL
/ 10_000, false);
+ generator.generateData(SchemaConfig.DEVICE_3, 10000, PARTITION_INTERVAL
/ 10_000, false);
+ generator.generateData(SchemaConfig.DEVICE_4, 10000, PARTITION_INTERVAL
/ 10_000, false);
+ for (int i = 0; i < 1000; i++) {
+ generator.generateData(SchemaConfig.DEVICE_4, 1, PARTITION_INTERVAL -
10, false);
+ }
+ writtenPoint2 = generator.getTotalNumber();
+ }
+
+ try (final Connection connection = EnvFactory.getEnv().getConnection();
+ final Statement statement = connection.createStatement()) {
+
+ try {
+ statement.execute(
+ String.format(
+ "load \"%s\" with ('database-level'='2', 'convert-on-type-mismatch'='false')",
+ tmpDir.getAbsolutePath() + File.separator + "1-0-0-0.tsfile"));
+ Assert.fail();
+ } catch (final Exception e) {
+ Assert.assertTrue(
+ e.getMessage()
+ .contains(
+ "TimeSeries under this device is not aligned, please use createTimeSeries or change device. (Path: root.sg.test_0.d_0)."));
+ }
+
+ try {
+ statement.execute(
+ String.format(
+ "load \"%s\" with ('database-level'='2', 'convert-on-type-mismatch'='false')",
+ tmpDir.getAbsolutePath() + File.separator + "2-0-0-0.tsfile"));
+ Assert.fail();
+ } catch (final Exception e) {
+ Assert.assertTrue(
+ e.getMessage()
+ .contains(
+ "TimeSeries under this device is aligned, please use createAlignedTimeSeries or change device. (Path: root.sg.test_1.a_4)."));
+ }
+
+ statement.execute(String.format("load \"%s\" sglevel=2",
tmpDir.getAbsolutePath()));
+
+ try (final ResultSet resultSet =
+ statement.executeQuery("select count(*) from root.sg.** group by level=1,2")) {
+ if (resultSet.next()) {
+ long sg1Count = resultSet.getLong("count(root.sg.test_0.*.*)");
+ Assert.assertEquals(writtenPoint1, sg1Count);
+ long sg2Count = resultSet.getLong("count(root.sg.test_1.*.*)");
+ Assert.assertEquals(writtenPoint2, sg2Count);
+ } else {
+ Assert.fail("This ResultSet is empty.");
+ }
+ }
+ }
+
+ // Try to delete after loading. Expect no deadlock
+ try (final Connection connection = EnvFactory.getEnv().getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "delete timeseries %s.%s",
+ SchemaConfig.DEVICE_0,
SchemaConfig.MEASUREMENT_00.getMeasurementId()));
+ }
+ }
+
@Test
public void testLoadAcrossMultipleTimePartitions() throws Exception {
registerSchema();
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateAlignedTimeseriesIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateAlignedTimeseriesIT.java
index 6a00ece6783..23e2b5b99d2 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateAlignedTimeseriesIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateAlignedTimeseriesIT.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.it.schema;
+import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.itbase.category.ClusterIT;
@@ -34,6 +35,9 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Collections;
+
+import static org.junit.Assert.fail;
/**
* Notice that, all test begins with "IoTDB" is integration test. All test
which will start the
@@ -141,4 +145,22 @@ public class IoTDBCreateAlignedTimeseriesIT extends
AbstractSchemaIT {
}
Assert.assertEquals(timeSeriesArray.length, count);
}
+
+ @Test
+ public void testDifferentDeviceAlignment() {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ // Should ignore the alignment difference
+ statement.execute("create aligned timeseries root.sg2.d (s2 int64, s3 int64)");
+ // Should use the existing alignment
+ statement.execute("create timeseries root.sg2.d.s1 with datatype=INT64");
+ statement.execute("insert into root.sg2.d (time, s4) values (-1, 1)");
+ TestUtils.assertResultSetEqual(
+ statement.executeQuery("select * from root.sg2.d"),
+ "Time,root.sg2.d.s3,root.sg2.d.s4,root.sg2.d.s1,root.sg2.d.s2,",
+ Collections.singleton("-1,null,1.0,null,null,"));
+ } catch (SQLException ignored) {
+ fail();
+ }
+ }
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateTimeseriesIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateTimeseriesIT.java
index 9aa0d0a132b..7d550f40214 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateTimeseriesIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/schema/IoTDBCreateTimeseriesIT.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.it.schema;
+import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.itbase.category.ClusterIT;
@@ -35,6 +36,7 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@@ -308,6 +310,12 @@ public class IoTDBCreateTimeseriesIT extends
AbstractSchemaIT {
statement.execute("create timeseries root.sg2.d.s1 with datatype=INT64");
// Should ignore the alignment difference
statement.execute("create aligned timeseries root.sg2.d (s2 int64, s3 int64)");
+ // Should use the existing alignment
+ statement.execute("insert into root.sg2.d (time, s4) aligned values (-1, 1)");
+ TestUtils.assertResultSetEqual(
+ statement.executeQuery("select * from root.sg2.d"),
+ "Time,root.sg2.d.s3,root.sg2.d.s4,root.sg2.d.s1,root.sg2.d.s2,",
+ Collections.singleton("-1,null,1.0,null,null,"));
} catch (SQLException ignored) {
fail();
}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index b3939048bd6..deb486ae466 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -93,7 +93,6 @@ public enum TSStatusCode {
WRITE_PROCESS_REJECT(606),
OUT_OF_TTL(607),
COMPACTION_ERROR(608),
- @Deprecated
ALIGNED_TIMESERIES_ERROR(609),
WAL_ERROR(610),
DISK_SPACE_INSUFFICIENT(611),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
index 9698c2a463a..b392ac0f479 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
@@ -179,7 +179,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
final PartialPath devicePath = node.getDevicePath();
final MeasurementGroup measurementGroup = node.getMeasurementGroup();
- final List<TSStatus> alreadyExistingTimeSeries = new ArrayList<>();
+ final List<TSStatus> existingTimeSeriesAndAlignmentMismatch = new
ArrayList<>();
final List<TSStatus> failingStatus = new ArrayList<>();
if (node.isAligned()) {
@@ -187,7 +187,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
devicePath,
measurementGroup,
schemaRegion,
- alreadyExistingTimeSeries,
+ existingTimeSeriesAndAlignmentMismatch,
failingStatus,
node.isGeneratedByPipe());
} else {
@@ -195,7 +195,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
devicePath,
measurementGroup,
schemaRegion,
- alreadyExistingTimeSeries,
+ existingTimeSeriesAndAlignmentMismatch,
failingStatus,
node.isGeneratedByPipe());
}
@@ -204,8 +204,8 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
return RpcUtils.getStatus(failingStatus);
}
- if (!alreadyExistingTimeSeries.isEmpty()) {
- return RpcUtils.getStatus(alreadyExistingTimeSeries);
+ if (!existingTimeSeriesAndAlignmentMismatch.isEmpty()) {
+ return RpcUtils.getStatus(existingTimeSeriesAndAlignmentMismatch);
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute
successfully");
@@ -217,7 +217,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
PartialPath devicePath;
MeasurementGroup measurementGroup;
- final List<TSStatus> alreadyExistingTimeSeries = new ArrayList<>();
+ final List<TSStatus> existingTimeSeriesAndAlignmentMismatch = new
ArrayList<>();
final List<TSStatus> failingStatus = new ArrayList<>();
for (final Map.Entry<PartialPath, Pair<Boolean, MeasurementGroup>>
deviceEntry :
@@ -229,7 +229,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
devicePath,
measurementGroup,
schemaRegion,
- alreadyExistingTimeSeries,
+ existingTimeSeriesAndAlignmentMismatch,
failingStatus,
node.isGeneratedByPipe());
} else {
@@ -237,7 +237,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
devicePath,
measurementGroup,
schemaRegion,
- alreadyExistingTimeSeries,
+ existingTimeSeriesAndAlignmentMismatch,
failingStatus,
node.isGeneratedByPipe());
}
@@ -247,8 +247,8 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
return RpcUtils.getStatus(failingStatus);
}
- if (!alreadyExistingTimeSeries.isEmpty()) {
- return RpcUtils.getStatus(alreadyExistingTimeSeries);
+ if (!existingTimeSeriesAndAlignmentMismatch.isEmpty()) {
+ return RpcUtils.getStatus(existingTimeSeriesAndAlignmentMismatch);
}
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute
successfully");
@@ -258,11 +258,12 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
final PartialPath devicePath,
final MeasurementGroup measurementGroup,
final ISchemaRegion schemaRegion,
- final List<TSStatus> alreadyExistingTimeSeries,
+ final List<TSStatus> existingTimeSeriesAndAlignmentMismatch,
final List<TSStatus> failingStatus,
final boolean withMerge) {
final int size = measurementGroup.getMeasurements().size();
// todo implement batch creation of one device in SchemaRegion
+ boolean alignedIsSet = false;
for (int i = 0; i < size; i++) {
try {
final ICreateTimeSeriesPlan createTimeSeriesPlan =
@@ -273,11 +274,17 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
// Thus the original ones are not altered
((CreateTimeSeriesPlanImpl)
createTimeSeriesPlan).setWithMerge(withMerge);
schemaRegion.createTimeSeries(createTimeSeriesPlan, -1);
+ if (((CreateTimeSeriesPlanImpl)
createTimeSeriesPlan).getAligned().get() && !alignedIsSet) {
+ existingTimeSeriesAndAlignmentMismatch.add(
+ new
TSStatus(TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode())
+ .setMessage(PartialPath.transformDataToString(devicePath)));
+ alignedIsSet = true;
+ }
} catch (final MeasurementAlreadyExistException e) {
// There's no need to internal create time series.
- alreadyExistingTimeSeries.add(
+ existingTimeSeriesAndAlignmentMismatch.add(
RpcUtils.getStatus(
- e.getErrorCode(),
MeasurementPath.transformDataToString(e.getMeasurementPath())));
+ e.getErrorCode(),
PartialPath.transformDataToString(e.getMeasurementPath())));
} catch (final MetadataException e) {
logger.warn("{}: MetaData error: ", e.getMessage(), e);
failingStatus.add(RpcUtils.getStatus(e.getErrorCode(),
e.getMessage()));
@@ -289,7 +296,7 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
final PartialPath devicePath,
final MeasurementGroup measurementGroup,
final ISchemaRegion schemaRegion,
- final List<TSStatus> alreadyExistingTimeSeries,
+ final List<TSStatus> existingTimeSeriesAndAlignmentMismatch,
final List<TSStatus> failingStatus,
final boolean withMerge) {
final List<String> measurementList = measurementGroup.getMeasurements();
@@ -326,9 +333,9 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
// The existence check will be executed before truly creation
// There's no need to internal create time series.
final MeasurementPath measurementPath = e.getMeasurementPath();
- alreadyExistingTimeSeries.add(
+ existingTimeSeriesAndAlignmentMismatch.add(
RpcUtils.getStatus(
- e.getErrorCode(),
MeasurementPath.transformDataToString(e.getMeasurementPath())));
+ e.getErrorCode(),
PartialPath.transformDataToString(e.getMeasurementPath())));
// remove the existing time series from plan
final int index =
measurementList.indexOf(measurementPath.getMeasurement());
@@ -379,6 +386,11 @@ public class SchemaExecutionVisitor extends
PlanVisitor<TSStatus, ISchemaRegion>
shouldRetry = false;
}
}
+ if (!((CreateAlignedTimeSeriesPlanImpl)
createAlignedTimeSeriesPlan).getAligned().get()) {
+ existingTimeSeriesAndAlignmentMismatch.add(
+ new TSStatus(TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode())
+ .setMessage(PartialPath.transformDataToString(devicePath)));
+ }
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
index 61472663f56..5a63a4b4e3b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
@@ -584,7 +584,7 @@ public class RegionWriteExecutor {
alreadyExistingStatus.add(
RpcUtils.getStatus(
metadataException.getErrorCode(),
- MeasurementPath.transformDataToString(
+ PartialPath.transformDataToString(
((MeasurementAlreadyExistException)
metadataException)
.getMeasurementPath())));
} else {
@@ -674,7 +674,7 @@ public class RegionWriteExecutor {
alreadyExistingStatus.add(
RpcUtils.getStatus(
metadataException.getErrorCode(),
- MeasurementPath.transformDataToString(
+ PartialPath.transformDataToString(
((MeasurementAlreadyExistException)
metadataException)
.getMeasurementPath())));
} else {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
index 2ab134b3e40..5a346fdf8e0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
@@ -678,6 +678,9 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
} catch (AuthException | LoadAnalyzeTypeMismatchException e) {
throw e;
} catch (Exception e) {
+ if (e.getCause() instanceof LoadAnalyzeTypeMismatchException &&
isConvertOnTypeMismatch) {
+ throw (LoadAnalyzeTypeMismatchException) e.getCause();
+ }
LOGGER.warn("Auto create or verify schema error.", e);
throw new SemanticException(
String.format(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java
index 34fa662d172..e7fa45be64a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.LoadAnalyzeTypeMismatchException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
@@ -190,7 +191,7 @@ class AutoCreateSchemaExecutor {
}
if (!devicesNeedAutoCreateTimeSeries.isEmpty()) {
- internalCreateTimeSeries(schemaTree, devicesNeedAutoCreateTimeSeries,
context);
+ internalCreateTimeSeries(schemaTree, devicesNeedAutoCreateTimeSeries,
context, false);
}
}
@@ -437,7 +438,7 @@ class AutoCreateSchemaExecutor {
}
if (!devicesNeedAutoCreateTimeSeries.isEmpty()) {
- internalCreateTimeSeries(schemaTree, devicesNeedAutoCreateTimeSeries,
context);
+ internalCreateTimeSeries(schemaTree, devicesNeedAutoCreateTimeSeries,
context, true);
}
}
@@ -475,11 +476,15 @@ class AutoCreateSchemaExecutor {
List<CompressionType> compressors,
boolean isAligned,
MPPQueryContext context) {
+ final Map<PartialPath, Pair<Boolean, MeasurementGroup>> input =
+ Collections.singletonMap(devicePath, new Pair<>(isAligned, null));
List<MeasurementPath> measurementPathList =
- executeInternalCreateTimeseriesStatement(
+ executeInternalCreateTimeSeriesStatement(
+ input,
new InternalCreateTimeSeriesStatement(
devicePath, measurements, tsDataTypes, encodings, compressors,
isAligned),
- context);
+ context,
+ false);
Set<Integer> alreadyExistingMeasurementIndexSet =
measurementPathList.stream()
@@ -500,13 +505,16 @@ class AutoCreateSchemaExecutor {
null,
null,
null,
- isAligned);
+ input.get(devicePath).getLeft());
}
}
- // Auto create timeseries and return the existing timeseries info
- private List<MeasurementPath> executeInternalCreateTimeseriesStatement(
- final Statement statement, final MPPQueryContext context) {
+ // Auto create timeSeries and return the existing timeSeries info
+ private List<MeasurementPath> executeInternalCreateTimeSeriesStatement(
+ final Map<PartialPath, Pair<Boolean, MeasurementGroup>>
devicesNeedAutoCreateTimeSeries,
+ final Statement statement,
+ final MPPQueryContext context,
+ final boolean isLoad) {
final TSStatus status =
AuthorityChecker.checkAuthority(statement,
context.getSession().getUserName());
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -530,7 +538,23 @@ class AutoCreateSchemaExecutor {
for (final TSStatus subStatus : executionResult.status.subStatus) {
if (subStatus.code ==
TSStatusCode.TIMESERIES_ALREADY_EXIST.getStatusCode()) {
alreadyExistingMeasurements.add(
- MeasurementPath.parseDataFromString(subStatus.getMessage()));
+ (MeasurementPath)
PartialPath.parseDataFromString(subStatus.getMessage()));
+ } else if (subStatus.code ==
TSStatusCode.ALIGNED_TIMESERIES_ERROR.getStatusCode()) {
+ final PartialPath devicePath =
PartialPath.parseDataFromString(subStatus.getMessage());
+ final Pair<Boolean, MeasurementGroup> pair =
+ devicesNeedAutoCreateTimeSeries.get(devicePath);
+ if (!isLoad) {
+ pair.setLeft(!pair.getLeft());
+ } else {
+ // Load does not tolerate the device alignment mismatch
+ throw new SemanticException(
+ new LoadAnalyzeTypeMismatchException(
+ String.format(
+ "TimeSeries under this device is%s aligned, please use create%sTimeSeries or change device. (Path: %s)",
+ !pair.getLeft() ? "" : " not",
+ !pair.getLeft() ? "Aligned" : "",
+ devicePath.getFullPath())));
+ }
} else {
failedCreationSet.add(subStatus);
}
@@ -597,11 +621,15 @@ class AutoCreateSchemaExecutor {
private void internalCreateTimeSeries(
ClusterSchemaTree schemaTree,
Map<PartialPath, Pair<Boolean, MeasurementGroup>>
devicesNeedAutoCreateTimeSeries,
- MPPQueryContext context) {
+ MPPQueryContext context,
+ final boolean isLoad) {
List<MeasurementPath> measurementPathList =
- executeInternalCreateTimeseriesStatement(
- new
InternalCreateMultiTimeSeriesStatement(devicesNeedAutoCreateTimeSeries),
context);
+ executeInternalCreateTimeSeriesStatement(
+ devicesNeedAutoCreateTimeSeries,
+ new
InternalCreateMultiTimeSeriesStatement(devicesNeedAutoCreateTimeSeries),
+ context,
+ isLoad);
schemaTree.appendMeasurementPaths(measurementPathList);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
index a789bda9290..5449d3c48bc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionMemoryImpl.java
@@ -586,7 +586,10 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
plan instanceof CreateTimeSeriesPlanImpl
&& ((CreateTimeSeriesPlanImpl) plan).isWithMerge()
|| plan instanceof CreateTimeSeriesNode
- && ((CreateTimeSeriesNode) plan).isGeneratedByPipe());
+ && ((CreateTimeSeriesNode) plan).isGeneratedByPipe(),
+ plan instanceof CreateTimeSeriesPlanImpl
+ ? ((CreateTimeSeriesPlanImpl) plan).getAligned()
+ : null);
// Should merge
if (Objects.isNull(leafMNode)) {
@@ -670,7 +673,10 @@ public class SchemaRegionMemoryImpl implements
ISchemaRegion {
aliasList,
(plan instanceof CreateAlignedTimeSeriesPlanImpl
&& ((CreateAlignedTimeSeriesPlanImpl) plan).isWithMerge()),
- existingMeasurementIndexes);
+ existingMeasurementIndexes,
+ (plan instanceof CreateAlignedTimeSeriesPlanImpl
+ ? ((CreateAlignedTimeSeriesPlanImpl) plan).getAligned()
+ : null));
// update statistics and schemaDataTypeNumMap
regionStatistics.addMeasurement(measurementMNodeList.size());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
index 78cae77750b..045d39400c7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/impl/SchemaRegionPBTreeImpl.java
@@ -634,7 +634,10 @@ public class SchemaRegionPBTreeImpl implements
ISchemaRegion {
plan.getProps(),
plan.getAlias(),
(plan instanceof CreateTimeSeriesPlanImpl
- && ((CreateTimeSeriesPlanImpl) plan).isWithMerge()));
+ && ((CreateTimeSeriesPlanImpl) plan).isWithMerge()),
+ plan instanceof CreateTimeSeriesPlanImpl
+ ? ((CreateTimeSeriesPlanImpl) plan).getAligned()
+ : null);
try {
// Should merge
@@ -745,7 +748,10 @@ public class SchemaRegionPBTreeImpl implements
ISchemaRegion {
aliasList,
(plan instanceof CreateAlignedTimeSeriesPlanImpl
&& ((CreateAlignedTimeSeriesPlanImpl) plan).isWithMerge()),
- existingMeasurementIndexes);
+ existingMeasurementIndexes,
+ (plan instanceof CreateAlignedTimeSeriesPlanImpl
+ ? ((CreateAlignedTimeSeriesPlanImpl) plan).getAligned()
+ : null));
try {
// Update statistics and schemaDataTypeNumMap
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java
index f8d2bd2316a..fd281f192a5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/mem/MTreeBelowSGMemoryImpl.java
@@ -211,7 +211,8 @@ public class MTreeBelowSGMemoryImpl {
final CompressionType compressor,
final Map<String, String> props,
final String alias,
- final boolean withMerge)
+ final boolean withMerge,
+ final AtomicBoolean isAligned)
throws MetadataException {
final String[] nodeNames = path.getNodes();
if (nodeNames.length <= 2) {
@@ -226,6 +227,12 @@ public class MTreeBelowSGMemoryImpl {
synchronized (this) {
final IMemMNode device =
checkAndAutoCreateDeviceNode(devicePath.getTailNode(), deviceParent);
+ if (device.isDevice()
+ && device.getAsDeviceMNode().isAlignedNullable() != null
+ && isAligned != null) {
+ isAligned.set(device.getAsDeviceMNode().isAligned());
+ }
+
MetaFormatUtils.checkTimeseriesProps(path.getFullPath(), props);
final String leafName = path.getMeasurement();
@@ -301,7 +308,8 @@ public class MTreeBelowSGMemoryImpl {
final List<CompressionType> compressors,
final List<String> aliasList,
final boolean withMerge,
- final Set<Integer> existingMeasurementIndexes)
+ final Set<Integer> existingMeasurementIndexes,
+ final AtomicBoolean isAligned)
throws MetadataException {
final List<IMeasurementMNode<IMemMNode>> measurementMNodeList = new
ArrayList<>();
MetaFormatUtils.checkSchemaMeasurementNames(measurements);
@@ -312,6 +320,12 @@ public class MTreeBelowSGMemoryImpl {
synchronized (this) {
final IMemMNode device =
checkAndAutoCreateDeviceNode(devicePath.getTailNode(), deviceParent);
+ if (device.isDevice()
+ && device.getAsDeviceMNode().isAlignedNullable() != null
+ && isAligned != null) {
+ isAligned.set(device.getAsDeviceMNode().isAligned());
+ }
+
for (int i = 0; i < measurements.size(); i++) {
if (device.hasChild(measurements.get(i))) {
final IMemMNode node = device.getChild(measurements.get(i));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/MTreeBelowSGCachedImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/MTreeBelowSGCachedImpl.java
index e00835cff03..679bf600dfa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/MTreeBelowSGCachedImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/mtree/impl/pbtree/MTreeBelowSGCachedImpl.java
@@ -264,7 +264,8 @@ public class MTreeBelowSGCachedImpl {
String alias)
throws MetadataException {
IMeasurementMNode<ICachedMNode> measurementMNode =
- createTimeSeriesWithPinnedReturn(path, dataType, encoding, compressor,
props, alias, false);
+ createTimeSeriesWithPinnedReturn(
+ path, dataType, encoding, compressor, props, alias, false, null);
unPinMNode(measurementMNode.getAsMNode());
return measurementMNode;
}
@@ -287,7 +288,8 @@ public class MTreeBelowSGCachedImpl {
final CompressionType compressor,
final Map<String, String> props,
final String alias,
- final boolean withMerge)
+ final boolean withMerge,
+ final AtomicBoolean isAligned)
throws MetadataException {
final String[] nodeNames = path.getNodes();
if (nodeNames.length <= 2) {
@@ -303,6 +305,12 @@ public class MTreeBelowSGCachedImpl {
synchronized (this) {
ICachedMNode device =
checkAndAutoCreateDeviceNode(devicePath.getTailNode(), deviceParent);
+ if (device.isDevice()
+ && device.getAsDeviceMNode().isAlignedNullable() != null
+ && isAligned != null) {
+ isAligned.set(device.getAsDeviceMNode().isAligned());
+ }
+
try {
MetaFormatUtils.checkTimeseriesProps(path.getFullPath(), props);
@@ -385,7 +393,8 @@ public class MTreeBelowSGCachedImpl {
final List<CompressionType> compressors,
final List<String> aliasList,
final boolean withMerge,
- final Set<Integer> existingMeasurementIndexes)
+ final Set<Integer> existingMeasurementIndexes,
+ final AtomicBoolean isAligned)
throws MetadataException {
final List<IMeasurementMNode<ICachedMNode>> measurementMNodeList = new
ArrayList<>();
MetaFormatUtils.checkSchemaMeasurementNames(measurements);
@@ -397,6 +406,12 @@ public class MTreeBelowSGCachedImpl {
synchronized (this) {
ICachedMNode device =
checkAndAutoCreateDeviceNode(devicePath.getTailNode(), deviceParent);
+ if (device.isDevice()
+ && device.getAsDeviceMNode().isAlignedNullable() != null
+ && isAligned != null) {
+ isAligned.set(device.getAsDeviceMNode().isAligned());
+ }
+
try {
for (int i = 0; i < measurements.size(); i++) {
if (store.hasChild(device, measurements.get(i))) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateAlignedTimeSeriesPlanImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateAlignedTimeSeriesPlanImpl.java
index 4ffa8aca5b2..8a2c4c12e8c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateAlignedTimeSeriesPlanImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateAlignedTimeSeriesPlanImpl.java
@@ -32,6 +32,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
public class CreateAlignedTimeSeriesPlanImpl implements ICreateAlignedTimeSeriesPlan {
@@ -45,6 +46,7 @@ public class CreateAlignedTimeSeriesPlanImpl implements ICreateAlignedTimeSeries
private List<Map<String, String>> attributesList;
private List<Long> tagOffsets = null;
private transient boolean withMerge;
+ private final transient AtomicBoolean aligned = new AtomicBoolean(true);
public CreateAlignedTimeSeriesPlanImpl() {}
@@ -194,4 +196,12 @@ public class CreateAlignedTimeSeriesPlanImpl implements ICreateAlignedTimeSeries
tagOffsets = Objects.nonNull(tagOffsets) ? new ArrayList<>(tagOffsets) : null;
}
}
+
+ public void setAligned(final boolean aligned) {
+ this.aligned.set(aligned);
+ }
+
+ public AtomicBoolean getAligned() {
+ return aligned;
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateTimeSeriesPlanImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateTimeSeriesPlanImpl.java
index 1f5aa8e81ea..b011dca8435 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateTimeSeriesPlanImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/write/req/impl/CreateTimeSeriesPlanImpl.java
@@ -28,6 +28,7 @@ import org.apache.tsfile.write.schema.MeasurementSchema;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
public class CreateTimeSeriesPlanImpl implements ICreateTimeSeriesPlan {
@@ -41,6 +42,7 @@ public class CreateTimeSeriesPlanImpl implements ICreateTimeSeriesPlan {
private Map<String, String> attributes = null;
private long tagOffset = -1;
private transient boolean withMerge;
+ private final transient AtomicBoolean aligned = new AtomicBoolean(false);
public CreateTimeSeriesPlanImpl() {}
@@ -170,4 +172,12 @@ public class CreateTimeSeriesPlanImpl implements ICreateTimeSeriesPlan {
public void setWithMerge(final boolean withMerge) {
this.withMerge = withMerge;
}
+
+ public void setAligned(final boolean aligned) {
+ this.aligned.set(aligned);
+ }
+
+ public AtomicBoolean getAligned() {
+ return aligned;
+ }
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/path/MeasurementPathTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/path/MeasurementPathTest.java
index cf5f8ab57ba..148a3c56bd4 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/path/MeasurementPathTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/path/MeasurementPathTest.java
@@ -43,8 +43,8 @@ public class MeasurementPathTest {
new MeasurementPath(
new PartialPath("root.sg.d.s"), new MeasurementSchema("s", TSDataType.INT32), true);
rawPath.setMeasurementAlias("alias");
- String string = MeasurementPath.transformDataToString(rawPath);
- MeasurementPath newPath = MeasurementPath.parseDataFromString(string);
+ String string = PartialPath.transformDataToString(rawPath);
+ MeasurementPath newPath = (MeasurementPath) PartialPath.parseDataFromString(string);
Assert.assertEquals(rawPath.getFullPath(), newPath.getFullPath());
Assert.assertEquals(rawPath.getMeasurementAlias(), newPath.getMeasurementAlias());
Assert.assertEquals(rawPath.getMeasurementSchema(), newPath.getMeasurementSchema());
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java
index 32443cfe9f7..59d2d9d0440 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/MeasurementPath.java
@@ -33,12 +33,9 @@ import org.apache.tsfile.write.schema.VectorMeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
import java.rmi.UnexpectedException;
import java.util.HashMap;
import java.util.Map;
@@ -293,27 +290,4 @@ public class MeasurementPath extends PartialPath {
public PartialPath transformToPartialPath() {
return getDevicePath().concatNode(getTailNode());
}
-
- /**
- * In specific scenarios, like internal create timeseries, the message can only be passed as
- * String format.
- */
- public static String transformDataToString(MeasurementPath measurementPath) {
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
- try {
- measurementPath.serialize(dataOutputStream);
- } catch (IOException ignored) {
- // this exception won't happen.
- }
- byte[] bytes = byteArrayOutputStream.toByteArray();
- // must use single-byte char sets
- return new String(bytes, StandardCharsets.ISO_8859_1);
- }
-
- public static MeasurementPath parseDataFromString(String measurementPathData) {
- return (MeasurementPath)
- PathDeserializeUtil.deserialize(
- ByteBuffer.wrap(measurementPathData.getBytes(StandardCharsets.ISO_8859_1)));
- }
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
index fc04c94ba5b..354724bd487 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PartialPath.java
@@ -36,9 +36,12 @@ import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -964,6 +967,29 @@ public class PartialPath extends Path implements Comparable<Path>, Cloneable {
return this;
}
+ /**
+ * In specific scenarios, like internal create timeseries, the message can only be passed as
+ * String format.
+ */
+ public static String transformDataToString(PartialPath partialPath) {
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
+ try {
+ partialPath.serialize(dataOutputStream);
+ } catch (IOException ignored) {
+ // this exception won't happen.
+ }
+ byte[] bytes = byteArrayOutputStream.toByteArray();
+ // must use single-byte char sets
+ return new String(bytes, StandardCharsets.ISO_8859_1);
+ }
+
+ public static PartialPath parseDataFromString(String measurementPathData) {
+ return (PartialPath)
+ PathDeserializeUtil.deserialize(
+ ByteBuffer.wrap(measurementPathData.getBytes(StandardCharsets.ISO_8859_1)));
+ }
+
/** Return true if the path ends with ** and no other nodes contain *. Otherwise, return false. */
public boolean isPrefixPath() {
if (nodes.length <= 0) {