This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ce932ba81e7 Pipe: push down the construction of history events to save memory when large amount of pipes are running (#11067)
ce932ba81e7 is described below
commit ce932ba81e7b590f278683c3e7f5e55e222cc7d7
Author: 马子坤 <[email protected]>
AuthorDate: Wed Sep 13 16:55:21 2023 +0800
Pipe: push down the construction of history events to save memory when large amount of pipes are running (#11067)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../PipeHistoricalDataRegionTsFileExtractor.java | 78 +++++++++++++---------
.../resource/tsfile/PipeTsFileResourceManager.java | 9 +++
2 files changed, 56 insertions(+), 31 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index dc084a45d9c..75ad889c10e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import
org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
@@ -73,7 +74,7 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
private long historicalDataExtractionTimeLowerBound; // Arrival time
- private Queue<PipeTsFileInsertionEvent> pendingQueue;
+ private Queue<TsFileResource> pendingQueue;
@Override
public void validate(PipeParameterValidator validator) {
@@ -192,7 +193,7 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
try {
pendingQueue = new ArrayDeque<>(tsFileManager.size(true) +
tsFileManager.size(false));
- final Collection<PipeTsFileInsertionEvent> sequenceFileInsertionEvents =
+ final Collection<TsFileResource> sequenceTsFileResources =
tsFileManager.getTsFileList(true).stream()
.filter(
resource ->
@@ -202,19 +203,10 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
&& !startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
&& isTsFileResourceOverlappedWithTimeRange(resource)
&& isTsFileGeneratedAfterExtractionTimeLowerBound(resource))
- .map(
- resource ->
- new PipeTsFileInsertionEvent(
- resource,
- false,
- pipeTaskMeta,
- pattern,
- historicalDataExtractionStartTime,
- historicalDataExtractionEndTime))
.collect(Collectors.toList());
- pendingQueue.addAll(sequenceFileInsertionEvents);
+ pendingQueue.addAll(sequenceTsFileResources);
- final Collection<PipeTsFileInsertionEvent> unsequenceFileInsertionEvents =
+ final Collection<TsFileResource> unsequenceTsFileResources =
tsFileManager.getTsFileList(false).stream()
.filter(
resource ->
@@ -224,29 +216,26 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
&& !startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
&& isTsFileResourceOverlappedWithTimeRange(resource)
&& isTsFileGeneratedAfterExtractionTimeLowerBound(resource))
- .map(
- resource ->
- new PipeTsFileInsertionEvent(
- resource,
- false,
- pipeTaskMeta,
- pattern,
- historicalDataExtractionStartTime,
- historicalDataExtractionEndTime))
.collect(Collectors.toList());
- pendingQueue.addAll(unsequenceFileInsertionEvents);
+ pendingQueue.addAll(unsequenceTsFileResources);
pendingQueue.forEach(
- event ->
- event.increaseReferenceCount(
- PipeHistoricalDataRegionTsFileExtractor.class.getName()));
+ resource -> {
+ // Pin the resource, in case the file is removed by compaction or anything.
+ // Will unpin it after the PipeTsFileInsertionEvent is created and pinned.
+ try {
+ PipeResourceManager.tsfile().pinTsFileResource(resource);
+ } catch (IOException e) {
+ LOGGER.warn("Pipe: failed to pin TsFileResource {}", resource.getTsFilePath());
+ }
+ });
LOGGER.info(
"Pipe: start to extract historical TsFile, data region {}, "
+ "sequence file count {}, unsequence file count {}",
dataRegionId,
- sequenceFileInsertionEvents.size(),
- unsequenceFileInsertionEvents.size());
+ sequenceTsFileResources.size(),
+ unsequenceTsFileResources.size());
} finally {
tsFileManager.readUnlock();
}
@@ -281,8 +270,28 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
if (pendingQueue == null) {
return null;
}
+ TsFileResource resource = pendingQueue.poll();
+ if (resource == null) {
+ return null;
+ }
- return pendingQueue.poll();
+ final PipeTsFileInsertionEvent event =
+ new PipeTsFileInsertionEvent(
+ resource,
+ false,
+ pipeTaskMeta,
+ pattern,
+ historicalDataExtractionStartTime,
+ historicalDataExtractionEndTime);
+ event.increaseReferenceCount(PipeHistoricalDataRegionTsFileExtractor.class.getName());
+ try {
+ PipeResourceManager.tsfile().unpinTsFileResource(resource);
+ } catch (IOException e) {
+ LOGGER.warn(
+ "Pipe: failed to unpin TsFileResource after creating event, original path: {}",
+ resource.getTsFilePath());
+ }
+ return event;
}
public synchronized boolean hasConsumedAll() {
@@ -293,8 +302,15 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
public synchronized void close() {
if (pendingQueue != null) {
pendingQueue.forEach(
- event ->
- event.clearReferenceCount(PipeHistoricalDataRegionTsFileExtractor.class.getName()));
+ resource -> {
+ try {
+ PipeResourceManager.tsfile().unpinTsFileResource(resource);
+ } catch (IOException e) {
+ LOGGER.warn(
+ "Pipe: failed to unpin TsFileResource after dropping pipe, original path: {}",
+ resource.getTsFilePath());
+ }
+ });
pendingQueue.clear();
pendingQueue = null;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index 4f6b4b1dde2..9839d793a25 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.resource.tsfile;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import java.io.File;
import java.io.IOException;
@@ -178,4 +179,12 @@ public class PipeTsFileResourceManager {
public synchronized int getFileReferenceCount(File hardlinkOrCopiedFile) {
return hardlinkOrCopiedFileToReferenceMap.getOrDefault(hardlinkOrCopiedFile.getPath(), 0);
}
+
+ public synchronized void pinTsFileResource(TsFileResource resource) throws IOException {
+ increaseFileReference(resource.getTsFile(), true);
+ }
+
+ public synchronized void unpinTsFileResource(TsFileResource resource) throws IOException {
+ decreaseFileReference(getHardlinkOrCopiedFileInPipeDir(resource.getTsFile()));
+ }
}