This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch force_ci/object_type in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8a3dbcb7db47a98561ddb8620933587be5a20ccc Author: Caideyipi <[email protected]> AuthorDate: Tue Nov 11 19:26:28 2025 +0800 Optimized the lock for encoding & compressor's invalidate cache (#16733) * fix * jr (cherry picked from commit a6e8493bfc0a910f54d4e9ad7abbb37cfd94bc56) --- .../impl/schema/AlterEncodingCompressorProcedure.java | 2 +- .../procedure/impl/schema/DeleteTimeSeriesProcedure.java | 7 ++++--- .../protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java | 11 ++++++++--- .../thrift-datanode/src/main/thrift/datanode.thrift | 1 + 4 files changed, 14 insertions(+), 7 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterEncodingCompressorProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterEncodingCompressorProcedure.java index f92cc5d2dd8..6f6d4b162d4 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterEncodingCompressorProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/AlterEncodingCompressorProcedure.java @@ -132,7 +132,7 @@ public class AlterEncodingCompressorProcedure break; case CLEAR_CACHE: LOGGER.info("Invalidate cache of timeSeries {}", requestMessage); - invalidateCache(env, patternTreeBytes, requestMessage, this::setFailure); + invalidateCache(env, patternTreeBytes, requestMessage, this::setFailure, false); collectPayload4Pipe(env); return Flow.NO_MORE_STATE; default: diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java index 594b6b035bb..c49f80948fc 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteTimeSeriesProcedure.java @@ -122,7 +122,7 @@ public class DeleteTimeSeriesProcedure } case CLEAN_DATANODE_SCHEMA_CACHE: LOGGER.info("Invalidate cache of timeSeries {}", requestMessage); - invalidateCache(env, patternTreeBytes, requestMessage, this::setFailure); + invalidateCache(env, patternTreeBytes, requestMessage, this::setFailure, true); setNextState(DeleteTimeSeriesState.DELETE_DATA); break; case DELETE_DATA: @@ -202,13 +202,14 @@ public class DeleteTimeSeriesProcedure final ConfigNodeProcedureEnv env, final ByteBuffer patternTreeBytes, final String requestMessage, - final Consumer<ProcedureException> setFailure) { + final Consumer<ProcedureException> setFailure, + final boolean needLock) { final Map<Integer, TDataNodeLocation> dataNodeLocationMap = env.getConfigManager().getNodeManager().getRegisteredDataNodeLocations(); final DataNodeAsyncRequestContext<TInvalidateMatchedSchemaCacheReq, TSStatus> clientHandler = new DataNodeAsyncRequestContext<>( CnToDnAsyncRequestType.INVALIDATE_MATCHED_SCHEMA_CACHE, - new TInvalidateMatchedSchemaCacheReq(patternTreeBytes), + new TInvalidateMatchedSchemaCacheReq(patternTreeBytes).setNeedLock(needLock), dataNodeLocationMap); CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler); final Map<Integer, TSStatus> statusMap = clientHandler.getResponseMap(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 90d7f20fb25..dcc62fcd3c8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -708,7 +708,10 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface @Override public TSStatus invalidateMatchedSchemaCache(final TInvalidateMatchedSchemaCacheReq req) { final TreeDeviceSchemaCacheManager cache = TreeDeviceSchemaCacheManager.getInstance(); - DataNodeSchemaLockManager.getInstance().takeWriteLock(SchemaLockType.VALIDATE_VS_DELETION_TREE); + if (req.needLock || !req.isSetNeedLock()) { + DataNodeSchemaLockManager.getInstance() + .takeWriteLock(SchemaLockType.VALIDATE_VS_DELETION_TREE); + } try { cache.takeWriteLock(); try { @@ -717,8 +720,10 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface cache.releaseWriteLock(); } } finally { - DataNodeSchemaLockManager.getInstance() - .releaseWriteLock(SchemaLockType.VALIDATE_VS_DELETION_TREE); + if (req.needLock || !req.isSetNeedLock()) { + DataNodeSchemaLockManager.getInstance() + .releaseWriteLock(SchemaLockType.VALIDATE_VS_DELETION_TREE); + } } return RpcUtils.SUCCESS_STATUS; } diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift index 9416633a2c4..caaf44c16a7 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift @@ -440,6 +440,7 @@ struct TRollbackSchemaBlackListReq { struct TInvalidateMatchedSchemaCacheReq { 1: required binary pathPatternTree + 2: optional bool needLock } struct TFetchSchemaBlackListReq {
