This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch 10000-pipes-1.2 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9e6ad3868a148ae52ed71849fc56c233e2faa316 Author: 马子坤 <[email protected]> AuthorDate: Tue Sep 5 10:57:41 2023 +0800 Pipe: Improve performance for 10000+ pipes (#11021) Improvements: * Cache the File objects to reduce the size of TsFileInsertionEvents. * Limit the frequency of flushing TsFiles when concurrently creating historical extractors. * If cluster heartbeat doesn't need PipeMetaList, do not get into PipeTaskAgent. This will prevent datanode from turning UNKNOWN when many pipes are being created. --------- Co-authored-by: Steve Yurong Su <[email protected]> (cherry picked from commit 1a3568ff2a0a812a6f2b6867fb1261a85978e0dd) --- .../iotdb/db/pipe/agent/task/PipeTaskAgent.java | 9 +----- .../PipeHistoricalDataRegionTsFileExtractor.java | 36 +++++++++++++++++++--- .../resource/tsfile/PipeTsFileResourceManager.java | 7 ++++- .../impl/DataNodeInternalRPCServiceImpl.java | 4 ++- 4 files changed, 42 insertions(+), 14 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java index e54203f0f7c..34e99e09fb5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java @@ -36,7 +36,6 @@ import org.apache.iotdb.db.pipe.task.PipeBuilder; import org.apache.iotdb.db.pipe.task.PipeTask; import org.apache.iotdb.db.pipe.task.PipeTaskBuilder; import org.apache.iotdb.db.pipe.task.PipeTaskManager; -import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq; import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp; import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq; import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp; @@ -777,13 +776,7 @@ public class PipeTaskAgent { ///////////////////////// Heartbeat ///////////////////////// - public synchronized void collectPipeMetaList(THeartbeatReq req, 
THeartbeatResp resp) - throws TException { - // If the pipe heartbeat is separated from the cluster heartbeat, then the lock doesn't - // need to be acquired - if (!req.isNeedPipeMetaList()) { - return; - } + public synchronized void collectPipeMetaList(THeartbeatResp resp) throws TException { // Try the lock instead of directly acquire it to prevent the block of the cluster heartbeat // 10s is the half of the HEARTBEAT_TIMEOUT_TIME defined in class BaseNodeCache in ConfigNode if (!tryReadLockWithTimeOut(10)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java index c522cd6eb6a..ee7be9e1452 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java @@ -42,6 +42,8 @@ import java.io.IOException; import java.time.ZoneId; import java.util.ArrayDeque; import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import java.util.Queue; import java.util.stream.Collectors; @@ -56,6 +58,9 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa private static final Logger LOGGER = LoggerFactory.getLogger(PipeHistoricalDataRegionTsFileExtractor.class); + private static final Map<Integer, Long> DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP = new HashMap<>(); + private static final long PIPE_MIN_FLUSH_INTERVAL_IN_MS = 2000; + private PipeTaskMeta pipeTaskMeta; private ProgressIndex startIndex; @@ -85,6 +90,9 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa startIndex = environment.getPipeTaskMeta().getProgressIndex(); dataRegionId = environment.getRegionId(); + synchronized 
(DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) { + DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.putIfAbsent(dataRegionId, 0L); + } pattern = parameters.getStringOrDefault(EXTRACTOR_PATTERN_KEY, EXTRACTOR_PATTERN_DEFAULT_VALUE); @@ -133,7 +141,14 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa // If we don't invoke flushDataRegionAllTsFiles() in the realtime only mode, the data generated // between the creation time of the pipe the time when the pipe starts will be lost. if (historicalDataExtractionTimeLowerBound != Long.MIN_VALUE) { - flushDataRegionAllTsFiles(); + synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) { + final long lastFlushedByPipeTime = + DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.get(dataRegionId); + if (System.currentTimeMillis() - lastFlushedByPipeTime >= PIPE_MIN_FLUSH_INTERVAL_IN_MS) { + flushDataRegionAllTsFiles(); + DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.replace(dataRegionId, System.currentTimeMillis()); + } + } } } @@ -163,7 +178,14 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa dataRegion.writeLock("Pipe: start to extract historical TsFile"); try { - dataRegion.syncCloseAllWorkingTsFileProcessors(); + synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) { + final long lastFlushedByPipeTime = + DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.get(dataRegionId); + if (System.currentTimeMillis() - lastFlushedByPipeTime >= PIPE_MIN_FLUSH_INTERVAL_IN_MS) { + dataRegion.syncCloseAllWorkingTsFileProcessors(); + DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.replace(dataRegionId, System.currentTimeMillis()); + } + } final TsFileManager tsFileManager = dataRegion.getTsFileManager(); tsFileManager.readLock(); @@ -174,7 +196,10 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa tsFileManager.getTsFileList(true).stream() .filter( resource -> - !startIndex.isAfter(resource.getMaxProgressIndexAfterClose()) + // Some resource may be not closed due to the control of + // 
PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore them. + resource.isClosed() + && !startIndex.isAfter(resource.getMaxProgressIndexAfterClose()) && isTsFileResourceOverlappedWithTimeRange(resource) && isTsFileGeneratedAfterExtractionTimeLowerBound(resource)) .map( @@ -193,7 +218,10 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa tsFileManager.getTsFileList(false).stream() .filter( resource -> - !startIndex.isAfter(resource.getMaxProgressIndexAfterClose()) + // Some resource may be not closed due to the control of + // PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore them. + resource.isClosed() + && !startIndex.isAfter(resource.getMaxProgressIndexAfterClose()) && isTsFileResourceOverlappedWithTimeRange(resource) && isTsFileGeneratedAfterExtractionTimeLowerBound(resource)) .map( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java index 08f1e7db2e1..4f6b4b1dde2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java @@ -34,6 +34,9 @@ public class PipeTsFileResourceManager { private final Map<String, Integer> hardlinkOrCopiedFileToReferenceMap = new HashMap<>(); + /** Cache the File objects here to avoid redundancy */ + private final Map<String, File> fileNameToFileMap = new HashMap<>(); + /** * given a file, create a hardlink or copy it to pipe dir, maintain a reference count for the * hardlink or copied file, and return the hardlink or copied file. @@ -63,13 +66,14 @@ public class PipeTsFileResourceManager { // file in pipe dir. 
if so, increase reference count and return it final File hardlinkOrCopiedFile = getHardlinkOrCopiedFileInPipeDir(file); if (increaseReferenceIfExists(hardlinkOrCopiedFile.getPath())) { - return hardlinkOrCopiedFile; + return fileNameToFileMap.get(hardlinkOrCopiedFile.getPath()); } // if the file is not a hardlink or copied file, and there is no related hardlink or copied // file in pipe dir, create a hardlink or copy it to pipe dir, maintain a reference count for // the hardlink or copied file, and return the hardlink or copied file. hardlinkOrCopiedFileToReferenceMap.put(hardlinkOrCopiedFile.getPath(), 1); + fileNameToFileMap.put(hardlinkOrCopiedFile.getPath(), hardlinkOrCopiedFile); // if the file is a tsfile, create a hardlink in pipe dir and return it. // otherwise, copy the file (.mod or .resource) to pipe dir and return it. return isTsFile @@ -161,6 +165,7 @@ public class PipeTsFileResourceManager { if (updatedReference != null && updatedReference == 0) { Files.deleteIfExists(hardlinkOrCopiedFile.toPath()); hardlinkOrCopiedFileToReferenceMap.remove(hardlinkOrCopiedFile.getPath()); + fileNameToFileMap.remove(hardlinkOrCopiedFile.getPath()); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 80589eacb39..0ba27aac9c6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -1206,7 +1206,9 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface SchemaEngine.getInstance().updateAndFillSchemaCountMap(req.schemaQuotaCount, resp); // Update pipe meta if necessary - PipeAgent.task().collectPipeMetaList(req, resp); + if (req.isNeedPipeMetaList()) { + 
PipeAgent.task().collectPipeMetaList(resp); + } return resp; }
