This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c5c3c3262c4 Support not checking `isAligned` in insertion (#10141)
c5c3c3262c4 is described below

commit c5c3c3262c40fba2268a5d277a236b37ec908011
Author: 橘子 <[email protected]>
AuthorDate: Wed Jun 14 09:51:33 2023 +0800

    Support not checking `isAligned` in insertion (#10141)
---
 .../db/it/aligned/IoTDBInsertAlignedValuesIT.java  | 60 ++++++++++++++------
 .../db/metadata/cache/TimeSeriesSchemaCache.java   |  8 +--
 .../db/metadata/mtree/MTreeBelowSGCachedImpl.java  |  4 +-
 .../db/metadata/mtree/MTreeBelowSGMemoryImpl.java  |  4 +-
 .../mpp/common/schematree/ClusterSchemaTree.java   | 10 +++-
 .../mpp/plan/analyze/schema/ISchemaValidation.java |  6 ++
 .../plan/analyze/schema/NormalSchemaFetcher.java   | 66 +++++++++++++++++++---
 .../plan/statement/crud/InsertRowStatement.java    | 11 +---
 .../plan/statement/crud/InsertTabletStatement.java | 11 +---
 9 files changed, 125 insertions(+), 55 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBInsertAlignedValuesIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBInsertAlignedValuesIT.java
index d8f7ba8b6fe..786a67d9164 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBInsertAlignedValuesIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aligned/IoTDBInsertAlignedValuesIT.java
@@ -292,36 +292,62 @@ public class IoTDBInsertAlignedValuesIT {
   }
 
   @Test
-  public void testInsertAlignedTimeseriesWithoutAligned() {
+  public void testInsertAlignedTimeseriesWithoutAligned() throws SQLException {
     try (Connection connection = EnvFactory.getEnv().getConnection();
         Statement statement = connection.createStatement()) {
       statement.execute(
           "CREATE ALIGNED TIMESERIES root.lz.dev.GPS2(latitude INT32 
encoding=PLAIN compressor=SNAPPY, longitude INT32 encoding=PLAIN 
compressor=SNAPPY) ");
-      statement.execute("insert into root.lz.dev.GPS2(time,latitude,longitude) 
values(1,1.3,6.7)");
-      fail();
-    } catch (SQLException e) {
-      assertTrue(
-          e.getMessage(),
-          e.getMessage()
-              .contains("timeseries under this device are aligned, please use 
aligned interface"));
+      statement.execute("insert into root.lz.dev.GPS2(time,latitude,longitude) 
values(1,123,456)");
+      // it's supported.
+    }
+  }
+
+  @Test
+  public void testInsertTimeseriesWithUnMatchedAlignedType() throws 
SQLException {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("create ALIGNED timeseries root.db.d_aligned(s01 INT64 
encoding=RLE)");
+      statement.execute("insert into root.db.d_aligned(time, s01) aligned 
values (4000, 123)");
+      statement.execute("insert into root.db.d_aligned(time, s01) values 
(5000, 456)");
+      statement.execute("create timeseries root.db.d_not_aligned.s01 INT64 
encoding=RLE");
+      statement.execute("insert into root.db.d_not_aligned(time, s01) values 
(4000, 987)");
+      statement.execute("insert into root.db.d_not_aligned(time, s01) aligned 
values (5000, 654)");
+
+      try (ResultSet resultSet = statement.executeQuery("select s01 from 
root.db.d_aligned")) {
+        assertTrue(resultSet.next());
+        assertEquals(4000, resultSet.getLong(1));
+        assertEquals(123, resultSet.getLong(2));
+
+        assertTrue(resultSet.next());
+        assertEquals(5000, resultSet.getLong(1));
+        assertEquals(456, resultSet.getLong(2));
+
+        assertFalse(resultSet.next());
+      }
+
+      try (ResultSet resultSet = statement.executeQuery("select s01 from 
root.db.d_not_aligned")) {
+        assertTrue(resultSet.next());
+        assertEquals(4000, resultSet.getLong(1));
+        assertEquals(987, resultSet.getLong(2));
+
+        assertTrue(resultSet.next());
+        assertEquals(5000, resultSet.getLong(1));
+        assertEquals(654, resultSet.getLong(2));
+
+        assertFalse(resultSet.next());
+      }
     }
   }
 
   @Test
-  public void testInsertNonAlignedTimeseriesWithAligned() {
+  public void testInsertNonAlignedTimeseriesWithAligned() throws SQLException {
     try (Connection connection = EnvFactory.getEnv().getConnection();
         Statement statement = connection.createStatement()) {
       statement.execute("CREATE TIMESERIES root.lz.dev.GPS3.latitude with 
datatype=INT32");
       statement.execute("CREATE TIMESERIES root.lz.dev.GPS3.longitude with 
datatype=INT32");
       statement.execute(
-          "insert into root.lz.dev.GPS3(time,latitude,longitude) aligned 
values(1,1.3,6.7)");
-      fail();
-    } catch (SQLException e) {
-      assertTrue(
-          e.getMessage(),
-          e.getMessage()
-              .contains(
-                  "timeseries under this device are not aligned, please use 
non-aligned interface"));
+          "insert into root.lz.dev.GPS3(time,latitude,longitude) aligned 
values(1,123,456)");
+      // it's supported.
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/cache/TimeSeriesSchemaCache.java b/server/src/main/java/org/apache/iotdb/db/metadata/cache/TimeSeriesSchemaCache.java
index dbb0b035827..861f7afb580 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/cache/TimeSeriesSchemaCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/cache/TimeSeriesSchemaCache.java
@@ -166,7 +166,6 @@ public class TimeSeriesSchemaCache {
       ISchemaComputation schemaComputation) {
     List<Integer> indexOfMissingMeasurements = new ArrayList<>();
     List<String> missedPathStringList = new ArrayList<>();
-    final AtomicBoolean isFirstMeasurement = new AtomicBoolean(true);
     Pair<Integer, Integer> beginToEnd = 
schemaComputation.getRangeOfLogicalViewSchemaListRecorded();
     List<LogicalViewSchema> logicalViewSchemaList = 
schemaComputation.getLogicalViewSchemaList();
     List<Integer> indexListOfLogicalViewPaths = 
schemaComputation.getIndexListOfLogicalViewPaths();
@@ -198,10 +197,9 @@ public class TimeSeriesSchemaCache {
               if (value == null) {
                 indexOfMissingMeasurements.add(recordMissingIndex);
               } else {
-                if (isFirstMeasurement.get()) {
-                  schemaComputation.computeDevice(value.isAligned());
-                  isFirstMeasurement.getAndSet(false);
-                }
+                // Can not call function computeDevice here, because the value is source of one
+                // view, but schemaComputation is the device in this insert statement. The
+                // computation between them is miss matched.
                 if (value.isLogicalView()) {
                   // does not support views in views
                   throw new RuntimeException(
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
index 92400aa0563..e3a1ebcb7b8 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGCachedImpl.java
@@ -289,7 +289,7 @@ public class MTreeBelowSGCachedImpl {
 
           if (device.isDevice() && device.getAsDeviceMNode().isAligned()) {
             throw new AlignedTimeseriesException(
-                "timeseries under this entity is aligned, please use 
createAlignedTimeseries or change entity.",
+                "timeseries under this device is aligned, please use 
createAlignedTimeseries or change device.",
                 device.getFullPath());
           }
 
@@ -371,7 +371,7 @@ public class MTreeBelowSGCachedImpl {
 
           if (device.isDevice() && !device.getAsDeviceMNode().isAligned()) {
             throw new AlignedTimeseriesException(
-                "Timeseries under this entity is not aligned, please use 
createTimeseries or change entity.",
+                "Timeseries under this device is not aligned, please use 
createTimeseries or change device.",
                 devicePath.getFullPath());
           }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
index 9ff493f2ee4..f7d6a3d8062 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTreeBelowSGMemoryImpl.java
@@ -238,7 +238,7 @@ public class MTreeBelowSGMemoryImpl {
 
       if (device.isDevice() && device.getAsDeviceMNode().isAligned()) {
         throw new AlignedTimeseriesException(
-            "timeseries under this entity is aligned, please use 
createAlignedTimeseries or change entity.",
+            "timeseries under this device is aligned, please use 
createAlignedTimeseries or change device.",
             device.getFullPath());
       }
 
@@ -318,7 +318,7 @@ public class MTreeBelowSGMemoryImpl {
 
       if (device.isDevice() && !device.getAsDeviceMNode().isAligned()) {
         throw new AlignedTimeseriesException(
-            "Timeseries under this entity is not aligned, please use 
createTimeseries or change entity.",
+            "Timeseries under this device is not aligned, please use 
createTimeseries or change device.",
             devicePath.getFullPath());
       }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
index 6a5532f71f4..94eb822ba5e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/schematree/ClusterSchemaTree.java
@@ -176,9 +176,7 @@ public class ClusterSchemaTree implements ISchemaTree {
     if (cur == null) {
       return indexOfTargetMeasurements;
     }
-    if (cur.isEntity()) {
-      schemaComputation.computeDevice(cur.getAsEntityNode().isAligned());
-    }
+    boolean firstNonViewMeasurement = true;
     List<Integer> indexOfMissingMeasurements = new ArrayList<>();
     SchemaNode node;
     for (int index : indexOfTargetMeasurements) {
@@ -186,6 +184,12 @@ public class ClusterSchemaTree implements ISchemaTree {
       if (node == null) {
         indexOfMissingMeasurements.add(index);
       } else {
+        if (firstNonViewMeasurement) {
+          if (!node.getAsMeasurementNode().isLogicalView()) {
+            schemaComputation.computeDevice(cur.getAsEntityNode().isAligned());
+            firstNonViewMeasurement = false;
+          }
+        }
         schemaComputation.computeMeasurement(index, 
node.getAsMeasurementNode());
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaValidation.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaValidation.java
index 1d0252e92f6..4d303c3b633 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaValidation.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/ISchemaValidation.java
@@ -40,6 +40,12 @@ public interface ISchemaValidation extends ISchemaComputationWithAutoCreation {
     validateMeasurementSchema(index, measurementSchemaInfo, isAligned);
   }
 
+  /**
+   * Record the real value of <code>isAligned</code> of this device. This will change the value of
+   * <code>isAligned</code> in this insert statement.
+   *
+   * @param isAligned The real value of attribute <code>isAligned</code> of this device schema
+   */
   void validateDeviceSchema(boolean isAligned);
 
   void validateMeasurementSchema(int index, IMeasurementSchemaInfo 
measurementSchemaInfo);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/NormalSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/NormalSchemaFetcher.java
index 09da201f259..afb531a8aa1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/NormalSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/schema/NormalSchemaFetcher.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.AlignedTimeseriesException;
 import org.apache.iotdb.db.metadata.cache.DataNodeSchemaCache;
 import org.apache.iotdb.db.mpp.common.schematree.ClusterSchemaTree;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -101,6 +102,9 @@ class NormalSchemaFetcher {
 
   List<Integer> processNormalTimeSeries(
       ISchemaComputationWithAutoCreation schemaComputationWithAutoCreation) {
+    // [Step 0] Record the input value.
+    boolean isAlignedPutIn = schemaComputationWithAutoCreation.isAligned();
+
     // [Step 1] Cache 1. compute measurements and record logical views.
     List<Integer> indexOfMissingMeasurements =
         schemaCache.computeWithoutTemplate(schemaComputationWithAutoCreation);
@@ -160,14 +164,19 @@ class NormalSchemaFetcher {
 
     // [Step 5] Auto Create and process the missing schema
     if (config.isAutoCreateSchemaEnabled()) {
+      // Check the isAligned value. If the input value is different from the actual value of the
+      // existing device, throw exception.
+      PartialPath devicePath = 
schemaComputationWithAutoCreation.getDevicePath();
+      validateIsAlignedValueIfAutoCreate(
+          schemaComputationWithAutoCreation.isAligned(), isAlignedPutIn, 
devicePath);
       ClusterSchemaTree schemaTree = new ClusterSchemaTree();
       autoCreateSchemaExecutor.autoCreateTimeSeries(
           schemaTree,
-          schemaComputationWithAutoCreation.getDevicePath(),
+          devicePath,
           indexOfMissingMeasurements,
           schemaComputationWithAutoCreation.getMeasurements(),
           schemaComputationWithAutoCreation::getDataType,
-          schemaComputationWithAutoCreation.isAligned());
+          isAlignedPutIn);
       indexOfMissingMeasurements =
           schemaTree.compute(schemaComputationWithAutoCreation, 
indexOfMissingMeasurements);
     }
@@ -177,6 +186,14 @@ class NormalSchemaFetcher {
 
   void processNormalTimeSeries(
       List<? extends ISchemaComputationWithAutoCreation> 
schemaComputationWithAutoCreationList) {
+    // [Step 0] Record the input value.
+    List<Boolean> isAlignedPutInList = null;
+    if (config.isAutoCreateSchemaEnabled()) {
+      isAlignedPutInList =
+          schemaComputationWithAutoCreationList.stream()
+              .map(ISchemaComputationWithAutoCreation::isAligned)
+              .collect(Collectors.toList());
+    }
 
     // [Step 1] Cache 1. compute measurements and record logical views.
     List<Integer> indexOfDevicesWithMissingMeasurements = new ArrayList<>();
@@ -296,12 +313,22 @@ class NormalSchemaFetcher {
 
     // [Step 5] Auto Create and process the missing schema
     if (config.isAutoCreateSchemaEnabled()) {
+      List<PartialPath> devicePathList =
+          schemaComputationWithAutoCreationList.stream()
+              .map(ISchemaComputationWithAutoCreation::getDevicePath)
+              .collect(Collectors.toList());
+      List<Boolean> isAlignedRealList =
+          schemaComputationWithAutoCreationList.stream()
+              .map(ISchemaComputationWithAutoCreation::isAligned)
+              .collect(Collectors.toList());
+      // Check the isAligned value. If the input value is different from the actual value of the
+      // existing device, throw exception.
+      validateIsAlignedValueIfAutoCreate(isAlignedRealList, 
isAlignedPutInList, devicePathList);
+
       ClusterSchemaTree schemaTree = new ClusterSchemaTree();
       autoCreateSchemaExecutor.autoCreateTimeSeries(
           schemaTree,
-          schemaComputationWithAutoCreationList.stream()
-              .map(ISchemaComputationWithAutoCreation::getDevicePath)
-              .collect(Collectors.toList()),
+          devicePathList,
           indexOfDevicesNeedAutoCreateSchema,
           indexOfMeasurementsNeedAutoCreate,
           schemaComputationWithAutoCreationList.stream()
@@ -317,9 +344,7 @@ class NormalSchemaFetcher {
                     return dataTypes;
                   })
               .collect(Collectors.toList()),
-          schemaComputationWithAutoCreationList.stream()
-              .map(ISchemaComputationWithAutoCreation::isAligned)
-              .collect(Collectors.toList()));
+          isAlignedPutInList);
       indexOfDevicesWithMissingMeasurements = new ArrayList<>();
       indexOfMissingMeasurementsList = new ArrayList<>();
       for (int i = 0; i < indexOfDevicesNeedAutoCreateSchema.size(); i++) {
@@ -352,4 +377,29 @@ class NormalSchemaFetcher {
       }
     }
   }
+
+  private void validateIsAlignedValueIfAutoCreate(
+      List<Boolean> realValueList, List<Boolean> putInValueList, 
List<PartialPath> devicePathList) {
+    int checkLen =
+        Math.min(Math.min(realValueList.size(), putInValueList.size()), 
devicePathList.size());
+    for (int i = 0; i < checkLen; i++) {
+      validateIsAlignedValueIfAutoCreate(
+          realValueList.get(i), putInValueList.get(i), devicePathList.get(i));
+    }
+  }
+
+  private void validateIsAlignedValueIfAutoCreate(
+      boolean realValue, boolean putInValue, PartialPath devicePath) {
+    if (realValue != putInValue) {
+      String msg;
+      if (realValue) {
+        msg =
+            "Timeseries under this device is aligned, please use 
createTimeseries or change device.";
+      } else {
+        msg =
+            "Timeseries under this device is not aligned, please use 
createTimeseries or change device.";
+      }
+      throw new RuntimeException(new AlignedTimeseriesException(msg, 
devicePath.getFullPath()));
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowStatement.java
index dbd2e23dcec..902ca3d9ac3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowStatement.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.exception.metadata.AlignedTimeseriesException;
 import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -278,6 +277,7 @@ public class InsertRowStatement extends InsertBaseStatement implements ISchemaVa
       statement.setTime(this.time);
       statement.setNeedInferType(this.isNeedInferType);
       statement.setDevicePath(entry.getKey());
+      statement.setAligned(this.isAligned);
       Object[] values = new Object[pairList.size()];
       String[] measurements = new String[pairList.size()];
       MeasurementSchema[] measurementSchemas = new 
MeasurementSchema[pairList.size()];
@@ -354,14 +354,7 @@ public class InsertRowStatement extends InsertBaseStatement implements ISchemaVa
 
   @Override
   public void validateDeviceSchema(boolean isAligned) {
-    if (this.isAligned != isAligned) {
-      throw new SemanticException(
-          new AlignedTimeseriesException(
-              String.format(
-                  "timeseries under this device are%s aligned, " + "please use 
%s interface",
-                  isAligned ? "" : " not", isAligned ? "aligned" : 
"non-aligned"),
-              devicePath.getFullPath()));
-    }
+    this.isAligned = isAligned;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertTabletStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertTabletStatement.java
index d515b5168ca..8223b3e8641 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertTabletStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertTabletStatement.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.mpp.plan.statement.crud;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
-import org.apache.iotdb.db.exception.metadata.AlignedTimeseriesException;
 import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
@@ -221,6 +220,7 @@ public class InsertTabletStatement extends InsertBaseStatement implements ISchem
       statement.setTimes(this.times);
       statement.setDevicePath(entry.getKey());
       statement.setRowCount(this.rowCount);
+      statement.setAligned(this.isAligned);
       Object[] columns = new Object[pairList.size()];
       String[] measurements = new String[pairList.size()];
       BitMap[] bitMaps = new BitMap[pairList.size()];
@@ -323,14 +323,7 @@ public class InsertTabletStatement extends InsertBaseStatement implements ISchem
 
   @Override
   public void validateDeviceSchema(boolean isAligned) {
-    if (this.isAligned != isAligned) {
-      throw new SemanticException(
-          new AlignedTimeseriesException(
-              String.format(
-                  "timeseries under this device are%s aligned, " + "please use 
%s interface",
-                  isAligned ? "" : " not", isAligned ? "aligned" : 
"non-aligned"),
-              devicePath.getFullPath()));
-    }
+    this.isAligned = isAligned;
   }
 
   @Override

Reply via email to