This is an automated email from the ASF dual-hosted git repository. justinchen pushed a commit to branch cp-206 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5a16bddff5ee23f55ff4e5d430a461cc0a301cb9 Author: Caideyipi <[email protected]> AuthorDate: Tue Nov 11 19:26:28 2025 +0800 Optimized the lock for encoding & compressor's invalidate cache (#16733) * fix * jr --- .../impl/schema/AlterEncodingCompressorProcedure.java | 2 +- .../procedure/impl/schema/DeleteTimeSeriesProcedure.java | 7 ++++--- .../protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java | 11 ++++++++--- .../thrift-datanode/src/main/thrift/datanode.thrift | 1 + 4 files changed, 14 insertions(+), 7 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterEncodingCompressorProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterEncodingCompressorProcedure.java index 773fb0a2b9a..31f1a4f40eb 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterEncodingCompressorProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterEncodingCompressorProcedure.java @@ -133,7 +133,7 @@ public class AlterEncodingCompressorProcedure break; case CLEAR_CACHE: LOGGER.info("Invalidate cache of timeSeries {}", requestMessage); - invalidateCache(env, patternTreeBytes, requestMessage, this::setFailure); + invalidateCache(env, patternTreeBytes, requestMessage, this::setFailure, false); collectPayload4Pipe(env); return Flow.NO_MORE_STATE; default: diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java index 59ca04eccc6..ed0f3f0698b 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java @@ -117,7 +117,7 @@ public class DeleteTimeSeriesProcedure } case CLEAN_DATANODE_SCHEMA_CACHE: LOGGER.info("Invalidate cache of timeSeries {}", requestMessage); - invalidateCache(env, patternTreeBytes, requestMessage, this::setFailure); + invalidateCache(env, patternTreeBytes, requestMessage, this::setFailure, true); setNextState(DeleteTimeSeriesState.DELETE_DATA); break; case DELETE_DATA: @@ -197,13 +197,14 @@ public class DeleteTimeSeriesProcedure final ConfigNodeProcedureEnv env, final ByteBuffer patternTreeBytes, final String requestMessage, - final Consumer<ProcedureException> setFailure) { + final Consumer<ProcedureException> setFailure, + final boolean needLock) { final Map<Integer, TDataNodeLocation> dataNodeLocationMap = env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations(); final DataNodeAsyncRequestContext<TInvalidateMatchedSchemaCacheReq, TSStatus> clientHandler = new DataNodeAsyncRequestContext<>( CnToDnAsyncRequestType.INVALIDATE_MATCHED_SCHEMA_CACHE, - new TInvalidateMatchedSchemaCacheReq(patternTreeBytes), + new TInvalidateMatchedSchemaCacheReq(patternTreeBytes).setNeedLock(needLock), dataNodeLocationMap); CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler); final Map<Integer, TSStatus> statusMap = clientHandler.getResponseMap(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 8b66f50b933..fc3a2191400 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -699,7 +699,10 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface @Override public TSStatus invalidateMatchedSchemaCache(final TInvalidateMatchedSchemaCacheReq req) { final TreeDeviceSchemaCacheManager cache = TreeDeviceSchemaCacheManager.getInstance(); - DataNodeSchemaLockManager.getInstance().takeWriteLock(SchemaLockType.VALIDATE_VS_DELETION_TREE); + if (req.needLock || !req.isSetNeedLock()) { + DataNodeSchemaLockManager.getInstance() + .takeWriteLock(SchemaLockType.VALIDATE_VS_DELETION_TREE); + } try { cache.takeWriteLock(); try { @@ -708,8 +711,10 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface cache.releaseWriteLock(); } } finally { - DataNodeSchemaLockManager.getInstance() - .releaseWriteLock(SchemaLockType.VALIDATE_VS_DELETION_TREE); + if (req.needLock || !req.isSetNeedLock()) { + DataNodeSchemaLockManager.getInstance() + .releaseWriteLock(SchemaLockType.VALIDATE_VS_DELETION_TREE); + } } return RpcUtils.SUCCESS_STATUS; } diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift index f210ad5bf9a..353ec310117 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift @@ -436,6 +436,7 @@ struct TRollbackSchemaBlackListReq { struct TInvalidateMatchedSchemaCacheReq { 1: required binary pathPatternTree + 2: optional bool needLock } struct TFetchSchemaBlackListReq {
