This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch rc/1.2.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d33b33c97ebbb766e33e548d65f2398fd483c559
Author: HTHou <[email protected]>
AuthorDate: Tue Sep 26 19:10:06 2023 +0800

    Revert "Pipe: push down the construction of history events to save memory when large amount of pipes are running (#11067) (#11135)"
    
    This reverts commit 50e5411d10048bdab191bd9bb8a25fec7300abae.
---
 .../PipeHistoricalDataRegionTsFileExtractor.java   | 78 +++++++++-------------
 .../resource/tsfile/PipeTsFileResourceManager.java |  9 ---
 2 files changed, 31 insertions(+), 56 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index 75ad889c10e..dc084a45d9c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import 
org.apache.iotdb.db.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
-import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
@@ -74,7 +73,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
 
   private long historicalDataExtractionTimeLowerBound; // Arrival time
 
-  private Queue<TsFileResource> pendingQueue;
+  private Queue<PipeTsFileInsertionEvent> pendingQueue;
 
   @Override
   public void validate(PipeParameterValidator validator) {
@@ -193,7 +192,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
       try {
         pendingQueue = new ArrayDeque<>(tsFileManager.size(true) + 
tsFileManager.size(false));
 
-        final Collection<TsFileResource> sequenceTsFileResources =
+        final Collection<PipeTsFileInsertionEvent> sequenceFileInsertionEvents 
=
             tsFileManager.getTsFileList(true).stream()
                 .filter(
                     resource ->
@@ -203,10 +202,19 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
                             && 
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
                             && 
isTsFileResourceOverlappedWithTimeRange(resource)
                             && 
isTsFileGeneratedAfterExtractionTimeLowerBound(resource))
+                .map(
+                    resource ->
+                        new PipeTsFileInsertionEvent(
+                            resource,
+                            false,
+                            pipeTaskMeta,
+                            pattern,
+                            historicalDataExtractionStartTime,
+                            historicalDataExtractionEndTime))
                 .collect(Collectors.toList());
-        pendingQueue.addAll(sequenceTsFileResources);
+        pendingQueue.addAll(sequenceFileInsertionEvents);
 
-        final Collection<TsFileResource> unsequenceTsFileResources =
+        final Collection<PipeTsFileInsertionEvent> 
unsequenceFileInsertionEvents =
             tsFileManager.getTsFileList(false).stream()
                 .filter(
                     resource ->
@@ -216,26 +224,29 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
                             && 
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
                             && 
isTsFileResourceOverlappedWithTimeRange(resource)
                             && 
isTsFileGeneratedAfterExtractionTimeLowerBound(resource))
+                .map(
+                    resource ->
+                        new PipeTsFileInsertionEvent(
+                            resource,
+                            false,
+                            pipeTaskMeta,
+                            pattern,
+                            historicalDataExtractionStartTime,
+                            historicalDataExtractionEndTime))
                 .collect(Collectors.toList());
-        pendingQueue.addAll(unsequenceTsFileResources);
+        pendingQueue.addAll(unsequenceFileInsertionEvents);
 
         pendingQueue.forEach(
-            resource -> {
-              // Pin the resource, in case the file is removed by compaction 
or anything.
-              // Will unpin it after the PipeTsFileInsertionEvent is created 
and pinned.
-              try {
-                PipeResourceManager.tsfile().pinTsFileResource(resource);
-              } catch (IOException e) {
-                LOGGER.warn("Pipe: failed to pin TsFileResource {}", 
resource.getTsFilePath());
-              }
-            });
+            event ->
+                event.increaseReferenceCount(
+                    PipeHistoricalDataRegionTsFileExtractor.class.getName()));
 
         LOGGER.info(
             "Pipe: start to extract historical TsFile, data region {}, "
                 + "sequence file count {}, unsequence file count {}",
             dataRegionId,
-            sequenceTsFileResources.size(),
-            unsequenceTsFileResources.size());
+            sequenceFileInsertionEvents.size(),
+            unsequenceFileInsertionEvents.size());
       } finally {
         tsFileManager.readUnlock();
       }
@@ -270,28 +281,8 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
     if (pendingQueue == null) {
       return null;
     }
-    TsFileResource resource = pendingQueue.poll();
-    if (resource == null) {
-      return null;
-    }
 
-    final PipeTsFileInsertionEvent event =
-        new PipeTsFileInsertionEvent(
-            resource,
-            false,
-            pipeTaskMeta,
-            pattern,
-            historicalDataExtractionStartTime,
-            historicalDataExtractionEndTime);
-    
event.increaseReferenceCount(PipeHistoricalDataRegionTsFileExtractor.class.getName());
-    try {
-      PipeResourceManager.tsfile().unpinTsFileResource(resource);
-    } catch (IOException e) {
-      LOGGER.warn(
-          "Pipe: failed to unpin TsFileResource after creating event, original 
path: {}",
-          resource.getTsFilePath());
-    }
-    return event;
+    return pendingQueue.poll();
   }
 
   public synchronized boolean hasConsumedAll() {
@@ -302,15 +293,8 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
   public synchronized void close() {
     if (pendingQueue != null) {
       pendingQueue.forEach(
-          resource -> {
-            try {
-              PipeResourceManager.tsfile().unpinTsFileResource(resource);
-            } catch (IOException e) {
-              LOGGER.warn(
-                  "Pipe: failed to unpin TsFileResource after dropping pipe, 
original path: {}",
-                  resource.getTsFilePath());
-            }
-          });
+          event ->
+              
event.clearReferenceCount(PipeHistoricalDataRegionTsFileExtractor.class.getName()));
       pendingQueue.clear();
       pendingQueue = null;
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index 9839d793a25..4f6b4b1dde2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.pipe.resource.tsfile;
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 
 import java.io.File;
 import java.io.IOException;
@@ -179,12 +178,4 @@ public class PipeTsFileResourceManager {
   public synchronized int getFileReferenceCount(File hardlinkOrCopiedFile) {
     return 
hardlinkOrCopiedFileToReferenceMap.getOrDefault(hardlinkOrCopiedFile.getPath(), 
0);
   }
-
-  public synchronized void pinTsFileResource(TsFileResource resource) throws 
IOException {
-    increaseFileReference(resource.getTsFile(), true);
-  }
-
-  public synchronized void unpinTsFileResource(TsFileResource resource) throws 
IOException {
-    
decreaseFileReference(getHardlinkOrCopiedFileInPipeDir(resource.getTsFile()));
-  }
 }

Reply via email to