This is an automated email from the ASF dual-hosted git repository.
zyk pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.2 by this push:
new c8c34ad42d2 [To rel/1.2] Fix duplicate insertion bug caused by view
(#10102)
c8c34ad42d2 is described below
commit c8c34ad42d2d6066572ccce06d168c3510facd10
Author: 橘子 <[email protected]>
AuthorDate: Sat Jun 10 00:27:30 2023 +0800
[To rel/1.2] Fix duplicate insertion bug caused by view (#10102)
---
.../metadata/DuplicateInsertException.java | 35 ++++++++++++++++++++++
.../plan/statement/crud/InsertBaseStatement.java | 27 +++++++++++++++++
.../crud/InsertMultiTabletsStatement.java | 31 +++++++++++++++++++
.../crud/InsertRowsOfOneDeviceStatement.java | 1 +
.../plan/statement/crud/InsertRowsStatement.java | 31 +++++++++++++++++++
5 files changed, 125 insertions(+)
diff --git
a/server/src/main/java/org/apache/iotdb/db/exception/metadata/DuplicateInsertException.java
b/server/src/main/java/org/apache/iotdb/db/exception/metadata/DuplicateInsertException.java
new file mode 100644
index 00000000000..217fc710add
--- /dev/null
+++
b/server/src/main/java/org/apache/iotdb/db/exception/metadata/DuplicateInsertException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.exception.metadata;
+
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+public class DuplicateInsertException extends MetadataException {
+
+ private static final String DUPLICATE_INSERTION_WRONG_MESSAGE =
+ "Insertion is illegal because measurement [%s] under device [%s] is
duplicate.";
+
+ public DuplicateInsertException(String device, String measurement) {
+ super(
+ String.format(DUPLICATE_INSERTION_WRONG_MESSAGE, measurement, device),
+ TSStatusCode.UNSUPPORTED_OPERATION.getStatusCode(),
+ true);
+ }
+}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertBaseStatement.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertBaseStatement.java
index 708b2be09e3..181a072bedb 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertBaseStatement.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertBaseStatement.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
+import org.apache.iotdb.db.exception.metadata.DuplicateInsertException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaValidation;
@@ -34,8 +35,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
public abstract class InsertBaseStatement extends Statement {
@@ -317,7 +320,31 @@ public abstract class InsertBaseStatement extends
Statement {
}
});
}
+ // check this map, ensure that all time series (measurements in each
device) only appear once
+ validateMapFromDeviceToMeasurement(mapFromDeviceToMeasurementAndIndex);
return mapFromDeviceToMeasurementAndIndex;
}
+
+ protected static void validateMapFromDeviceToMeasurement(
+ Map<PartialPath, List<Pair<String, Integer>>> map) {
+ if (map == null) {
+ return;
+ }
+ for (Map.Entry<PartialPath, List<Pair<String, Integer>>> entry :
map.entrySet()) {
+ List<Pair<String, Integer>> measurementList = entry.getValue();
+ if (measurementList.size() <= 1) {
+ continue;
+ }
+ Set<String> measurementSet = new HashSet<>();
+ for (Pair<String, Integer> thisPair : measurementList) {
+ boolean measurementNotExists = measurementSet.add(thisPair.left);
+ if (!measurementNotExists) {
+ PartialPath devicePath = entry.getKey();
+ throw new RuntimeException(
+ new DuplicateInsertException(devicePath.getFullPath(),
thisPair.left));
+ }
+ }
+ }
+ }
// endregion
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertMultiTabletsStatement.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertMultiTabletsStatement.java
index a43bfc7afef..3d9b4f2bb14 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertMultiTabletsStatement.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertMultiTabletsStatement.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.mpp.plan.statement.crud;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.metadata.DuplicateInsertException;
import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaValidation;
import org.apache.iotdb.db.mpp.plan.statement.StatementType;
import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
@@ -27,7 +28,11 @@ import
org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
public class InsertMultiTabletsStatement extends InsertBaseStatement {
@@ -138,8 +143,34 @@ public class InsertMultiTabletsStatement extends
InsertBaseStatement {
if (!needSplit) {
return this;
}
+ validateInsertTabletList(mergedList);
InsertMultiTabletsStatement splitResult = new
InsertMultiTabletsStatement();
splitResult.setInsertTabletStatementList(mergedList);
return splitResult;
}
+
+ /**
+ * Check given InsertRowStatement list, make sure no duplicate time series
in those statements. If
+ * there are duplicate measurements, throw DuplicateInsertException.
+ */
+ public static void validateInsertTabletList(List<InsertTabletStatement>
insertTabletList) {
+ if (insertTabletList == null) {
+ return;
+ }
+ Map<String, Set<String>> mapFromDeviceToMeasurements = new HashMap<>();
+ for (InsertTabletStatement insertTablet : insertTabletList) {
+ String device = insertTablet.devicePath.getFullPath();
+ Set<String> measurementSet = mapFromDeviceToMeasurements.get(device);
+ if (measurementSet == null) {
+ measurementSet = new HashSet<>();
+ }
+ for (String measurement : insertTablet.measurements) {
+ boolean notExist = measurementSet.add(measurement);
+ if (!notExist) {
+ throw new RuntimeException(new DuplicateInsertException(device,
measurement));
+ }
+ }
+ mapFromDeviceToMeasurements.put(device, measurementSet);
+ }
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsOfOneDeviceStatement.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsOfOneDeviceStatement.java
index a65c68eb331..4fc20487422 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsOfOneDeviceStatement.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsOfOneDeviceStatement.java
@@ -151,6 +151,7 @@ public class InsertRowsOfOneDeviceStatement extends
InsertBaseStatement {
List<InsertRowStatement> childSplitResult = child.getSplitList();
mergedList.addAll(childSplitResult);
}
+ InsertRowsStatement.validateInsertRowList(mergedList);
InsertRowsStatement splitResult = new InsertRowsStatement();
splitResult.setInsertRowStatementList(mergedList);
return splitResult;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsStatement.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsStatement.java
index 8528c64c307..f639c95455e 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsStatement.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsStatement.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.mpp.plan.statement.crud;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.metadata.DuplicateInsertException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaValidation;
import org.apache.iotdb.db.mpp.plan.statement.StatementType;
@@ -28,7 +29,11 @@ import
org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
public class InsertRowsStatement extends InsertBaseStatement {
@@ -149,8 +154,34 @@ public class InsertRowsStatement extends
InsertBaseStatement {
if (!needSplit) {
return this;
}
+ validateInsertRowList(mergedList);
InsertRowsStatement splitResult = new InsertRowsStatement();
splitResult.setInsertRowStatementList(mergedList);
return splitResult;
}
+
+ /**
+ * Check given InsertRowStatement list, make sure no duplicate time series
in those statements. If
+ * there are duplicate measurements, throw DuplicateInsertException.
+ */
+ public static void validateInsertRowList(List<InsertRowStatement>
insertRowList) {
+ if (insertRowList == null) {
+ return;
+ }
+ Map<String, Set<String>> mapFromDeviceToMeasurements = new HashMap<>();
+ for (InsertRowStatement insertRow : insertRowList) {
+ String device = insertRow.devicePath.getFullPath();
+ Set<String> measurementSet = mapFromDeviceToMeasurements.get(device);
+ if (measurementSet == null) {
+ measurementSet = new HashSet<>();
+ }
+ for (String measurement : insertRow.measurements) {
+ boolean notExist = measurementSet.add(measurement);
+ if (!notExist) {
+ throw new RuntimeException(new DuplicateInsertException(device,
measurement));
+ }
+ }
+ mapFromDeviceToMeasurements.put(device, measurementSet);
+ }
+ }
}