This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d81972227d9 Check duplicated measurements in one row for all insert APIs
d81972227d9 is described below
commit d81972227d9722b3565b4afc6fa94fc9214e0f9c
Author: Haonan <[email protected]>
AuthorDate: Mon May 20 16:18:46 2024 +0800
Check duplicated measurements in one row for all insert APIs
---
.../iotdb/session/it/IoTDBSessionSimpleIT.java | 134 +++++++++++++++++++++
.../queryengine/plan/analyze/AnalyzeVisitor.java | 14 +--
.../db/queryengine/plan/parser/ASTVisitor.java | 6 +-
.../plan/statement/crud/InsertBaseStatement.java | 17 ++-
.../crud/InsertMultiTabletsStatement.java | 7 ++
.../plan/statement/crud/InsertRowStatement.java | 11 ++
.../crud/InsertRowsOfOneDeviceStatement.java | 7 ++
.../plan/statement/crud/InsertRowsStatement.java | 7 ++
.../plan/statement/crud/InsertStatement.java | 35 +-----
.../plan/statement/crud/InsertTabletStatement.java | 11 ++
10 files changed, 206 insertions(+), 43 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSimpleIT.java
b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSimpleIT.java
index e9c1d05d3a6..0a89ee56ded 100644
---
a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSimpleIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionSimpleIT.java
@@ -652,6 +652,34 @@ public class IoTDBSessionSimpleIT {
}
}
+  @Test
+  @Category({LocalStandaloneIT.class, ClusterIT.class})
+  public void insertTabletWithDuplicatedMeasurementsTest() {
+    try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
+      List<MeasurementSchema> schemaList = new ArrayList<>();
+      schemaList.add(new MeasurementSchema("s0", TSDataType.DOUBLE, TSEncoding.RLE));
+      schemaList.add(new MeasurementSchema("s0", TSDataType.DOUBLE, TSEncoding.RLE));
+      schemaList.add(new MeasurementSchema("s0", TSDataType.DOUBLE, TSEncoding.RLE));
+
+      Tablet tablet = new Tablet("root.sg1.d1", schemaList);
+      for (long time = 0L; time < 10L; time++) {
+        int rowIndex = tablet.rowSize++;
+        tablet.addTimestamp(rowIndex, time);
+
+        tablet.addValue(schemaList.get(0).getMeasurementId(), rowIndex, (double) time);
+        tablet.addValue(schemaList.get(1).getMeasurementId(), rowIndex, (double) time);
+        tablet.addValue(schemaList.get(2).getMeasurementId(), rowIndex, (double) time);
+      }
+
+      // Ten rows were added above, so the tablet is never empty and is always sent.
+      session.insertTablet(tablet);
+      // Without this the test would pass silently when no exception is raised.
+      // Assert.fail throws AssertionError, which the catch (Exception) below does not swallow.
+      Assert.fail("Expected insertTablet to reject the duplicated measurement s0");
+    } catch (Exception e) {
+      Assert.assertTrue(e.getMessage().contains("Insertion contains duplicated measurement: s0"));
+    }
+  }
+
@Test
@Category({LocalStandaloneIT.class, ClusterIT.class})
public void createTimeSeriesWithDoubleTicksTest() {
@@ -983,6 +1011,112 @@ public class IoTDBSessionSimpleIT {
}
}
+  @Test
+  @Category({LocalStandaloneIT.class, ClusterIT.class})
+  public void insertOneDeviceRecordsWithDuplicatedMeasurementsTest() {
+    try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
+      List<Long> times = new ArrayList<>();
+      List<List<String>> measurements = new ArrayList<>();
+      List<List<TSDataType>> datatypes = new ArrayList<>();
+      List<List<Object>> values = new ArrayList<>();
+
+      addLine(
+          times,
+          measurements,
+          datatypes,
+          values,
+          3L,
+          "s1",
+          "s2",
+          TSDataType.INT32,
+          TSDataType.INT32,
+          1,
+          2);
+      // This row names s2 twice and is the one expected to be rejected.
+      addLine(
+          times,
+          measurements,
+          datatypes,
+          values,
+          2L,
+          "s2",
+          "s2",
+          TSDataType.INT32,
+          TSDataType.INT32,
+          3,
+          4);
+      addLine(
+          times,
+          measurements,
+          datatypes,
+          values,
+          1L,
+          "s4",
+          "s5",
+          TSDataType.FLOAT,
+          TSDataType.BOOLEAN,
+          5.0f,
+          Boolean.TRUE);
+      session.insertRecordsOfOneDevice("root.sg.d1", times, measurements, datatypes, values);
+      // Without this the test would pass silently when no exception is raised.
+      // Assert.fail throws AssertionError, which the catch (Exception) below does not swallow.
+      Assert.fail("Expected insertRecordsOfOneDevice to reject the duplicated measurement s2");
+    } catch (Exception e) {
+      Assert.assertTrue(e.getMessage().contains("Insertion contains duplicated measurement: s2"));
+    }
+  }
+
+  @Test
+  @Category({LocalStandaloneIT.class, ClusterIT.class})
+  public void insertRecordsWithDuplicatedMeasurementsTest() {
+    try (ISession session = EnvFactory.getEnv().getSessionConnection()) {
+      List<Long> times = new ArrayList<>();
+      List<List<String>> measurements = new ArrayList<>();
+      List<List<TSDataType>> datatypes = new ArrayList<>();
+      List<List<Object>> values = new ArrayList<>();
+      List<String> devices = new ArrayList<>();
+
+      devices.add("root.sg.d1");
+      addLine(
+          times,
+          measurements,
+          datatypes,
+          values,
+          3L,
+          "s1",
+          "s2",
+          TSDataType.INT32,
+          TSDataType.INT32,
+          1,
+          2);
+      // The row for root.sg.d2 names s2 twice and is the one expected to be rejected.
+      devices.add("root.sg.d2");
+      addLine(
+          times,
+          measurements,
+          datatypes,
+          values,
+          2L,
+          "s2",
+          "s2",
+          TSDataType.INT32,
+          TSDataType.INT32,
+          3,
+          4);
+      devices.add("root.sg.d3");
+      addLine(
+          times,
+          measurements,
+          datatypes,
+          values,
+          1L,
+          "s4",
+          "s5",
+          TSDataType.FLOAT,
+          TSDataType.BOOLEAN,
+          5.0f,
+          Boolean.TRUE);
+      session.insertRecords(devices, times, measurements, datatypes, values);
+      // Without this the test would pass silently when no exception is raised.
+      // Assert.fail throws AssertionError, which the catch (Exception) below does not swallow.
+      Assert.fail("Expected insertRecords to reject the duplicated measurement s2");
+    } catch (Exception e) {
+      Assert.assertTrue(e.getMessage().contains("Insertion contains duplicated measurement: s2"));
+    }
+  }
+
@Test
@Category({LocalStandaloneIT.class, ClusterIT.class})
public void insertStringRecordsOfOneDeviceWithOrderTest() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index a7448a93529..15d05e3430b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -2212,7 +2212,6 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
@Override
public Analysis visitInsert(InsertStatement insertStatement, MPPQueryContext
context) {
context.setQueryType(QueryType.WRITE);
- insertStatement.semanticCheck();
long[] timeArray = insertStatement.getTimes();
PartialPath devicePath = insertStatement.getDevice();
String[] measurementList = insertStatement.getMeasurementList();
@@ -2223,9 +2222,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
insertRowStatement.setTime(timeArray[0]);
insertRowStatement.setMeasurements(measurementList);
insertRowStatement.setDataTypes(new TSDataType[measurementList.length]);
- Object[] values = new Object[measurementList.length];
- System.arraycopy(insertStatement.getValuesList().get(0), 0, values, 0,
values.length);
- insertRowStatement.setValues(values);
+ insertRowStatement.setValues(insertStatement.getValuesList().get(0));
insertRowStatement.setNeedInferType(true);
insertRowStatement.setAligned(insertStatement.isAligned());
return insertRowStatement.accept(this, context);
@@ -2257,9 +2254,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
statement.setTime(timeArray[i]);
TSDataType[] dataTypes = new TSDataType[measurementList.length];
statement.setDataTypes(dataTypes);
- Object[] values = new Object[measurementList.length];
- System.arraycopy(insertStatement.getValuesList().get(i), 0, values, 0,
values.length);
- statement.setValues(values);
+ statement.setValues(insertStatement.getValuesList().get(i));
statement.setAligned(insertStatement.isAligned());
statement.setNeedInferType(true);
insertRowStatementList.add(statement);
@@ -2541,6 +2536,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
public Analysis visitInsertTablet(
InsertTabletStatement insertTabletStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
+ insertTabletStatement.semanticCheck();
Analysis analysis = new Analysis();
validateSchema(analysis, insertTabletStatement, context);
InsertBaseStatement realStatement = removeLogicalView(analysis,
insertTabletStatement);
@@ -2572,6 +2568,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
@Override
public Analysis visitInsertRow(InsertRowStatement insertRowStatement,
MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
+ insertRowStatement.semanticCheck();
Analysis analysis = new Analysis();
validateSchema(analysis, insertRowStatement, context);
InsertBaseStatement realInsertStatement = removeLogicalView(analysis,
insertRowStatement);
@@ -2622,6 +2619,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
public Analysis visitInsertRows(
InsertRowsStatement insertRowsStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
+ insertRowsStatement.semanticCheck();
Analysis analysis = new Analysis();
validateSchema(analysis, insertRowsStatement, context);
InsertRowsStatement realInsertRowsStatement =
@@ -2661,6 +2659,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
public Analysis visitInsertMultiTablets(
InsertMultiTabletsStatement insertMultiTabletsStatement, MPPQueryContext
context) {
context.setQueryType(QueryType.WRITE);
+ insertMultiTabletsStatement.semanticCheck();
Analysis analysis = new Analysis();
validateSchema(analysis, insertMultiTabletsStatement, context);
InsertMultiTabletsStatement realStatement =
@@ -2678,6 +2677,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
public Analysis visitInsertRowsOfOneDevice(
InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement,
MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
+ insertRowsOfOneDeviceStatement.semanticCheck();
Analysis analysis = new Analysis();
validateSchema(analysis, insertRowsOfOneDeviceStatement, context);
InsertBaseStatement realInsertStatement =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
index 3d3efc1b8c3..201b7e7302f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
@@ -1885,13 +1885,13 @@ public class ASTVisitor extends
IoTDBSqlParserBaseVisitor<Statement> {
if (timeIndex == -1 && rows.size() != 1) {
throw new SemanticException("need timestamps when insert multi rows");
}
- List<String[]> valuesList = new ArrayList<>();
+ List<Object[]> valuesList = new ArrayList<>();
long[] timeArray = new long[rows.size()];
for (int i = 0, size = rows.size(); i < size; i++) {
IoTDBSqlParser.RowContext row = rows.get(i);
// parse timestamp
long timestamp;
- List<String> valueList = new ArrayList<>();
+ List<Object> valueList = new ArrayList<>();
// using now() instead
if (timeIndex == -1) {
timestamp = CommonDateTimeUtils.currentTime();
@@ -1913,7 +1913,7 @@ public class ASTVisitor extends
IoTDBSqlParserBaseVisitor<Statement> {
}
}
- valuesList.add(valueList.toArray(new String[0]));
+ valuesList.add(valueList.toArray(new Object[0]));
}
insertStatement.setTimes(timeArray);
insertStatement.setValuesList(valuesList);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
index 605f72fd831..10f7b03dfc0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java
@@ -199,6 +199,21 @@ public abstract class InsertBaseStatement extends
Statement {
public abstract Object getFirstValueOfIndex(int index);
+  /**
+   * Validates the measurement names carried by this insertion.
+   *
+   * @throws SemanticException if a measurement is null or empty, or if the same measurement name
+   *     occurs more than once
+   */
+  public void semanticCheck() {
+    Set<String> seenMeasurements = new HashSet<>();
+    for (String name : measurements) {
+      if (name == null || name.isEmpty()) {
+        throw new SemanticException(
+            "Measurement contains null or empty string: " + Arrays.toString(measurements));
+      }
+      // Set#add returns false when the name was already recorded, i.e. it is a duplicate.
+      if (!seenMeasurements.add(name)) {
+        throw new SemanticException("Insertion contains duplicated measurement: " + name);
+      }
+    }
+  }
+
// region partial insert
/**
* Mark failed measurement, measurements[index], dataTypes[index] and
values/columns[index] would
@@ -310,7 +325,7 @@ public abstract class InsertBaseStatement extends Statement
{
}
}
// construct map from device to measurements and record the index of its
measurement
- // schemaengine
+ // schema
Map<PartialPath, List<Pair<String, Integer>>>
mapFromDeviceToMeasurementAndIndex =
new HashMap<>();
for (int i = 0; i < this.measurements.length; i++) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java
index bbc00e1975b..6601af62947 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertMultiTabletsStatement.java
@@ -110,6 +110,13 @@ public class InsertMultiTabletsStatement extends
InsertBaseStatement {
return false;
}
+  /** Runs the semantic check of every contained tablet statement, in list order. */
+  @Override
+  public void semanticCheck() {
+    // An exception thrown by any tablet propagates and stops the remaining checks.
+    insertTabletStatementList.forEach(InsertTabletStatement::semanticCheck);
+  }
+
@Override
public long getMinTime() {
throw new NotImplementedException();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
index e6403c38577..d5a4cf765e6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java
@@ -255,6 +255,17 @@ public class InsertRowStatement extends
InsertBaseStatement implements ISchemaVa
values[index] = null;
}
+  /**
+   * Runs the base measurement checks, then verifies that there is one value per measurement.
+   *
+   * @throws SemanticException if the base check fails or the two array lengths differ
+   */
+  @Override
+  public void semanticCheck() {
+    super.semanticCheck();
+    final int measurementCount = measurements.length;
+    final int valueCount = values.length;
+    if (measurementCount == valueCount) {
+      return;
+    }
+    throw new SemanticException(
+        String.format(
+            "the measurementList's size %d is not consistent with the valueList's size %d",
+            measurementCount, valueCount));
+  }
+
public boolean isNeedSplit() {
return hasLogicalViewNeedProcess();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsOfOneDeviceStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsOfOneDeviceStatement.java
index 9725e54c381..e67ef7c65c2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsOfOneDeviceStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsOfOneDeviceStatement.java
@@ -129,6 +129,13 @@ public class InsertRowsOfOneDeviceStatement extends
InsertBaseStatement {
return false;
}
+  /** Runs the semantic check of every contained row statement, in list order. */
+  @Override
+  public void semanticCheck() {
+    // An exception thrown by any row propagates and stops the remaining checks.
+    insertRowStatementList.forEach(InsertRowStatement::semanticCheck);
+  }
+
@Override
public long getMinTime() {
throw new NotImplementedException();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsStatement.java
index c29610d25f4..669165798bc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowsStatement.java
@@ -128,6 +128,13 @@ public class InsertRowsStatement extends
InsertBaseStatement {
return false;
}
+  /** Runs the semantic check of every contained row statement, in list order. */
+  @Override
+  public void semanticCheck() {
+    // An exception thrown by any row propagates and stops the remaining checks.
+    insertRowStatementList.forEach(InsertRowStatement::semanticCheck);
+  }
+
@Override
public long getMinTime() {
throw new NotImplementedException();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertStatement.java
index 3e0a9914030..f74e8682344 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertStatement.java
@@ -23,17 +23,13 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.auth.AuthorityChecker;
-import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
import org.apache.iotdb.rpc.TSStatusCode;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
/** this class extends {@code Statement} and process insert statement. */
public class InsertStatement extends Statement {
@@ -43,7 +39,7 @@ public class InsertStatement extends Statement {
private long[] times;
private String[] measurementList;
- private List<String[]> valuesList;
+ private List<Object[]> valuesList;
private boolean isAligned;
@@ -90,11 +86,11 @@ public class InsertStatement extends Statement {
this.measurementList = measurementList;
}
- public List<String[]> getValuesList() {
+ public List<Object[]> getValuesList() {
return valuesList;
}
- public void setValuesList(List<String[]> valuesList) {
+ public void setValuesList(List<Object[]> valuesList) {
this.valuesList = valuesList;
}
@@ -118,29 +114,4 @@ public class InsertStatement extends Statement {
public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
return visitor.visitInsert(this, context);
}
-
- public void semanticCheck() {
- Set<String> deduplicatedMeasurements = new HashSet<>();
- for (String measurement : measurementList) {
- if (measurement == null || measurement.isEmpty()) {
- throw new SemanticException(
- "Measurement contains null or empty string: " +
Arrays.toString(measurementList));
- }
- if (deduplicatedMeasurements.contains(measurement)) {
- throw new SemanticException("Insertion contains duplicated
measurement: " + measurement);
- } else {
- deduplicatedMeasurements.add(measurement);
- }
- }
-
- int measurementsNum = measurementList.length;
- for (int i = 0; i < times.length; i++) {
- if (measurementsNum != valuesList.get(i).length) {
- throw new SemanticException(
- String.format(
- "the measurementList's size %d is not consistent with the
valueList's size %d",
- measurementsNum, valuesList.get(i).length));
- }
- }
- }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
index 8d9bdf8d971..a5dd5fbf511 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
@@ -194,6 +194,17 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
columns[index] = null;
}
+  /**
+   * Runs the base measurement checks, then verifies that there is one column per measurement.
+   *
+   * @throws SemanticException if the base check fails or the two array lengths differ
+   */
+  @Override
+  public void semanticCheck() {
+    super.semanticCheck();
+    final int measurementCount = measurements.length;
+    final int columnCount = columns.length;
+    if (measurementCount == columnCount) {
+      return;
+    }
+    throw new SemanticException(
+        String.format(
+            "the measurementList's size %d is not consistent with the columnList's size %d",
+            measurementCount, columnCount));
+  }
+
public boolean isNeedSplit() {
return hasLogicalViewNeedProcess();
}