This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ce932ba81e7 Pipe: push down the construction of history events to save memory when large amount of pipes are running (#11067)
ce932ba81e7 is described below

commit ce932ba81e7b590f278683c3e7f5e55e222cc7d7
Author: 马子坤 <[email protected]>
AuthorDate: Wed Sep 13 16:55:21 2023 +0800

    Pipe: push down the construction of history events to save memory when large amount of pipes are running (#11067)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../PipeHistoricalDataRegionTsFileExtractor.java   | 78 +++++++++++++---------
 .../resource/tsfile/PipeTsFileResourceManager.java |  9 +++
 2 files changed, 56 insertions(+), 31 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index dc084a45d9c..75ad889c10e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
@@ -73,7 +74,7 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
 
   private long historicalDataExtractionTimeLowerBound; // Arrival time
 
-  private Queue<PipeTsFileInsertionEvent> pendingQueue;
+  private Queue<TsFileResource> pendingQueue;
 
   @Override
   public void validate(PipeParameterValidator validator) {
@@ -192,7 +193,7 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
       try {
         pendingQueue = new ArrayDeque<>(tsFileManager.size(true) + tsFileManager.size(false));
 
-        final Collection<PipeTsFileInsertionEvent> sequenceFileInsertionEvents =
+        final Collection<TsFileResource> sequenceTsFileResources =
             tsFileManager.getTsFileList(true).stream()
                 .filter(
                     resource ->
@@ -202,19 +203,10 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
                             && !startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
                             && isTsFileResourceOverlappedWithTimeRange(resource)
                             && isTsFileGeneratedAfterExtractionTimeLowerBound(resource))
-                .map(
-                    resource ->
-                        new PipeTsFileInsertionEvent(
-                            resource,
-                            false,
-                            pipeTaskMeta,
-                            pattern,
-                            historicalDataExtractionStartTime,
-                            historicalDataExtractionEndTime))
                 .collect(Collectors.toList());
-        pendingQueue.addAll(sequenceFileInsertionEvents);
+        pendingQueue.addAll(sequenceTsFileResources);
 
-        final Collection<PipeTsFileInsertionEvent> unsequenceFileInsertionEvents =
+        final Collection<TsFileResource> unsequenceTsFileResources =
             tsFileManager.getTsFileList(false).stream()
                 .filter(
                     resource ->
@@ -224,29 +216,26 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
                             && !startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
                             && isTsFileResourceOverlappedWithTimeRange(resource)
                             && isTsFileGeneratedAfterExtractionTimeLowerBound(resource))
-                .map(
-                    resource ->
-                        new PipeTsFileInsertionEvent(
-                            resource,
-                            false,
-                            pipeTaskMeta,
-                            pattern,
-                            historicalDataExtractionStartTime,
-                            historicalDataExtractionEndTime))
                 .collect(Collectors.toList());
-        pendingQueue.addAll(unsequenceFileInsertionEvents);
+        pendingQueue.addAll(unsequenceTsFileResources);
 
         pendingQueue.forEach(
-            event ->
-                event.increaseReferenceCount(
-                    PipeHistoricalDataRegionTsFileExtractor.class.getName()));
+            resource -> {
+              // Pin the resource, in case the file is removed by compaction or anything.
+              // Will unpin it after the PipeTsFileInsertionEvent is created and pinned.
+              try {
+                PipeResourceManager.tsfile().pinTsFileResource(resource);
+              } catch (IOException e) {
+                LOGGER.warn("Pipe: failed to pin TsFileResource {}", resource.getTsFilePath());
+              }
+            });
 
         LOGGER.info(
             "Pipe: start to extract historical TsFile, data region {}, "
                 + "sequence file count {}, unsequence file count {}",
             dataRegionId,
-            sequenceFileInsertionEvents.size(),
-            unsequenceFileInsertionEvents.size());
+            sequenceTsFileResources.size(),
+            unsequenceTsFileResources.size());
       } finally {
         tsFileManager.readUnlock();
       }
@@ -281,8 +270,28 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
     if (pendingQueue == null) {
       return null;
     }
+    TsFileResource resource = pendingQueue.poll();
+    if (resource == null) {
+      return null;
+    }
 
-    return pendingQueue.poll();
+    final PipeTsFileInsertionEvent event =
+        new PipeTsFileInsertionEvent(
+            resource,
+            false,
+            pipeTaskMeta,
+            pattern,
+            historicalDataExtractionStartTime,
+            historicalDataExtractionEndTime);
+    event.increaseReferenceCount(PipeHistoricalDataRegionTsFileExtractor.class.getName());
+    try {
+      PipeResourceManager.tsfile().unpinTsFileResource(resource);
+    } catch (IOException e) {
+      LOGGER.warn(
+          "Pipe: failed to unpin TsFileResource after creating event, original path: {}",
+          resource.getTsFilePath());
+    }
+    return event;
   }
 
   public synchronized boolean hasConsumedAll() {
@@ -293,8 +302,15 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
   public synchronized void close() {
     if (pendingQueue != null) {
       pendingQueue.forEach(
-          event ->
-              event.clearReferenceCount(PipeHistoricalDataRegionTsFileExtractor.class.getName()));
+          resource -> {
+            try {
+              PipeResourceManager.tsfile().unpinTsFileResource(resource);
+            } catch (IOException e) {
+              LOGGER.warn(
+                  "Pipe: failed to unpin TsFileResource after dropping pipe, original path: {}",
+                  resource.getTsFilePath());
+            }
+          });
       pendingQueue.clear();
       pendingQueue = null;
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index 4f6b4b1dde2..9839d793a25 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.resource.tsfile;
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 
 import java.io.File;
 import java.io.IOException;
@@ -178,4 +179,12 @@ public class PipeTsFileResourceManager {
   public synchronized int getFileReferenceCount(File hardlinkOrCopiedFile) {
     return hardlinkOrCopiedFileToReferenceMap.getOrDefault(hardlinkOrCopiedFile.getPath(), 0);
   }
+
+  public synchronized void pinTsFileResource(TsFileResource resource) throws IOException {
+    increaseFileReference(resource.getTsFile(), true);
+  }
+
+  public synchronized void unpinTsFileResource(TsFileResource resource) throws IOException {
+    decreaseFileReference(getHardlinkOrCopiedFileInPipeDir(resource.getTsFile()));
+  }
 }

Reply via email to