This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.2 by this push:
new 9fc107e763a [To rel/1.2] Support not checking `isAligned` in insertion
(#10142)
9fc107e763a is described below
commit 9fc107e763a1e972a59abb08117a0d547ee3908e
Author: 橘子 <[email protected]>
AuthorDate: Wed Jun 14 09:51:15 2023 +0800
[To rel/1.2] Support not checking `isAligned` in insertion (#10142)
---
.../db/it/aligned/IoTDBInsertAlignedValuesIT.java | 60 ++++++++++++++------
.../db/metadata/cache/TimeSeriesSchemaCache.java | 8 +--
.../db/metadata/mtree/MTreeBelowSGCachedImpl.java | 4 +-
.../db/metadata/mtree/MTreeBelowSGMemoryImpl.java | 4 +-
.../mpp/common/schematree/ClusterSchemaTree.java | 10 +++-
.../mpp/plan/analyze/schema/ISchemaValidation.java | 6 ++
.../plan/analyze/schema/NormalSchemaFetcher.java | 66 +++++++++++++++++++---
.../plan/statement/crud/InsertRowStatement.java | 11 +---
.../plan/statement/crud/InsertTabletStatement.java | 11 +---
9 files changed, 125 insertions(+), 55 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBInsertAlignedValuesIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBInsertAlignedValuesIT.java
index d8f7ba8b6fe..786a67d9164 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBInsertAlignedValuesIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBInsertAlignedValuesIT.java
@@ -292,36 +292,62 @@ public class IoTDBInsertAlignedValuesIT {
}
@Test
- public void testInsertAlignedTimeseriesWithoutAligned() {
+ public void testInsertAlignedTimeseriesWithoutAligned() throws SQLException {
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
statement.execute(
"CREATE ALIGNED TIMESERIES root.lz.dev.GPS2(latitude INT32
encoding=PLAIN compressor=SNAPPY, longitude INT32 encoding=PLAIN
compressor=SNAPPY) ");
- statement.execute("insert into root.lz.dev.GPS2(time,latitude,longitude)
values(1,1.3,6.7)");
- fail();
- } catch (SQLException e) {
- assertTrue(
- e.getMessage(),
- e.getMessage()
- .contains("timeseries under this device are aligned, please use
aligned interface"));
+ statement.execute("insert into root.lz.dev.GPS2(time,latitude,longitude)
values(1,123,456)");
+ // it's supported.
+ }
+ }
+
+ @Test
+ public void testInsertTimeseriesWithUnMatchedAlignedType() throws
SQLException {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("create ALIGNED timeseries root.db.d_aligned(s01 INT64
encoding=RLE)");
+ statement.execute("insert into root.db.d_aligned(time, s01) aligned
values (4000, 123)");
+ statement.execute("insert into root.db.d_aligned(time, s01) values
(5000, 456)");
+ statement.execute("create timeseries root.db.d_not_aligned.s01 INT64
encoding=RLE");
+ statement.execute("insert into root.db.d_not_aligned(time, s01) values
(4000, 987)");
+ statement.execute("insert into root.db.d_not_aligned(time, s01) aligned
values (5000, 654)");
+
+ try (ResultSet resultSet = statement.executeQuery("select s01 from
root.db.d_aligned")) {
+ assertTrue(resultSet.next());
+ assertEquals(4000, resultSet.getLong(1));
+ assertEquals(123, resultSet.getLong(2));
+
+ assertTrue(resultSet.next());
+ assertEquals(5000, resultSet.getLong(1));
+ assertEquals(456, resultSet.getLong(2));
+
+ assertFalse(resultSet.next());
+ }
+
+ try (ResultSet resultSet = statement.executeQuery("select s01 from
root.db.d_not_aligned")) {
+ assertTrue(resultSet.next());
+ assertEquals(4000, resultSet.getLong(1));
+ assertEquals(987, resultSet.getLong(2));
+
+ assertTrue(resultSet.next());
+ assertEquals(5000, resultSet.getLong(1));
+ assertEquals(654, resultSet.getLong(2));
+
+ assertFalse(resultSet.next());
+ }
}
}
@Test
- public void testInsertNonAlignedTimeseriesWithAligned() {
+ public void testInsertNonAlignedTimeseriesWithAligned() throws SQLException {
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
statement.execute("CREATE TIMESERIES root.lz.dev.GPS3.latitude with
datatype=INT32");
statement.execute("CREATE TIMESERIES root.lz.dev.GPS3.longitude with
datatype=INT32");
statement.execute(
- "insert into root.lz.dev.GPS3(time,latitude,longitude) aligned
values(1,1.3,6.7)");
- fail();
- } catch (SQLException e) {
- assertTrue(
- e.getMessage(),
- e.getMessage()
- .contains(
- "timeseries under this device are not aligned, please use
non-aligned interface"));
+ "insert into root.lz.dev.GPS3(time,latitude,longitude) aligned
values(1,123,456)");
+ // it's supported.
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/TimeSeriesSchemaCache.java
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/TimeSeriesSchemaCache.java
index dbb0b035827..861f7afb580 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/cache/TimeSeriesSchemaCache.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/cache/TimeSeriesSchemaCache.java
@@ -166,7 +166,6 @@ public class TimeSeriesSchemaCache {
ISchemaComputation schemaComputation) {
List<Integer> indexOfMissingMeasurements = new ArrayList<>();
List<String> missedPathStringList = new ArrayList<>();
- final AtomicBoolean isFirstMeasurement = new AtomicBoolean(true);
Pair<Integer, Integer> beginToEnd =
schemaComputation.getRangeOfLogicalViewSchemaListRecorded();
List<LogicalViewSchema> logicalViewSchemaList =
schemaComputation.getLogicalViewSchemaList();
List<Integer> indexListOfLogicalViewPaths =
schemaComputation.getIndexListOfLogicalViewPaths();
@@ -198,10 +197,9 @@ public class TimeSeriesSchemaCache {
if (value == null) {
indexOfMissingMeasurements.add(recordMissingIndex);
} else {
- if (isFirstMeasurement.get()) {
- schemaComputation.computeDevice(value.isAligned());
- isFirstMeasurement.getAndSet(false);
- }
+ // Can not call function computeDevice here, because the value
is source of one
+ // view, but schemaComputation is the device in this insert
statement. The
+ // computation between them is miss matched.
if (value.isLogicalView()) {
// does not support views in views
throw new RuntimeException(
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
index 92400aa0563..e3a1ebcb7b8 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
@@ -289,7 +289,7 @@ public class MTreeBelowSGCachedImpl {
if (device.isDevice() && device.getAsDeviceMNode().isAligned()) {
throw new AlignedTimeseriesException(
- "timeseries under this entity is aligned, please use
createAlignedTimeseries or change entity.",
+ "timeseries under this device is aligned, please use
createAlignedTimeseries or change device.",
device.getFullPath());
}
@@ -371,7 +371,7 @@ public class MTreeBelowSGCachedImpl {
if (device.isDevice() && !device.getAsDeviceMNode().isAligned()) {
throw new AlignedTimeseriesException(
- "Timeseries under this entity is not aligned, please use
createTimeseries or change entity.",
+ "Timeseries under this device is not aligned, please use
createTimeseries or change device.",
devicePath.getFullPath());
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
index 9ff493f2ee4..f7d6a3d8062 100644
---
a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
+++
b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
@@ -238,7 +238,7 @@ public class MTreeBelowSGMemoryImpl {
if (device.isDevice() && device.getAsDeviceMNode().isAligned()) {
throw new AlignedTimeseriesException(
- "timeseries under this entity is aligned, please use
createAlignedTimeseries or change entity.",
+ "timeseries under this device is aligned, please use
createAlignedTimeseries or change device.",
device.getFullPath());
}
@@ -318,7 +318,7 @@ public class MTreeBelowSGMemoryImpl {
if (device.isDevice() && !device.getAsDeviceMNode().isAligned()) {
throw new AlignedTimeseriesException(
- "Timeseries under this entity is not aligned, please use
createTimeseries or change entity.",
+ "Timeseries under this device is not aligned, please use
createTimeseries or change device.",
devicePath.getFullPath());
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
index 6a5532f71f4..94eb822ba5e 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
@@ -176,9 +176,7 @@ public class ClusterSchemaTree implements ISchemaTree {
if (cur == null) {
return indexOfTargetMeasurements;
}
- if (cur.isEntity()) {
- schemaComputation.computeDevice(cur.getAsEntityNode().isAligned());
- }
+ boolean firstNonViewMeasurement = true;
List<Integer> indexOfMissingMeasurements = new ArrayList<>();
SchemaNode node;
for (int index : indexOfTargetMeasurements) {
@@ -186,6 +184,12 @@ public class ClusterSchemaTree implements ISchemaTree {
if (node == null) {
indexOfMissingMeasurements.add(index);
} else {
+ if (firstNonViewMeasurement) {
+ if (!node.getAsMeasurementNode().isLogicalView()) {
+ schemaComputation.computeDevice(cur.getAsEntityNode().isAligned());
+ firstNonViewMeasurement = false;
+ }
+ }
schemaComputation.computeMeasurement(index,
node.getAsMeasurementNode());
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaValidation.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaValidation.java
index 1d0252e92f6..4d303c3b633 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaValidation.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaValidation.java
@@ -40,6 +40,12 @@ public interface ISchemaValidation extends
ISchemaComputationWithAutoCreation {
validateMeasurementSchema(index, measurementSchemaInfo, isAligned);
}
+ /**
+ * Record the real value of <code>isAligned</code> of this device. This will
change the value of
+ * <code>isAligned</code> in this insert statement.
+ *
+ * @param isAligned The real value of attribute <code>isAligned</code> of
this device schema
+ */
void validateDeviceSchema(boolean isAligned);
void validateMeasurementSchema(int index, IMeasurementSchemaInfo
measurementSchemaInfo);
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/NormalSchemaFetcher.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/NormalSchemaFetcher.java
index 09da201f259..afb531a8aa1 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/NormalSchemaFetcher.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/NormalSchemaFetcher.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.AlignedTimeseriesException;
import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -101,6 +102,9 @@ class NormalSchemaFetcher {
List<Integer> processNormalTimeSeries(
ISchemaComputationWithAutoCreation schemaComputationWithAutoCreation) {
+ // [Step 0] Record the input value.
+ boolean isAlignedPutIn = schemaComputationWithAutoCreation.isAligned();
+
// [Step 1] Cache 1. compute measurements and record logical views.
List<Integer> indexOfMissingMeasurements =
schemaCache.computeWithoutTemplate(schemaComputationWithAutoCreation);
@@ -160,14 +164,19 @@ class NormalSchemaFetcher {
// [Step 5] Auto Create and process the missing schema
if (config.isAutoCreateSchemaEnabled()) {
+ // Check the isAligned value. If the input value is different from the
actual value of the
+ // existing device, throw exception.
+ PartialPath devicePath =
schemaComputationWithAutoCreation.getDevicePath();
+ validateIsAlignedValueIfAutoCreate(
+ schemaComputationWithAutoCreation.isAligned(), isAlignedPutIn,
devicePath);
ClusterSchemaTree schemaTree = new ClusterSchemaTree();
autoCreateSchemaExecutor.autoCreateTimeSeries(
schemaTree,
- schemaComputationWithAutoCreation.getDevicePath(),
+ devicePath,
indexOfMissingMeasurements,
schemaComputationWithAutoCreation.getMeasurements(),
schemaComputationWithAutoCreation::getDataType,
- schemaComputationWithAutoCreation.isAligned());
+ isAlignedPutIn);
indexOfMissingMeasurements =
schemaTree.compute(schemaComputationWithAutoCreation,
indexOfMissingMeasurements);
}
@@ -177,6 +186,14 @@ class NormalSchemaFetcher {
void processNormalTimeSeries(
List<? extends ISchemaComputationWithAutoCreation>
schemaComputationWithAutoCreationList) {
+ // [Step 0] Record the input value.
+ List<Boolean> isAlignedPutInList = null;
+ if (config.isAutoCreateSchemaEnabled()) {
+ isAlignedPutInList =
+ schemaComputationWithAutoCreationList.stream()
+ .map(ISchemaComputationWithAutoCreation::isAligned)
+ .collect(Collectors.toList());
+ }
// [Step 1] Cache 1. compute measurements and record logical views.
List<Integer> indexOfDevicesWithMissingMeasurements = new ArrayList<>();
@@ -296,12 +313,22 @@ class NormalSchemaFetcher {
// [Step 5] Auto Create and process the missing schema
if (config.isAutoCreateSchemaEnabled()) {
+ List<PartialPath> devicePathList =
+ schemaComputationWithAutoCreationList.stream()
+ .map(ISchemaComputationWithAutoCreation::getDevicePath)
+ .collect(Collectors.toList());
+ List<Boolean> isAlignedRealList =
+ schemaComputationWithAutoCreationList.stream()
+ .map(ISchemaComputationWithAutoCreation::isAligned)
+ .collect(Collectors.toList());
+ // Check the isAligned value. If the input value is different from the
actual value of the
+ // existing device, throw exception.
+ validateIsAlignedValueIfAutoCreate(isAlignedRealList,
isAlignedPutInList, devicePathList);
+
ClusterSchemaTree schemaTree = new ClusterSchemaTree();
autoCreateSchemaExecutor.autoCreateTimeSeries(
schemaTree,
- schemaComputationWithAutoCreationList.stream()
- .map(ISchemaComputationWithAutoCreation::getDevicePath)
- .collect(Collectors.toList()),
+ devicePathList,
indexOfDevicesNeedAutoCreateSchema,
indexOfMeasurementsNeedAutoCreate,
schemaComputationWithAutoCreationList.stream()
@@ -317,9 +344,7 @@ class NormalSchemaFetcher {
return dataTypes;
})
.collect(Collectors.toList()),
- schemaComputationWithAutoCreationList.stream()
- .map(ISchemaComputationWithAutoCreation::isAligned)
- .collect(Collectors.toList()));
+ isAlignedPutInList);
indexOfDevicesWithMissingMeasurements = new ArrayList<>();
indexOfMissingMeasurementsList = new ArrayList<>();
for (int i = 0; i < indexOfDevicesNeedAutoCreateSchema.size(); i++) {
@@ -352,4 +377,29 @@ class NormalSchemaFetcher {
}
}
}
+
+ private void validateIsAlignedValueIfAutoCreate(
+ List<Boolean> realValueList, List<Boolean> putInValueList,
List<PartialPath> devicePathList) {
+ int checkLen =
+ Math.min(Math.min(realValueList.size(), putInValueList.size()),
devicePathList.size());
+ for (int i = 0; i < checkLen; i++) {
+ validateIsAlignedValueIfAutoCreate(
+ realValueList.get(i), putInValueList.get(i), devicePathList.get(i));
+ }
+ }
+
+ private void validateIsAlignedValueIfAutoCreate(
+ boolean realValue, boolean putInValue, PartialPath devicePath) {
+ if (realValue != putInValue) {
+ String msg;
+ if (realValue) {
+ msg =
+ "Timeseries under this device is aligned, please use
createTimeseries or change device.";
+ } else {
+ msg =
+ "Timeseries under this device is not aligned, please use
createTimeseries or change device.";
+ }
+ throw new RuntimeException(new AlignedTimeseriesException(msg,
devicePath.getFullPath()));
+ }
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowStatement.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowStatement.java
index dbd2e23dcec..902ca3d9ac3 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowStatement.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowStatement.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.metadata.AlignedTimeseriesException;
import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -278,6 +277,7 @@ public class InsertRowStatement extends InsertBaseStatement
implements ISchemaVa
statement.setTime(this.time);
statement.setNeedInferType(this.isNeedInferType);
statement.setDevicePath(entry.getKey());
+ statement.setAligned(this.isAligned);
Object[] values = new Object[pairList.size()];
String[] measurements = new String[pairList.size()];
MeasurementSchema[] measurementSchemas = new
MeasurementSchema[pairList.size()];
@@ -354,14 +354,7 @@ public class InsertRowStatement extends
InsertBaseStatement implements ISchemaVa
@Override
public void validateDeviceSchema(boolean isAligned) {
- if (this.isAligned != isAligned) {
- throw new SemanticException(
- new AlignedTimeseriesException(
- String.format(
- "timeseries under this device are%s aligned, " + "please use
%s interface",
- isAligned ? "" : " not", isAligned ? "aligned" :
"non-aligned"),
- devicePath.getFullPath()));
- }
+ this.isAligned = isAligned;
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertTabletStatement.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertTabletStatement.java
index d515b5168ca..8223b3e8641 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertTabletStatement.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertTabletStatement.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.mpp.plan.statement.crud;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
-import org.apache.iotdb.db.exception.metadata.AlignedTimeseriesException;
import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.sql.SemanticException;
@@ -221,6 +220,7 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
statement.setTimes(this.times);
statement.setDevicePath(entry.getKey());
statement.setRowCount(this.rowCount);
+ statement.setAligned(this.isAligned);
Object[] columns = new Object[pairList.size()];
String[] measurements = new String[pairList.size()];
BitMap[] bitMaps = new BitMap[pairList.size()];
@@ -323,14 +323,7 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
@Override
public void validateDeviceSchema(boolean isAligned) {
- if (this.isAligned != isAligned) {
- throw new SemanticException(
- new AlignedTimeseriesException(
- String.format(
- "timeseries under this device are%s aligned, " + "please use
%s interface",
- isAligned ? "" : " not", isAligned ? "aligned" :
"non-aligned"),
- devicePath.getFullPath()));
- }
+ this.isAligned = isAligned;
}
@Override