This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch 10000-pipes-1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9e6ad3868a148ae52ed71849fc56c233e2faa316
Author: 马子坤 <[email protected]>
AuthorDate: Tue Sep 5 10:57:41 2023 +0800

    Pipe: Improve performance for 10000+ pipes (#11021)
    
    Improvements:
    
    * Cache the File objects to reduce the size of TsFileInsertionEvents.
    
    * Limit the frequency of flushing TsFiles when concurrently creating 
historical extractors.
    
    * If the cluster heartbeat doesn't need the PipeMetaList, do not enter 
PipeTaskAgent (and so do not contend for its lock). This prevents the DataNode 
from turning UNKNOWN while many pipes are being created.
    
    ---------
    
    Co-authored-by: Steve Yurong Su <[email protected]>
    (cherry picked from commit 1a3568ff2a0a812a6f2b6867fb1261a85978e0dd)
---
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    |  9 +-----
 .../PipeHistoricalDataRegionTsFileExtractor.java   | 36 +++++++++++++++++++---
 .../resource/tsfile/PipeTsFileResourceManager.java |  7 ++++-
 .../impl/DataNodeInternalRPCServiceImpl.java       |  4 ++-
 4 files changed, 42 insertions(+), 14 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index e54203f0f7c..34e99e09fb5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -36,7 +36,6 @@ import org.apache.iotdb.db.pipe.task.PipeBuilder;
 import org.apache.iotdb.db.pipe.task.PipeTask;
 import org.apache.iotdb.db.pipe.task.PipeTaskBuilder;
 import org.apache.iotdb.db.pipe.task.PipeTaskManager;
-import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
 import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
 import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
@@ -777,13 +776,7 @@ public class PipeTaskAgent {
 
   ///////////////////////// Heartbeat /////////////////////////
 
-  public synchronized void collectPipeMetaList(THeartbeatReq req, 
THeartbeatResp resp)
-      throws TException {
-    // If the pipe heartbeat is separated from the cluster heartbeat, then the 
lock doesn't
-    // need to be acquired
-    if (!req.isNeedPipeMetaList()) {
-      return;
-    }
+  public synchronized void collectPipeMetaList(THeartbeatResp resp) throws 
TException {
     // Try the lock instead of directly acquire it to prevent the block of the 
cluster heartbeat
     // 10s is the half of the HEARTBEAT_TIMEOUT_TIME defined in class 
BaseNodeCache in ConfigNode
     if (!tryReadLockWithTimeOut(10)) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index c522cd6eb6a..ee7be9e1452 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -42,6 +42,8 @@ import java.io.IOException;
 import java.time.ZoneId;
 import java.util.ArrayDeque;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Queue;
 import java.util.stream.Collectors;
 
@@ -56,6 +58,9 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeHistoricalDataRegionTsFileExtractor.class);
 
+  private static final Map<Integer, Long> 
DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP = new HashMap<>();
+  private static final long PIPE_MIN_FLUSH_INTERVAL_IN_MS = 2000;
+
   private PipeTaskMeta pipeTaskMeta;
   private ProgressIndex startIndex;
 
@@ -85,6 +90,9 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
     startIndex = environment.getPipeTaskMeta().getProgressIndex();
 
     dataRegionId = environment.getRegionId();
+    synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
+      DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.putIfAbsent(dataRegionId, 0L);
+    }
 
     pattern = parameters.getStringOrDefault(EXTRACTOR_PATTERN_KEY, 
EXTRACTOR_PATTERN_DEFAULT_VALUE);
 
@@ -133,7 +141,14 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
     // If we don't invoke flushDataRegionAllTsFiles() in the realtime only 
mode, the data generated
     // between the creation time of the pipe the time when the pipe starts 
will be lost.
     if (historicalDataExtractionTimeLowerBound != Long.MIN_VALUE) {
-      flushDataRegionAllTsFiles();
+      synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
+        final long lastFlushedByPipeTime =
+            DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.get(dataRegionId);
+        if (System.currentTimeMillis() - lastFlushedByPipeTime >= 
PIPE_MIN_FLUSH_INTERVAL_IN_MS) {
+          flushDataRegionAllTsFiles();
+          DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.replace(dataRegionId, 
System.currentTimeMillis());
+        }
+      }
     }
   }
 
