This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch rc/1.2.2 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d33b33c97ebbb766e33e548d65f2398fd483c559 Author: HTHou <[email protected]> AuthorDate: Tue Sep 26 19:10:06 2023 +0800 Revert "Pipe: push down the construction of history events to save memory when large amount of pipes are running (#11067) (#11135)" This reverts commit 50e5411d10048bdab191bd9bb8a25fec7300abae. --- .../PipeHistoricalDataRegionTsFileExtractor.java | 78 +++++++++------------- .../resource/tsfile/PipeTsFileResourceManager.java | 9 --- 2 files changed, 31 insertions(+), 56 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java index 75ad889c10e..dc084a45d9c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java @@ -24,7 +24,6 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta; import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; -import org.apache.iotdb.db.pipe.resource.PipeResourceManager; import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; @@ -74,7 +73,7 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa private long historicalDataExtractionTimeLowerBound; // Arrival time - private Queue<TsFileResource> pendingQueue; + private Queue<PipeTsFileInsertionEvent> pendingQueue; @Override public void validate(PipeParameterValidator validator) { @@ -193,7 +192,7 @@ 
public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa try { pendingQueue = new ArrayDeque<>(tsFileManager.size(true) + tsFileManager.size(false)); - final Collection<TsFileResource> sequenceTsFileResources = + final Collection<PipeTsFileInsertionEvent> sequenceFileInsertionEvents = tsFileManager.getTsFileList(true).stream() .filter( resource -> @@ -203,10 +202,19 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa && !startIndex.isAfter(resource.getMaxProgressIndexAfterClose()) && isTsFileResourceOverlappedWithTimeRange(resource) && isTsFileGeneratedAfterExtractionTimeLowerBound(resource)) + .map( + resource -> + new PipeTsFileInsertionEvent( + resource, + false, + pipeTaskMeta, + pattern, + historicalDataExtractionStartTime, + historicalDataExtractionEndTime)) .collect(Collectors.toList()); - pendingQueue.addAll(sequenceTsFileResources); + pendingQueue.addAll(sequenceFileInsertionEvents); - final Collection<TsFileResource> unsequenceTsFileResources = + final Collection<PipeTsFileInsertionEvent> unsequenceFileInsertionEvents = tsFileManager.getTsFileList(false).stream() .filter( resource -> @@ -216,26 +224,29 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa && !startIndex.isAfter(resource.getMaxProgressIndexAfterClose()) && isTsFileResourceOverlappedWithTimeRange(resource) && isTsFileGeneratedAfterExtractionTimeLowerBound(resource)) + .map( + resource -> + new PipeTsFileInsertionEvent( + resource, + false, + pipeTaskMeta, + pattern, + historicalDataExtractionStartTime, + historicalDataExtractionEndTime)) .collect(Collectors.toList()); - pendingQueue.addAll(unsequenceTsFileResources); + pendingQueue.addAll(unsequenceFileInsertionEvents); pendingQueue.forEach( - resource -> { - // Pin the resource, in case the file is removed by compaction or anything. - // Will unpin it after the PipeTsFileInsertionEvent is created and pinned. 
- try { - PipeResourceManager.tsfile().pinTsFileResource(resource); - } catch (IOException e) { - LOGGER.warn("Pipe: failed to pin TsFileResource {}", resource.getTsFilePath()); - } - }); + event -> + event.increaseReferenceCount( + PipeHistoricalDataRegionTsFileExtractor.class.getName())); LOGGER.info( "Pipe: start to extract historical TsFile, data region {}, " + "sequence file count {}, unsequence file count {}", dataRegionId, - sequenceTsFileResources.size(), - unsequenceTsFileResources.size()); + sequenceFileInsertionEvents.size(), + unsequenceFileInsertionEvents.size()); } finally { tsFileManager.readUnlock(); } @@ -270,28 +281,8 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa if (pendingQueue == null) { return null; } - TsFileResource resource = pendingQueue.poll(); - if (resource == null) { - return null; - } - final PipeTsFileInsertionEvent event = - new PipeTsFileInsertionEvent( - resource, - false, - pipeTaskMeta, - pattern, - historicalDataExtractionStartTime, - historicalDataExtractionEndTime); - event.increaseReferenceCount(PipeHistoricalDataRegionTsFileExtractor.class.getName()); - try { - PipeResourceManager.tsfile().unpinTsFileResource(resource); - } catch (IOException e) { - LOGGER.warn( - "Pipe: failed to unpin TsFileResource after creating event, original path: {}", - resource.getTsFilePath()); - } - return event; + return pendingQueue.poll(); } public synchronized boolean hasConsumedAll() { @@ -302,15 +293,8 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa public synchronized void close() { if (pendingQueue != null) { pendingQueue.forEach( - resource -> { - try { - PipeResourceManager.tsfile().unpinTsFileResource(resource); - } catch (IOException e) { - LOGGER.warn( - "Pipe: failed to unpin TsFileResource after dropping pipe, original path: {}", - resource.getTsFilePath()); - } - }); + event -> + 
event.clearReferenceCount(PipeHistoricalDataRegionTsFileExtractor.class.getName())); pendingQueue.clear(); pendingQueue = null; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java index 9839d793a25..4f6b4b1dde2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java @@ -21,7 +21,6 @@ package org.apache.iotdb.db.pipe.resource.tsfile; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.pipe.config.PipeConfig; -import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import java.io.File; import java.io.IOException; @@ -179,12 +178,4 @@ public class PipeTsFileResourceManager { public synchronized int getFileReferenceCount(File hardlinkOrCopiedFile) { return hardlinkOrCopiedFileToReferenceMap.getOrDefault(hardlinkOrCopiedFile.getPath(), 0); } - - public synchronized void pinTsFileResource(TsFileResource resource) throws IOException { - increaseFileReference(resource.getTsFile(), true); - } - - public synchronized void unpinTsFileResource(TsFileResource resource) throws IOException { - decreaseFileReference(getHardlinkOrCopiedFileInPipeDir(resource.getTsFile())); - } }
