This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 491246db1fc fix insert&delete cases dead data
491246db1fc is described below
commit 491246db1fc4f07572e30d698ff0cf451a41fdaa
Author: Colin Li <[email protected]>
AuthorDate: Thu Jan 4 09:13:01 2024 +0800
fix insert&delete cases dead data
---
.../impl/DataNodeInternalRPCServiceImpl.java | 2 ++
.../db/queryengine/common/MPPQueryContext.java | 10 ++++++++++
.../execution/executor/RegionWriteExecutor.java | 3 ++-
.../iotdb/db/queryengine/plan/Coordinator.java | 8 +++++++-
.../analyze/cache/schema/DataNodeSchemaCache.java | 22 ++++++++++++++++++++++
.../plan/analyze/schema/ClusterSchemaFetcher.java | 9 +++++++++
6 files changed, 52 insertions(+), 2 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index dc496b01034..a3ec3892697 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -513,11 +513,13 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
@Override
public TSStatus
invalidateMatchedSchemaCache(TInvalidateMatchedSchemaCacheReq req) {
DataNodeSchemaCache cache = DataNodeSchemaCache.getInstance();
+ cache.takeDeleteLock();
cache.takeWriteLock();
try {
cache.invalidate(PathPatternTree.deserialize(req.pathPatternTree).getAllPathPatterns());
} finally {
cache.releaseWriteLock();
+ cache.releaseDeleteLock();
}
return RpcUtils.SUCCESS_STATUS;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
index 1fda5d75385..16f0dcd8b51 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
@@ -59,6 +59,8 @@ public class MPPQueryContext {
private Filter globalTimeFilter;
+ private boolean acquiredLock;
+
public MPPQueryContext(QueryId queryId) {
this.queryId = queryId;
this.endPointBlackList = new LinkedList<>();
@@ -155,6 +157,14 @@ public class MPPQueryContext {
return sql;
}
+ public boolean getAcquiredLock() {
+ return acquiredLock;
+ }
+
+ public void setAcquiredLock(boolean acuqired) {
+ acquiredLock = acuqired;
+ }
+
public void generateGlobalTimeFilter(Analysis analysis) {
this.globalTimeFilter =
PredicateUtils.convertPredicateToTimeFilter(analysis.getGlobalTimePredicate());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
index eba3e73f42f..784105dd092 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java
@@ -305,7 +305,8 @@ public class RegionWriteExecutor {
@Override
public RegionExecutionResult visitDeleteData(
DeleteDataNode node, WritePlanNodeExecutionContext context) {
- // data deletion should block data insertion, especially when executed
for deleting timeseries
+ // data deletion doesn't need to block data insertion, but some creation
operations
+ // require a write lock on the data region.
context.getRegionWriteValidationRWLock().writeLock().lock();
try {
return super.visitDeleteData(node, context);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 1487a0f9932..abd0c356910 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.queryengine.common.QueryId;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.execution.QueryIdGenerator;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
+import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
@@ -132,11 +133,12 @@ public class Coordinator {
long timeOut) {
long startTime = System.currentTimeMillis();
QueryId globalQueryId = queryIdGenerator.createNextQueryId();
+ MPPQueryContext queryContext = null;
try (SetThreadName queryName = new SetThreadName(globalQueryId.getId())) {
if (sql != null && sql.length() > 0) {
LOGGER.debug("[QueryStart] sql: {}", sql);
}
- MPPQueryContext queryContext =
+ queryContext =
new MPPQueryContext(
sql,
globalQueryId,
@@ -159,6 +161,10 @@ public class Coordinator {
}
execution.start();
return execution.getStatus();
+ } finally {
+ if (queryContext != null && queryContext.getAcquiredLock()) {
+ DataNodeSchemaCache.getInstance().releaseInsertLock();
+ }
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java
index 3f860f74d0b..f8198e4216b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java
@@ -63,6 +63,12 @@ public class DataNodeSchemaCache {
// cache update or clean have higher priority than cache read
private final ReentrantReadWriteLock readWriteLock = new
ReentrantReadWriteLock(false);
+ // To let inserts and deletions run concurrently, introduce a new rwlock for
the datanode
+ // schema cache.
+ // The reason is that some operations [loadtsfiles] will hold the read lock
and then acquire
+ // the write lock
+ // before closing the tsfile and resetting the datanode cache.
+ private final ReentrantReadWriteLock insertDeletionLock = new
ReentrantReadWriteLock(false);
+
private DataNodeSchemaCache() {
deviceUsingTemplateSchemaCache = new
DeviceUsingTemplateSchemaCache(templateManager);
timeSeriesSchemaCache = new TimeSeriesSchemaCache();
@@ -104,6 +110,22 @@ public class DataNodeSchemaCache {
readWriteLock.writeLock().unlock();
}
+ public void takeInsertLock() {
+ insertDeletionLock.readLock().lock();
+ }
+
+ public void releaseInsertLock() {
+ insertDeletionLock.readLock().unlock();
+ }
+
+ public void takeDeleteLock() {
+ insertDeletionLock.writeLock().lock();
+ }
+
+ public void releaseDeleteLock() {
+ insertDeletionLock.writeLock().unlock();
+ }
+
/**
* Get SchemaEntity info without auto create schema
*
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetcher.java
index cc13537fbc9..538bda1c69b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetcher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetcher.java
@@ -52,6 +52,9 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private final Coordinator coordinator = Coordinator.getInstance();
+
+ // DataNodeSchemaCache's rwlock is used to block deletion when we insert the
same timeseries
+ // and will be released after coordinator's execute().
private final DataNodeSchemaCache schemaCache =
DataNodeSchemaCache.getInstance();
private final ITemplateManager templateManager =
ClusterTemplateManager.getInstance();
@@ -162,6 +165,8 @@ public class ClusterSchemaFetcher implements ISchemaFetcher
{
MPPQueryContext context) {
// The schema cache R/W and fetch operation must be locked together thus
the cache clean
// operation executed by delete timeseries will be effective.
+ schemaCache.takeInsertLock();
+ context.setAcquiredLock(true);
schemaCache.takeReadLock();
try {
Pair<Template, PartialPath> templateSetInfo =
@@ -198,6 +203,8 @@ public class ClusterSchemaFetcher implements ISchemaFetcher
{
MPPQueryContext context) {
// The schema cache R/W and fetch operation must be locked together thus
the cache clean
// operation executed by delete timeseries will be effective.
+ schemaCache.takeInsertLock();
+ context.setAcquiredLock(true);
schemaCache.takeReadLock();
try {
@@ -240,6 +247,8 @@ public class ClusterSchemaFetcher implements ISchemaFetcher
{
MPPQueryContext context) {
// The schema cache R/W and fetch operation must be locked together thus
the cache clean
// operation executed by delete timeseries will be effective.
+ schemaCache.takeInsertLock();
+ context.setAcquiredLock(true);
schemaCache.takeReadLock();
try {
ClusterSchemaTree schemaTree = new ClusterSchemaTree();