This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch time-opti in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2e98e175a69cd7e9f23ad49581ff288098d52aa6 Author: Caideyipi <[email protected]> AuthorDate: Wed May 6 17:41:43 2026 +0800 opt --- ...istoricalDataRegionTsFileAndDeletionSource.java | 36 +++++++++++----------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java index 6da04faeb3a..d98c886afb3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java @@ -495,11 +495,10 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource } final long startHistoricalExtractionTime = System.currentTimeMillis(); + final List<PersistentResource> originalResourceList = new ArrayList<>(); dataRegion.writeLock( "Pipe: start to extract historical TsFile and Deletion(if uses iotConsensusV2)"); try { - List<PersistentResource> originalResourceList = new ArrayList<>(); - if (shouldExtractInsertion) { flushTsFilesForExtraction(dataRegion); extractTsFiles(dataRegion, startHistoricalExtractionTime, originalResourceList); @@ -508,25 +507,26 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource Optional.ofNullable(DeletionResourceManager.getInstance(dataRegionId)) .ifPresent(manager -> extractDeletions(manager, originalResourceList)); } - - // Sort tsFileResource and deletionResource - long startTime = System.currentTimeMillis(); - LOGGER.info("Pipe {}@{}: start to sort all extracted resources", pipeName, dataRegionId); - originalResourceList.sort( - (o1, o2) -> - startIndex instanceof TimeWindowStateProgressIndex - ? Long.compare(o1.getFileStartTime(), o2.getFileStartTime()) - : o1.getProgressIndex().topologicalCompareTo(o2.getProgressIndex())); - pendingQueue = new ArrayDeque<>(originalResourceList); - - LOGGER.info( - "Pipe {}@{}: finish to sort all extracted resources, took {} ms", - pipeName, - dataRegionId, - System.currentTimeMillis() - startTime); } finally { dataRegion.writeUnlock(); } + + // The extracted resources are already copied and pinned if necessary, so sorting and queue + // materialization can be moved out of the region write lock to reduce create/start blocking. + long startTime = System.currentTimeMillis(); + LOGGER.info("Pipe {}@{}: start to sort all extracted resources", pipeName, dataRegionId); + originalResourceList.sort( + (o1, o2) -> + startIndex instanceof TimeWindowStateProgressIndex + ? Long.compare(o1.getFileStartTime(), o2.getFileStartTime()) + : o1.getProgressIndex().topologicalCompareTo(o2.getProgressIndex())); + pendingQueue = new ArrayDeque<>(originalResourceList); + + LOGGER.info( + "Pipe {}@{}: finish to sort all extracted resources, took {} ms", + pipeName, + dataRegionId, + System.currentTimeMillis() - startTime); } private void flushTsFilesForExtraction(DataRegion dataRegion) {
