This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 491246db1fc fix insert&delete cases dead data
491246db1fc is described below

commit 491246db1fc4f07572e30d698ff0cf451a41fdaa
Author: Colin Li <[email protected]>
AuthorDate: Thu Jan 4 09:13:01 2024 +0800

    fix insert&delete cases dead data
---
 .../impl/DataNodeInternalRPCServiceImpl.java       |  2 ++
 .../db/queryengine/common/MPPQueryContext.java     | 10 ++++++++++
 .../execution/executor/RegionWriteExecutor.java    |  3 ++-
 .../iotdb/db/queryengine/plan/Coordinator.java     |  8 +++++++-
 .../analyze/cache/schema/DataNodeSchemaCache.java  | 22 ++++++++++++++++++++++
 .../plan/analyze/schema/ClusterSchemaFetcher.java  |  9 +++++++++
 6 files changed, 52 insertions(+), 2 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index dc496b01034..a3ec3892697 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -513,11 +513,13 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
   @Override
   public TSStatus 
invalidateMatchedSchemaCache(TInvalidateMatchedSchemaCacheReq req) {
     DataNodeSchemaCache cache = DataNodeSchemaCache.getInstance();
+    cache.takeDeleteLock();
     cache.takeWriteLock();
     try {
       
cache.invalidate(PathPatternTree.deserialize(req.pathPatternTree).getAllPathPatterns());
     } finally {
       cache.releaseWriteLock();
+      cache.releaseDeleteLock();
     }
     return RpcUtils.SUCCESS_STATUS;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
index 1fda5d75385..16f0dcd8b51 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
@@ -59,6 +59,8 @@ public class MPPQueryContext {
 
   private Filter globalTimeFilter;
 
+  private boolean acquiredLock;
+
   public MPPQueryContext(QueryId queryId) {
     this.queryId = queryId;
     this.endPointBlackList = new LinkedList<>();
@@ -155,6 +157,14 @@ public class MPPQueryContext {
     return sql;
   }
 
+  public boolean getAcquiredLock() {
+    return acquiredLock;
+  }
+
+  public void setAcquiredLock(boolean acuqired) {
+    acquiredLock = acuqired;
+  }
+
   public void generateGlobalTimeFilter(Analysis analysis) {
     this.globalTimeFilter =
         
PredicateUtils.convertPredicateToTimeFilter(analysis.getGlobalTimePredicate());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
index eba3e73f42f..784105dd092 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
@@ -305,7 +305,8 @@ public class RegionWriteExecutor {
     @Override
     public RegionExecutionResult visitDeleteData(
         DeleteDataNode node, WritePlanNodeExecutionContext context) {
-      // data deletion should block data insertion, especially when executed 
for deleting timeseries
+      // data deletion doesn't need to block data insertion, but some creation 
operations
+      // require the write lock on the data region.
       context.getRegionWriteValidationRWLock().writeLock().lock();
       try {
         return super.visitDeleteData(node, context);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 1487a0f9932..abd0c356910 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.queryengine.common.QueryId;
 import org.apache.iotdb.db.queryengine.common.SessionInfo;
 import org.apache.iotdb.db.queryengine.execution.QueryIdGenerator;
 import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
+import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache;
 import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
 import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
@@ -132,11 +133,12 @@ public class Coordinator {
       long timeOut) {
     long startTime = System.currentTimeMillis();
     QueryId globalQueryId = queryIdGenerator.createNextQueryId();
+    MPPQueryContext queryContext = null;
     try (SetThreadName queryName = new SetThreadName(globalQueryId.getId())) {
       if (sql != null && sql.length() > 0) {
         LOGGER.debug("[QueryStart] sql: {}", sql);
       }
-      MPPQueryContext queryContext =
+      queryContext =
           new MPPQueryContext(
               sql,
               globalQueryId,
@@ -159,6 +161,10 @@ public class Coordinator {
       }
       execution.start();
       return execution.getStatus();
+    } finally {
+      if (queryContext != null && queryContext.getAcquiredLock()) {
+        DataNodeSchemaCache.getInstance().releaseInsertLock();
+      }
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java
index 3f860f74d0b..f8198e4216b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java
@@ -63,6 +63,12 @@ public class DataNodeSchemaCache {
   // cache update or clean have higher priority than cache read
   private final ReentrantReadWriteLock readWriteLock = new 
ReentrantReadWriteLock(false);
 
+  // To make insert and delete mutually exclusive, a new rwlock is added to the datanode 
schema cache.
+  // The reason is that some operations [loadtsfiles] hold the read lock and then 
acquire the write lock
+  // before
+  // closing the tsfile and resetting the datanodeCache.
+  private final ReentrantReadWriteLock insertDeletionLock = new 
ReentrantReadWriteLock(false);
+
   private DataNodeSchemaCache() {
     deviceUsingTemplateSchemaCache = new 
DeviceUsingTemplateSchemaCache(templateManager);
     timeSeriesSchemaCache = new TimeSeriesSchemaCache();
@@ -104,6 +110,22 @@ public class DataNodeSchemaCache {
     readWriteLock.writeLock().unlock();
   }
 
+  public void takeInsertLock() {
+    insertDeletionLock.readLock().lock();
+  }
+
+  public void releaseInsertLock() {
+    insertDeletionLock.readLock().unlock();
+  }
+
+  public void takeDeleteLock() {
+    insertDeletionLock.writeLock().lock();
+  }
+
+  public void releaseDeleteLock() {
+    insertDeletionLock.writeLock().unlock();
+  }
+
   /**
    * Get SchemaEntity info without auto create schema
    *
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetcher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetcher.java
index cc13537fbc9..538bda1c69b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetcher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetcher.java
@@ -52,6 +52,9 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
   private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
   private final Coordinator coordinator = Coordinator.getInstance();
+
+  // DataNodeSchemaCache's rwlock is used to block deletion when we insert the 
same timeseries
+  // and will be released after coordinator's execute().
   private final DataNodeSchemaCache schemaCache = 
DataNodeSchemaCache.getInstance();
   private final ITemplateManager templateManager = 
ClusterTemplateManager.getInstance();
 
@@ -162,6 +165,8 @@ public class ClusterSchemaFetcher implements ISchemaFetcher 
{
       MPPQueryContext context) {
     // The schema cache R/W and fetch operation must be locked together thus 
the cache clean
     // operation executed by delete timeseries will be effective.
+    schemaCache.takeInsertLock();
+    context.setAcquiredLock(true);
     schemaCache.takeReadLock();
     try {
       Pair<Template, PartialPath> templateSetInfo =
@@ -198,6 +203,8 @@ public class ClusterSchemaFetcher implements ISchemaFetcher 
{
       MPPQueryContext context) {
     // The schema cache R/W and fetch operation must be locked together thus 
the cache clean
     // operation executed by delete timeseries will be effective.
+    schemaCache.takeInsertLock();
+    context.setAcquiredLock(true);
     schemaCache.takeReadLock();
     try {
 
@@ -240,6 +247,8 @@ public class ClusterSchemaFetcher implements ISchemaFetcher 
{
       MPPQueryContext context) {
     // The schema cache R/W and fetch operation must be locked together thus 
the cache clean
     // operation executed by delete timeseries will be effective.
+    schemaCache.takeInsertLock();
+    context.setAcquiredLock(true);
     schemaCache.takeReadLock();
     try {
       ClusterSchemaTree schemaTree = new ClusterSchemaTree();

Reply via email to