This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.10
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/rel/0.10 by this push:
     new 509aa01  fix deadlock of insert and show latest timeseries
509aa01 is described below

commit 509aa015808a04586523195d8ae0462af3ece7e9
Author: qiaojialin <[email protected]>
AuthorDate: Fri Jul 10 19:43:28 2020 +0800

    fix deadlock of insert and show latest timeseries
---
 .../engine/storagegroup/StorageGroupProcessor.java | 44 +++++++---------------
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |  2 +
 .../iotdb/db/qp/physical/crud/InsertPlan.java      | 11 ++++++
 .../db/qp/physical/crud/InsertTabletPlan.java      | 11 ++++++
 4 files changed, 38 insertions(+), 30 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index b82bcce..eef78cf 100755
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -761,21 +761,13 @@ public class StorageGroupProcessor {
 
   private void tryToUpdateBatchInsertLastCache(InsertTabletPlan plan, Long 
latestFlushedTime)
       throws WriteProcessException {
-    MNode node = null;
-    try {
-      MManager manager = MManager.getInstance();
-      node = 
manager.getDeviceNodeWithAutoCreateAndReadLock(plan.getDeviceId());
-      String[] measurementList = plan.getMeasurements();
-      for (int i = 0; i < measurementList.length; i++) {
-        // Update cached last value with high priority
-        ((LeafMNode) manager.getChild(node, measurementList[i]))
-            .updateCachedLast(plan.composeLastTimeValuePair(i), true, 
latestFlushedTime);
-      }
-    } catch (MetadataException e) {
-      throw new WriteProcessException(e);
-    } finally {
+    MNode node = plan.getDeviceMNode();
+    String[] measurementList = plan.getMeasurements();
+    for (int i = 0; i < measurementList.length; i++) {
+      // Update cached last value with high priority
       if (node != null) {
-        ((InternalMNode) node).readUnlock();
+        ((LeafMNode) node.getChild(measurementList[i]))
+            .updateCachedLast(plan.composeLastTimeValuePair(i), true, 
latestFlushedTime);
       }
     }
   }
@@ -813,24 +805,16 @@ public class StorageGroupProcessor {
 
   private void tryToUpdateInsertLastCache(InsertPlan plan, Long 
latestFlushedTime)
       throws WriteProcessException {
-    MNode node = null;
-    try {
-      MManager manager = MManager.getInstance();
-      node = 
manager.getDeviceNodeWithAutoCreateAndReadLock(plan.getDeviceId());
-      String[] measurementList = plan.getMeasurements();
-      for (int i = 0; i < measurementList.length; i++) {
-        if (plan.getValues()[i] == null) {
-          continue;
-        }
-        // Update cached last value with high priority
-        ((LeafMNode) manager.getChild(node, measurementList[i]))
-            .updateCachedLast(plan.composeTimeValuePair(i), true, 
latestFlushedTime);
+    MNode node = plan.getDeviceMNode();
+    String[] measurementList = plan.getMeasurements();
+    for (int i = 0; i < measurementList.length; i++) {
+      if (plan.getValues()[i] == null) {
+        continue;
       }
-    } catch (MetadataException e) {
-      throw new WriteProcessException(e);
-    } finally {
+      // Update cached last value with high priority
       if (node != null) {
-        ((InternalMNode) node).readUnlock();
+        ((LeafMNode) node.getChild(measurementList[i]))
+            .updateCachedLast(plan.composeTimeValuePair(i), true, 
latestFlushedTime);
       }
     }
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java 
b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 27c31e6..efa68f8 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -916,6 +916,7 @@ public class PlanExecutor implements IPlanExecutor {
 
       insertPlan.setMeasurements(measurementList);
       insertPlan.setSchemasAndTransferType(schemas);
+      insertPlan.setDeviceMNode(node);
       StorageEngine.getInstance().insert(insertPlan);
       if (insertPlan.getFailedMeasurements() != null) {
         throw new StorageEngineException(
@@ -1055,6 +1056,7 @@ public class PlanExecutor implements IPlanExecutor {
         measurementList[i] = measurementNode.getName();
       }
       insertTabletPlan.setSchemas(schemas);
+      insertTabletPlan.setDeviceMNode(node);
       return StorageEngine.getInstance().insertTablet(insertTabletPlan);
     } catch (StorageEngineException | MetadataException e) {
       throw new QueryProcessException(e);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java 
b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
index f617a25..611aa31 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java
@@ -29,6 +29,7 @@ import java.util.Objects;
 import java.util.Set;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.mnode.MNode;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
@@ -57,6 +58,8 @@ public class InsertPlan extends PhysicalPlan {
   private TSDataType[] types;
   private MeasurementSchema[] schemas;
 
+  private MNode deviceMNode;
+
   // if inferType is false, use the type of values directly
   // if inferType is true, values is String[], and infer types from them
   private boolean inferType = false;
@@ -362,6 +365,14 @@ public class InsertPlan extends PhysicalPlan {
     return failedMeasurements == null ? 0 : failedMeasurements.size();
   }
 
+  public MNode getDeviceMNode() {
+    return deviceMNode;
+  }
+
+  public void setDeviceMNode(MNode deviceMNode) {
+    this.deviceMNode = deviceMNode;
+  }
+
   public TSDataType[] getTypes() {
     return types;
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
 
b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
index 2928116..97e5781 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertTabletPlan.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.iotdb.db.metadata.mnode.MNode;
 import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.utils.QueryDataSetUtils;
@@ -56,6 +57,8 @@ public class InsertTabletPlan extends PhysicalPlan {
   private long[] times; // times should be sorted. It is done in the session 
API.
   private ByteBuffer timeBuffer;
 
+  private MNode deviceMNode;
+
   private Object[] columns;
   private ByteBuffer valueBuffer;
   private Set<Integer> index;
@@ -356,6 +359,14 @@ public class InsertTabletPlan extends PhysicalPlan {
     this.measurements = measurements;
   }
 
+  public MNode getDeviceMNode() {
+    return deviceMNode;
+  }
+
+  public void setDeviceMNode(MNode deviceMNode) {
+    this.deviceMNode = deviceMNode;
+  }
+
   public TSDataType[] getDataTypes() {
     return dataTypes;
   }

Reply via email to