This is an automated email from the ASF dual-hosted git repository.

zyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new bf958b952ca Fix duplicate insertion bug caused by view (#10101)
bf958b952ca is described below

commit bf958b952caba8707fb98a8ad2f7b496c0679eea
Author: 橘子 <[email protected]>
AuthorDate: Sat Jun 10 00:27:40 2023 +0800

    Fix duplicate insertion bug caused by view (#10101)
---
 .../metadata/DuplicateInsertException.java         | 35 ++++++++++++++++++++++
 .../plan/statement/crud/InsertBaseStatement.java   | 27 +++++++++++++++++
 .../crud/InsertMultiTabletsStatement.java          | 31 +++++++++++++++++++
 .../crud/InsertRowsOfOneDeviceStatement.java       |  1 +
 .../plan/statement/crud/InsertRowsStatement.java   | 31 +++++++++++++++++++
 5 files changed, 125 insertions(+)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/exception/metadata/DuplicateInsertException.java
 
b/server/src/main/java/org/apache/iotdb/db/exception/metadata/DuplicateInsertException.java
new file mode 100644
index 00000000000..217fc710add
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/exception/metadata/DuplicateInsertException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.exception.metadata;
+
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+public class DuplicateInsertException extends MetadataException {
+
+  private static final String DUPLICATE_INSERTION_WRONG_MESSAGE =
+      "Insertion is illegal because measurement [%s] under device [%s] is 
duplicate.";
+
+  public DuplicateInsertException(String device, String measurement) {
+    super(
+        String.format(DUPLICATE_INSERTION_WRONG_MESSAGE, measurement, device),
+        TSStatusCode.UNSUPPORTED_OPERATION.getStatusCode(),
+        true);
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertBaseStatement.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertBaseStatement.java
index 708b2be09e3..181a072bedb 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertBaseStatement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertBaseStatement.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
+import org.apache.iotdb.db.exception.metadata.DuplicateInsertException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaValidation;
@@ -34,8 +35,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 public abstract class InsertBaseStatement extends Statement {
@@ -317,7 +320,31 @@ public abstract class InsertBaseStatement extends 
Statement {
             }
           });
     }
+    // check this map, ensure that all time series (measurements in each 
device) only appear once
+    validateMapFromDeviceToMeasurement(mapFromDeviceToMeasurementAndIndex);
     return mapFromDeviceToMeasurementAndIndex;
   }
+
+  protected static void validateMapFromDeviceToMeasurement(
+      Map<PartialPath, List<Pair<String, Integer>>> map) {
+    if (map == null) {
+      return;
+    }
+    for (Map.Entry<PartialPath, List<Pair<String, Integer>>> entry : 
map.entrySet()) {
+      List<Pair<String, Integer>> measurementList = entry.getValue();
+      if (measurementList.size() <= 1) {
+        continue;
+      }
+      Set<String> measurementSet = new HashSet<>();
+      for (Pair<String, Integer> thisPair : measurementList) {
+        boolean measurementNotExists = measurementSet.add(thisPair.left);
+        if (!measurementNotExists) {
+          PartialPath devicePath = entry.getKey();
+          throw new RuntimeException(
+              new DuplicateInsertException(devicePath.getFullPath(), 
thisPair.left));
+        }
+      }
+    }
+  }
   // endregion
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertMultiTabletsStatement.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertMultiTabletsStatement.java
index a43bfc7afef..3d9b4f2bb14 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertMultiTabletsStatement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertMultiTabletsStatement.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.mpp.plan.statement.crud;
 
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.metadata.DuplicateInsertException;
 import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaValidation;
 import org.apache.iotdb.db.mpp.plan.statement.StatementType;
 import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