@@ -163,7 +178,14 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
 
     dataRegion.writeLock("Pipe: start to extract historical TsFile");
     try {
-      dataRegion.syncCloseAllWorkingTsFileProcessors();
+      synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
+        final long lastFlushedByPipeTime =
+            DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.get(dataRegionId);
+        if (System.currentTimeMillis() - lastFlushedByPipeTime >= 
PIPE_MIN_FLUSH_INTERVAL_IN_MS) {
+          dataRegion.syncCloseAllWorkingTsFileProcessors();
+          DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.replace(dataRegionId, 
System.currentTimeMillis());
+        }
+      }
 
       final TsFileManager tsFileManager = dataRegion.getTsFileManager();
       tsFileManager.readLock();
@@ -174,7 +196,10 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
             tsFileManager.getTsFileList(true).stream()
                 .filter(
                     resource ->
-                        
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
+                        // Some resource may be not closed due to the control 
of
+                        // PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore 
them.
+                        resource.isClosed()
+                            && 
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
                             && 
isTsFileResourceOverlappedWithTimeRange(resource)
                             && 
isTsFileGeneratedAfterExtractionTimeLowerBound(resource))
                 .map(
@@ -193,7 +218,10 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
             tsFileManager.getTsFileList(false).stream()
                 .filter(
                     resource ->
-                        
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
+                        // Some resource may be not closed due to the control 
of
+                        // PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore 
them.
+                        resource.isClosed()
+                            && 
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
                             && 
isTsFileResourceOverlappedWithTimeRange(resource)
                             && 
isTsFileGeneratedAfterExtractionTimeLowerBound(resource))
                 .map(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index 08f1e7db2e1..4f6b4b1dde2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -34,6 +34,9 @@ public class PipeTsFileResourceManager {
 
   private final Map<String, Integer> hardlinkOrCopiedFileToReferenceMap = new 
HashMap<>();
 
+  /** Cache the File objects here to avoid redundancy */
+  private final Map<String, File> fileNameToFileMap = new HashMap<>();
+
   /**
    * given a file, create a hardlink or copy it to pipe dir, maintain a 
reference count for the
    * hardlink or copied file, and return the hardlink or copied file.
@@ -63,13 +66,14 @@ public class PipeTsFileResourceManager {
     // file in pipe dir. if so, increase reference count and return it
     final File hardlinkOrCopiedFile = getHardlinkOrCopiedFileInPipeDir(file);
     if (increaseReferenceIfExists(hardlinkOrCopiedFile.getPath())) {
-      return hardlinkOrCopiedFile;
+      return fileNameToFileMap.get(hardlinkOrCopiedFile.getPath());
     }
 
     // if the file is not a hardlink or copied file, and there is no related 
hardlink or copied
     // file in pipe dir, create a hardlink or copy it to pipe dir, maintain a 
reference count for
     // the hardlink or copied file, and return the hardlink or copied file.
     hardlinkOrCopiedFileToReferenceMap.put(hardlinkOrCopiedFile.getPath(), 1);
+    fileNameToFileMap.put(hardlinkOrCopiedFile.getPath(), 
hardlinkOrCopiedFile);
     // if the file is a tsfile, create a hardlink in pipe dir and return it.
     // otherwise, copy the file (.mod or .resource) to pipe dir and return it.
     return isTsFile
@@ -161,6 +165,7 @@ public class PipeTsFileResourceManager {
     if (updatedReference != null && updatedReference == 0) {
       Files.deleteIfExists(hardlinkOrCopiedFile.toPath());
       
hardlinkOrCopiedFileToReferenceMap.remove(hardlinkOrCopiedFile.getPath());
+      fileNameToFileMap.remove(hardlinkOrCopiedFile.getPath());
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 80589eacb39..0ba27aac9c6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -1206,7 +1206,9 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
     
SchemaEngine.getInstance().updateAndFillSchemaCountMap(req.schemaQuotaCount, 
resp);
 
     // Update pipe meta if necessary
-    PipeAgent.task().collectPipeMetaList(req, resp);
+    if (req.isNeedPipeMetaList()) {
+      PipeAgent.task().collectPipeMetaList(resp);
+    }
 
     return resp;
   }

Reply via email to