This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch rc/1.2.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 658a6170d75d77ac314ee81e73936623c1228312
Author: HTHou <[email protected]>
AuthorDate: Tue Sep 26 19:15:16 2023 +0800

    Revert "Pipe: Improve performance for 10000+ pipes (#11021) (#11051)"
    
    This reverts commit a0f89ab9ecdcc7f27b1c176d46e6c25c4ba6885f.
---
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    |  9 +++++-
 .../PipeHistoricalDataRegionTsFileExtractor.java   | 36 +++-------------------
 .../resource/tsfile/PipeTsFileResourceManager.java |  7 +----
 .../impl/DataNodeInternalRPCServiceImpl.java       |  4 +--
 4 files changed, 14 insertions(+), 42 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index 34e99e09fb5..e54203f0f7c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.db.pipe.task.PipeBuilder;
 import org.apache.iotdb.db.pipe.task.PipeTask;
 import org.apache.iotdb.db.pipe.task.PipeTaskBuilder;
 import org.apache.iotdb.db.pipe.task.PipeTaskManager;
+import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
 import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
 import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
@@ -776,7 +777,13 @@ public class PipeTaskAgent {
 
   ///////////////////////// Heartbeat /////////////////////////
 
-  public synchronized void collectPipeMetaList(THeartbeatResp resp) throws 
TException {
+  public synchronized void collectPipeMetaList(THeartbeatReq req, 
THeartbeatResp resp)
+      throws TException {
+    // If the pipe heartbeat is separated from the cluster heartbeat, then the 
lock doesn't
+    // need to be acquired
+    if (!req.isNeedPipeMetaList()) {
+      return;
+    }
     // Try the lock instead of directly acquire it to prevent the block of the 
cluster heartbeat
     // 10s is the half of the HEARTBEAT_TIMEOUT_TIME defined in class 
BaseNodeCache in ConfigNode
     if (!tryReadLockWithTimeOut(10)) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index dc084a45d9c..8a971bf46cf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -42,8 +42,6 @@ import java.io.IOException;
 import java.time.ZoneId;
 import java.util.ArrayDeque;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Queue;
 import java.util.stream.Collectors;
 
@@ -58,9 +56,6 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeHistoricalDataRegionTsFileExtractor.class);
 
-  private static final Map<Integer, Long> 
DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP = new HashMap<>();
-  private static final long PIPE_MIN_FLUSH_INTERVAL_IN_MS = 2000;
-
   private PipeTaskMeta pipeTaskMeta;
   private ProgressIndex startIndex;
 
@@ -90,9 +85,6 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
     startIndex = environment.getPipeTaskMeta().getProgressIndex();
 
     dataRegionId = environment.getRegionId();
-    synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
-      DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.putIfAbsent(dataRegionId, 0L);
-    }
 
     pattern = parameters.getStringOrDefault(EXTRACTOR_PATTERN_KEY, 
EXTRACTOR_PATTERN_DEFAULT_VALUE);
 
@@ -141,14 +133,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
     // If we don't invoke flushDataRegionAllTsFiles() in the realtime only 
mode, the data generated
     // between the creation time of the pipe the time when the pipe starts 
will be lost.
     if (historicalDataExtractionTimeLowerBound != Long.MIN_VALUE) {
-      synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
-        final long lastFlushedByPipeTime =
-            DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.get(dataRegionId);
-        if (System.currentTimeMillis() - lastFlushedByPipeTime >= 
PIPE_MIN_FLUSH_INTERVAL_IN_MS) {
-          flushDataRegionAllTsFiles();
-          DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.replace(dataRegionId, 
System.currentTimeMillis());
-        }
-      }
+      flushDataRegionAllTsFiles();
     }
   }
 