@@ -27,7 +28,11 @@ import 
org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 public class InsertMultiTabletsStatement extends InsertBaseStatement {
@@ -138,8 +143,34 @@ public class InsertMultiTabletsStatement extends 
InsertBaseStatement {
     if (!needSplit) {
       return this;
     }
+    validateInsertTabletList(mergedList);
     InsertMultiTabletsStatement splitResult = new 
InsertMultiTabletsStatement();
     splitResult.setInsertTabletStatementList(mergedList);
     return splitResult;
   }
+
+  /**
+   * Check given InsertRowStatement list, make sure no duplicate time series 
in those statements. If
+   * there are duplicate measurements, throw DuplicateInsertException.
+   */
+  public static void validateInsertTabletList(List<InsertTabletStatement> 
insertTabletList) {
+    if (insertTabletList == null) {
+      return;
+    }
+    Map<String, Set<String>> mapFromDeviceToMeasurements = new HashMap<>();
+    for (InsertTabletStatement insertTablet : insertTabletList) {
+      String device = insertTablet.devicePath.getFullPath();
+      Set<String> measurementSet = mapFromDeviceToMeasurements.get(device);
+      if (measurementSet == null) {
+        measurementSet = new HashSet<>();
+      }
+      for (String measurement : insertTablet.measurements) {
+        boolean notExist = measurementSet.add(measurement);
+        if (!notExist) {
+          throw new RuntimeException(new DuplicateInsertException(device, 
measurement));
+        }
+      }
+      mapFromDeviceToMeasurements.put(device, measurementSet);
+    }
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsOfOneDeviceStatement.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsOfOneDeviceStatement.java
index a65c68eb331..4fc20487422 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsOfOneDeviceStatement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsOfOneDeviceStatement.java
@@ -151,6 +151,7 @@ public class InsertRowsOfOneDeviceStatement extends 
InsertBaseStatement {
         List<InsertRowStatement> childSplitResult = child.getSplitList();
         mergedList.addAll(childSplitResult);
       }
+      InsertRowsStatement.validateInsertRowList(mergedList);
       InsertRowsStatement splitResult = new InsertRowsStatement();
       splitResult.setInsertRowStatementList(mergedList);
       return splitResult;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsStatement.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsStatement.java
index 8528c64c307..f639c95455e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsStatement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/InsertRowsStatement.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.mpp.plan.statement.crud;
 
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.exception.metadata.DuplicateInsertException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.mpp.plan.analyze.schema.ISchemaValidation;
 import org.apache.iotdb.db.mpp.plan.statement.StatementType;
@@ -28,7 +29,11 @@ import 
org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 public class InsertRowsStatement extends InsertBaseStatement {
@@ -149,8 +154,34 @@ public class InsertRowsStatement extends 
InsertBaseStatement {
     if (!needSplit) {
       return this;
     }
+    validateInsertRowList(mergedList);
     InsertRowsStatement splitResult = new InsertRowsStatement();
     splitResult.setInsertRowStatementList(mergedList);
     return splitResult;
   }
+
+  /**
+   * Check given InsertRowStatement list, make sure no duplicate time series 
in those statements. If
+   * there are duplicate measurements, throw DuplicateInsertException.
+   */
+  public static void validateInsertRowList(List<InsertRowStatement> 
insertRowList) {
+    if (insertRowList == null) {
+      return;
+    }
+    Map<String, Set<String>> mapFromDeviceToMeasurements = new HashMap<>();
+    for (InsertRowStatement insertRow : insertRowList) {
+      String device = insertRow.devicePath.getFullPath();
+      Set<String> measurementSet = mapFromDeviceToMeasurements.get(device);
+      if (measurementSet == null) {
+        measurementSet = new HashSet<>();
+      }
+      for (String measurement : insertRow.measurements) {
+        boolean notExist = measurementSet.add(measurement);
+        if (!notExist) {
+          throw new RuntimeException(new DuplicateInsertException(device, 
measurement));
+        }
+      }
+      mapFromDeviceToMeasurements.put(device, measurementSet);
+    }
+  }
 }

Reply via email to