This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch checkout_dup_measurements
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4791a5d1564be9c031701596371aeb155d443a53 Author: HTHou <[email protected]> AuthorDate: Fri May 17 19:15:06 2024 +0800 Check duplicated measurements for all insert APIs --- .../iotdb/session/it/IoTDBSessionSimpleIT.java | 134 +++++++++++++++++++++ .../queryengine/plan/analyze/AnalyzeVisitor.java | 6 +- .../plan/statement/crud/InsertBaseStatement.java | 17 ++- .../crud/InsertMultiTabletsStatement.java | 7 ++ .../plan/statement/crud/InsertRowStatement.java | 11 ++ .../crud/InsertRowsOfOneDeviceStatement.java | 7 ++ .../plan/statement/crud/InsertRowsStatement.java | 7 ++ .../plan/statement/crud/InsertStatement.java | 29 ----- .../plan/statement/crud/InsertTabletStatement.java | 11 ++ 9 files changed, 198 insertions(+), 31 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSimpleIT.java b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSimpleIT.java index e9c1d05d3a6..0a89ee56ded 100644 --- a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSimpleIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSimpleIT.java @@ -652,6 +652,34 @@ public class IoTDBSessionSimpleIT { } } + @Test + @Category({LocalStandaloneIT.class, ClusterIT.class}) + public void insertTabletWithDuplicatedMeasurementsTest() { + try (ISession session = EnvFactory.getEnv().getSessionConnection()) { + List<MeasurementSchema> schemaList = new ArrayList<>(); + schemaList.add(new MeasurementSchema("s0", TSDataType.DOUBLE, TSEncoding.RLE)); + schemaList.add(new MeasurementSchema("s0", TSDataType.DOUBLE, TSEncoding.RLE)); + schemaList.add(new MeasurementSchema("s0", TSDataType.DOUBLE, TSEncoding.RLE)); + + Tablet tablet = new Tablet("root.sg1.d1", schemaList); + for (long time = 0L; time < 10L; time++) { + int rowIndex = tablet.rowSize++; + tablet.addTimestamp(rowIndex, time); + + tablet.addValue(schemaList.get(0).getMeasurementId(), rowIndex, (double) time); + 
tablet.addValue(schemaList.get(1).getMeasurementId(), rowIndex, (double) time); + tablet.addValue(schemaList.get(2).getMeasurementId(), rowIndex, (double) time); + } + + if (tablet.rowSize != 0) { + session.insertTablet(tablet); + tablet.reset(); + } + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains("Insertion contains duplicated measurement: s0")); + } + } + @Test @Category({LocalStandaloneIT.class, ClusterIT.class}) public void createTimeSeriesWithDoubleTicksTest() { @@ -983,6 +1011,112 @@ public class IoTDBSessionSimpleIT { } } + @Test + @Category({LocalStandaloneIT.class, ClusterIT.class}) + public void insertOneDeviceRecordsWithDuplicatedMeasurementsTest() { + try (ISession session = EnvFactory.getEnv().getSessionConnection()) { + List<Long> times = new ArrayList<>(); + List<List<String>> measurements = new ArrayList<>(); + List<List<TSDataType>> datatypes = new ArrayList<>(); + List<List<Object>> values = new ArrayList<>(); + + addLine( + times, + measurements, + datatypes, + values, + 3L, + "s1", + "s2", + TSDataType.INT32, + TSDataType.INT32, + 1, + 2); + addLine( + times, + measurements, + datatypes, + values, + 2L, + "s2", + "s2", + TSDataType.INT32, + TSDataType.INT32, + 3, + 4); + addLine( + times, + measurements, + datatypes, + values, + 1L, + "s4", + "s5", + TSDataType.FLOAT, + TSDataType.BOOLEAN, + 5.0f, + Boolean.TRUE); + session.insertRecordsOfOneDevice("root.sg.d1", times, measurements, datatypes, values); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains("Insertion contains duplicated measurement: s2")); + } + } + + @Test + @Category({LocalStandaloneIT.class, ClusterIT.class}) + public void insertRecordsWithDuplicatedMeasurementsTest() { + try (ISession session = EnvFactory.getEnv().getSessionConnection()) { + List<Long> times = new ArrayList<>(); + List<List<String>> measurements = new ArrayList<>(); + List<List<TSDataType>> datatypes = new ArrayList<>(); + List<List<Object>> values = new ArrayList<>(); + 
List<String> devices = new ArrayList<>(); + + devices.add("root.sg.d1"); + addLine( + times, + measurements, + datatypes, + values, + 3L, + "s1", + "s2", + TSDataType.INT32, + TSDataType.INT32, + 1, + 2); + devices.add("root.sg.d2"); + addLine( + times, + measurements, + datatypes, + values, + 2L, + "s2", + "s2", + TSDataType.INT32, + TSDataType.INT32, + 3, + 4); + devices.add("root.sg.d3"); + addLine( + times, + measurements, + datatypes, + values, + 1L, + "s4", + "s5", + TSDataType.FLOAT, + TSDataType.BOOLEAN, + 5.0f, + Boolean.TRUE); + session.insertRecords(devices, times, measurements, datatypes, values); + } catch (Exception e) { + Assert.assertTrue(e.getMessage().contains("Insertion contains duplicated measurement: s2")); + } + } + @Test @Category({LocalStandaloneIT.class, ClusterIT.class}) public void insertStringRecordsOfOneDeviceWithOrderTest() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java index a7448a93529..ddda0e94332 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java @@ -2212,7 +2212,6 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> @Override public Analysis visitInsert(InsertStatement insertStatement, MPPQueryContext context) { context.setQueryType(QueryType.WRITE); - insertStatement.semanticCheck(); long[] timeArray = insertStatement.getTimes(); PartialPath devicePath = insertStatement.getDevice(); String[] measurementList = insertStatement.getMeasurementList(); @@ -2541,6 +2540,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> public Analysis visitInsertTablet( InsertTabletStatement insertTabletStatement, MPPQueryContext context) { 
context.setQueryType(QueryType.WRITE); + insertTabletStatement.semanticCheck(); Analysis analysis = new Analysis(); validateSchema(analysis, insertTabletStatement, context); InsertBaseStatement realStatement = removeLogicalView(analysis, insertTabletStatement); @@ -2572,6 +2572,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> @Override public Analysis visitInsertRow(InsertRowStatement insertRowStatement, MPPQueryContext context) { context.setQueryType(QueryType.WRITE); + insertRowStatement.semanticCheck(); Analysis analysis = new Analysis(); validateSchema(analysis, insertRowStatement, context); InsertBaseStatement realInsertStatement = removeLogicalView(analysis, insertRowStatement); @@ -2622,6 +2623,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> public Analysis visitInsertRows( InsertRowsStatement insertRowsStatement, MPPQueryContext context) { context.setQueryType(QueryType.WRITE); + insertRowsStatement.semanticCheck(); Analysis analysis = new Analysis(); validateSchema(analysis, insertRowsStatement, context); InsertRowsStatement realInsertRowsStatement = @@ -2661,6 +2663,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> public Analysis visitInsertMultiTablets( InsertMultiTabletsStatement insertMultiTabletsStatement, MPPQueryContext context) { context.setQueryType(QueryType.WRITE); + insertMultiTabletsStatement.semanticCheck(); Analysis analysis = new Analysis(); validateSchema(analysis, insertMultiTabletsStatement, context); InsertMultiTabletsStatement realStatement = @@ -2678,6 +2681,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> public Analysis visitInsertRowsOfOneDevice( InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement, MPPQueryContext context) { context.setQueryType(QueryType.WRITE); + insertRowsOfOneDeviceStatement.semanticCheck(); Analysis analysis = new Analysis(); validateSchema(analysis, 
insertRowsOfOneDeviceStatement, context); InsertBaseStatement realInsertStatement = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java index 605f72fd831..10f7b03dfc0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java @@ -199,6 +199,21 @@ public abstract class InsertBaseStatement extends Statement { public abstract Object getFirstValueOfIndex(int index); + public void semanticCheck() { + Set<String> deduplicatedMeasurements = new HashSet<>(); + for (String measurement : measurements) { + if (measurement == null || measurement.isEmpty()) { + throw new SemanticException( + "Measurement contains null or empty string: " + Arrays.toString(measurements)); + } + if (deduplicatedMeasurements.contains(measurement)) { + throw new SemanticException("Insertion contains duplicated measurement: " + measurement); + } else { + deduplicatedMeasurements.add(measurement); + } + } + } + // region partial insert /** * Mark failed measurement, measurements[index], dataTypes[index] and values/columns[index] would @@ -310,7 +325,7 @@ public abstract class InsertBaseStatement extends Statement { } } // construct map from device to measurements and record the index of its measurement - // schemaengine + // schema Map<PartialPath, List<Pair<String, Integer>>> mapFromDeviceToMeasurementAndIndex = new HashMap<>(); for (int i = 0; i < this.measurements.length; i++) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java index 
bbc00e1975b..6601af62947 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java @@ -110,6 +110,13 @@ public class InsertMultiTabletsStatement extends InsertBaseStatement { return false; } + @Override + public void semanticCheck() { + for (InsertTabletStatement insertTabletStatement : insertTabletStatementList) { + insertTabletStatement.semanticCheck(); + } + } + @Override public long getMinTime() { throw new NotImplementedException(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java index e6403c38577..d5a4cf765e6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java @@ -255,6 +255,17 @@ public class InsertRowStatement extends InsertBaseStatement implements ISchemaVa values[index] = null; } + @Override + public void semanticCheck() { + super.semanticCheck(); + if (measurements.length != values.length) { + throw new SemanticException( + String.format( + "the measurementList's size %d is not consistent with the valueList's size %d", + measurements.length, values.length)); + } + } + public boolean isNeedSplit() { return hasLogicalViewNeedProcess(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsOfOneDeviceStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsOfOneDeviceStatement.java index 9725e54c381..e67ef7c65c2 100644 --- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsOfOneDeviceStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsOfOneDeviceStatement.java @@ -129,6 +129,13 @@ public class InsertRowsOfOneDeviceStatement extends InsertBaseStatement { return false; } + @Override + public void semanticCheck() { + for (InsertRowStatement insertRowStatement : insertRowStatementList) { + insertRowStatement.semanticCheck(); + } + } + @Override public long getMinTime() { throw new NotImplementedException(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsStatement.java index c29610d25f4..669165798bc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsStatement.java @@ -128,6 +128,13 @@ public class InsertRowsStatement extends InsertBaseStatement { return false; } + @Override + public void semanticCheck() { + for (InsertRowStatement insertRowStatement : insertRowStatementList) { + insertRowStatement.semanticCheck(); + } + } + @Override public long getMinTime() { throw new NotImplementedException(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertStatement.java index 3e0a9914030..2e366c46e47 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertStatement.java @@ -23,17 +23,13 @@ import 
org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.auth.entity.PrivilegeType; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.auth.AuthorityChecker; -import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.queryengine.plan.statement.Statement; import org.apache.iotdb.db.queryengine.plan.statement.StatementType; import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor; import org.apache.iotdb.rpc.TSStatusCode; import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; import java.util.List; -import java.util.Set; /** this class extends {@code Statement} and process insert statement. */ public class InsertStatement extends Statement { @@ -118,29 +114,4 @@ public class InsertStatement extends Statement { public <R, C> R accept(StatementVisitor<R, C> visitor, C context) { return visitor.visitInsert(this, context); } - - public void semanticCheck() { - Set<String> deduplicatedMeasurements = new HashSet<>(); - for (String measurement : measurementList) { - if (measurement == null || measurement.isEmpty()) { - throw new SemanticException( - "Measurement contains null or empty string: " + Arrays.toString(measurementList)); - } - if (deduplicatedMeasurements.contains(measurement)) { - throw new SemanticException("Insertion contains duplicated measurement: " + measurement); - } else { - deduplicatedMeasurements.add(measurement); - } - } - - int measurementsNum = measurementList.length; - for (int i = 0; i < times.length; i++) { - if (measurementsNum != valuesList.get(i).length) { - throw new SemanticException( - String.format( - "the measurementList's size %d is not consistent with the valueList's size %d", - measurementsNum, valuesList.get(i).length)); - } - } - } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java index 8d9bdf8d971..a5dd5fbf511 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java @@ -194,6 +194,17 @@ public class InsertTabletStatement extends InsertBaseStatement implements ISchem columns[index] = null; } + @Override + public void semanticCheck() { + super.semanticCheck(); + if (measurements.length != columns.length) { + throw new SemanticException( + String.format( + "the measurementList's size %d is not consistent with the columnList's size %d", + measurements.length, columns.length)); + } + } + public boolean isNeedSplit() { return hasLogicalViewNeedProcess(); }