@@ -178,14 +163,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
 
     dataRegion.writeLock("Pipe: start to extract historical TsFile");
     try {
-      synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
-        final long lastFlushedByPipeTime =
-            DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.get(dataRegionId);
-        if (System.currentTimeMillis() - lastFlushedByPipeTime >= 
PIPE_MIN_FLUSH_INTERVAL_IN_MS) {
-          dataRegion.syncCloseAllWorkingTsFileProcessors();
-          DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.replace(dataRegionId, 
System.currentTimeMillis());
-        }
-      }
+      dataRegion.syncCloseAllWorkingTsFileProcessors();
 
       final TsFileManager tsFileManager = dataRegion.getTsFileManager();
       tsFileManager.readLock();
@@ -196,10 +174,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
             tsFileManager.getTsFileList(true).stream()
                 .filter(
                     resource ->
-                        // Some resource may be not closed due to the control 
of
-                        // PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore 
them.
-                        resource.isClosed()
-                            && 
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
+                        
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
                             && 
isTsFileResourceOverlappedWithTimeRange(resource)
                             && 
isTsFileGeneratedAfterExtractionTimeLowerBound(resource))
                 .map(
@@ -218,10 +193,7 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
             tsFileManager.getTsFileList(false).stream()
                 .filter(
                     resource ->
-                        // Some resource may be not closed due to the control 
of
-                        // PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore 
them.
-                        resource.isClosed()
-                            && 
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
+                        
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
                             && 
isTsFileResourceOverlappedWithTimeRange(resource)
                             && 
isTsFileGeneratedAfterExtractionTimeLowerBound(resource))
                 .map(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index 4f6b4b1dde2..08f1e7db2e1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -34,9 +34,6 @@ public class PipeTsFileResourceManager {
 
   private final Map<String, Integer> hardlinkOrCopiedFileToReferenceMap = new 
HashMap<>();
 
-  /** Cache the File objects here to avoid redundancy */
-  private final Map<String, File> fileNameToFileMap = new HashMap<>();
-
   /**
    * given a file, create a hardlink or copy it to pipe dir, maintain a 
reference count for the
    * hardlink or copied file, and return the hardlink or copied file.
@@ -66,14 +63,13 @@ public class PipeTsFileResourceManager {
     // file in pipe dir. if so, increase reference count and return it
     final File hardlinkOrCopiedFile = getHardlinkOrCopiedFileInPipeDir(file);
     if (increaseReferenceIfExists(hardlinkOrCopiedFile.getPath())) {
-      return fileNameToFileMap.get(hardlinkOrCopiedFile.getPath());
+      return hardlinkOrCopiedFile;
     }
 
     // if the file is not a hardlink or copied file, and there is no related 
hardlink or copied
     // file in pipe dir, create a hardlink or copy it to pipe dir, maintain a 
reference count for
     // the hardlink or copied file, and return the hardlink or copied file.
     hardlinkOrCopiedFileToReferenceMap.put(hardlinkOrCopiedFile.getPath(), 1);
-    fileNameToFileMap.put(hardlinkOrCopiedFile.getPath(), 
hardlinkOrCopiedFile);
     // if the file is a tsfile, create a hardlink in pipe dir and return it.
     // otherwise, copy the file (.mod or .resource) to pipe dir and return it.
     return isTsFile
@@ -165,7 +161,6 @@ public class PipeTsFileResourceManager {
     if (updatedReference != null && updatedReference == 0) {
       Files.deleteIfExists(hardlinkOrCopiedFile.toPath());
       
hardlinkOrCopiedFileToReferenceMap.remove(hardlinkOrCopiedFile.getPath());
-      fileNameToFileMap.remove(hardlinkOrCopiedFile.getPath());
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index cd3193f1abb..30dacbd6233 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -1206,9 +1206,7 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
     
SchemaEngine.getInstance().updateAndFillSchemaCountMap(req.schemaQuotaCount, 
resp);
 
     // Update pipe meta if necessary
-    if (req.isNeedPipeMetaList()) {
-      PipeAgent.task().collectPipeMetaList(resp);
-    }
+    PipeAgent.task().collectPipeMetaList(req, resp);
 
     return resp;
   }

Reply via email to