This is an automated email from the ASF dual-hosted git repository. qiaojialin pushed a commit to branch fix_deadlock in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 099a1afa8322f65d4e1ff7216fa2e867374d1164 Author: qiaojialin <[email protected]> AuthorDate: Fri Jul 10 19:20:08 2020 +0800 do not get MManager.readLock when inserting --- .../main/java/org/apache/iotdb/SessionExample.java | 2 +- .../org/apache/iotdb/db/engine/StorageEngine.java | 8 +-- .../engine/storagegroup/StorageGroupProcessor.java | 62 +++++++--------------- .../org/apache/iotdb/db/metadata/MManager.java | 3 ++ .../iotdb/db/qp/physical/crud/InsertPlan.java | 12 +++++ 5 files changed, 36 insertions(+), 51 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java index 3a63fec..06a5f21 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java @@ -40,7 +40,7 @@ public class SessionExample { private static Session session; public static void main(String[] args) - throws IoTDBConnectionException, StatementExecutionException, BatchExecutionException { + throws IoTDBConnectionException, StatementExecutionException { session = new Session("127.0.0.1", 6667, "root", "root"); session.open(false); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java index 34bd8c0..9249e6c 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java @@ -300,8 +300,6 @@ public class StorageEngine implements IService { /** * insert a InsertTabletPlan to a storage group - * - * @return result of each row */ public void insertTablet(InsertTabletPlan insertTabletPlan) throws StorageEngineException, BatchInsertionException { @@ -314,11 +312,7 @@ public class StorageEngine implements IService { } // TODO monitor: update statistics - try { - storageGroupProcessor.insertTablet(insertTabletPlan); - } catch (WriteProcessException e) { - throw new StorageEngineException(e); - } + storageGroupProcessor.insertTablet(insertTabletPlan); } /** diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index a02cbcb..cbd798a 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -632,12 +632,9 @@ public class StorageGroupProcessor { /** * Insert a tablet (rows belonging to the same devices) into this storage group. - * @param insertTabletPlan - * @throws WriteProcessException when update last cache failed * @throws BatchInsertionException if some of the rows failed to be inserted */ - public void insertTablet(InsertTabletPlan insertTabletPlan) throws WriteProcessException, - BatchInsertionException { + public void insertTablet(InsertTabletPlan insertTabletPlan) throws BatchInsertionException { writeLock(); try { TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()]; @@ -780,26 +777,17 @@ public class StorageGroupProcessor { return true; } - private void tryToUpdateBatchInsertLastCache(InsertTabletPlan plan, Long latestFlushedTime) - throws WriteProcessException { - MNode node = null; - try { - MManager manager = IoTDB.metaManager; - node = manager.getDeviceNodeWithAutoCreateAndReadLock(plan.getDeviceId()); - String[] measurementList = plan.getMeasurements(); - for (int i = 0; i < measurementList.length; i++) { - if (plan.getColumns()[i] == null) { - continue; - } - // Update cached last value with high priority - ((MeasurementMNode) manager.getChild(node, measurementList[i])) - .updateCachedLast(plan.composeLastTimeValuePair(i), true, latestFlushedTime); + private void tryToUpdateBatchInsertLastCache(InsertTabletPlan plan, Long latestFlushedTime) { + MNode node = plan.getDeviceMNode(); + String[] measurementList = plan.getMeasurements(); + for (int i = 0; i < measurementList.length; i++) { + if (plan.getColumns()[i] == null) { + continue; } - } catch (MetadataException e) { - throw new WriteProcessException(e); - } finally { if (node != null) { - node.readUnlock(); + // Update cached last value with high priority + ((MeasurementMNode) node.getChild(measurementList[i])) + .updateCachedLast(plan.composeLastTimeValuePair(i), true, latestFlushedTime); } } } @@ -835,29 +823,17 @@ public class StorageGroupProcessor { } } - private void tryToUpdateInsertLastCache(InsertRowPlan plan, Long latestFlushedTime) - throws WriteProcessException { - MNode node = null; - try { - MManager manager = IoTDB.metaManager; - node = manager.getDeviceNodeWithAutoCreateAndReadLock(plan.getDeviceId()); - String[] measurementList = plan.getMeasurements(); - for (int i = 0; i < measurementList.length; i++) { - if (plan.getValues()[i] == null) { - continue; - } - // Update cached last value with high priority - MNode measurementNode = manager.getChild(node, measurementList[i]); - if (measurementNode != null) { - ((MeasurementMNode) measurementNode) - .updateCachedLast(plan.composeTimeValuePair(i), true, latestFlushedTime); - } + private void tryToUpdateInsertLastCache(InsertRowPlan plan, Long latestFlushedTime) { + MNode node = plan.getDeviceMNode(); + String[] measurementList = plan.getMeasurements(); + for (int i = 0; i < measurementList.length; i++) { + if (plan.getValues()[i] == null) { + continue; } - } catch (MetadataException e) { - // skip last cache update if the local MTree does not contain the schema - } finally { + // Update cached last value with high priority if (node != null) { - node.readUnlock(); + ((MeasurementMNode) node.getChild(measurementList[i])). + updateCachedLast(plan.composeTimeValuePair(i), true, latestFlushedTime); } } } diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java index c0efdfc..471ab9c 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java @@ -1969,6 +1969,9 @@ public class MManager { } } } + + plan.setDeviceMNode(deviceNode); + return schemas; } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java index 2914779..36a1174 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertPlan.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.qp.physical.crud; import java.util.ArrayList; import java.util.List; +import org.apache.iotdb.db.metadata.mnode.MNode; import org.apache.iotdb.db.qp.logical.Operator; import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -33,6 +34,9 @@ abstract public class InsertPlan extends PhysicalPlan { protected TSDataType[] dataTypes; protected MeasurementSchema[] schemas; + // for updating last cache + private MNode deviceMNode; + // record the failed measurements protected List<String> failedMeasurements; @@ -81,6 +85,14 @@ abstract public class InsertPlan extends PhysicalPlan { return failedMeasurements == null ? 0 : failedMeasurements.size(); } + public MNode getDeviceMNode() { + return deviceMNode; + } + + public void setDeviceMNode(MNode deviceMNode) { + this.deviceMNode = deviceMNode; + } + /** * @param index failed measurement index */
