This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1a3568ff2a0 Pipe: Improve performance for 10000+ pipes (#11021)
1a3568ff2a0 is described below
commit 1a3568ff2a0a812a6f2b6867fb1261a85978e0dd
Author: 马子坤 <[email protected]>
AuthorDate: Tue Sep 5 10:57:41 2023 +0800
Pipe: Improve performance for 10000+ pipes (#11021)
Improvements:
* Cache the File objects to reduce the size of TsFileInsertionEvents.
* Limit the frequency of flushing TsFiles when concurrently creating
historical extractors.
* If the cluster heartbeat doesn't need the PipeMetaList, do not call into
PipeTaskAgent. This prevents the DataNode from turning UNKNOWN when many pipes
are being created.
---------
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../iotdb/db/pipe/agent/task/PipeTaskAgent.java | 9 +-----
.../PipeHistoricalDataRegionTsFileExtractor.java | 36 +++++++++++++++++++---
.../resource/tsfile/PipeTsFileResourceManager.java | 7 ++++-
.../impl/DataNodeInternalRPCServiceImpl.java | 4 ++-
4 files changed, 42 insertions(+), 14 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index e54203f0f7c..34e99e09fb5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -36,7 +36,6 @@ import org.apache.iotdb.db.pipe.task.PipeBuilder;
import org.apache.iotdb.db.pipe.task.PipeTask;
import org.apache.iotdb.db.pipe.task.PipeTaskBuilder;
import org.apache.iotdb.db.pipe.task.PipeTaskManager;
-import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
@@ -777,13 +776,7 @@ public class PipeTaskAgent {
///////////////////////// Heartbeat /////////////////////////
- public synchronized void collectPipeMetaList(THeartbeatReq req,
THeartbeatResp resp)
- throws TException {
- // If the pipe heartbeat is separated from the cluster heartbeat, then the
lock doesn't
- // need to be acquired
- if (!req.isNeedPipeMetaList()) {
- return;
- }
+ public synchronized void collectPipeMetaList(THeartbeatResp resp) throws
TException {
// Try the lock instead of directly acquire it to prevent the block of the
cluster heartbeat
// 10s is the half of the HEARTBEAT_TIMEOUT_TIME defined in class
BaseNodeCache in ConfigNode
if (!tryReadLockWithTimeOut(10)) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index c522cd6eb6a..ee7be9e1452 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -42,6 +42,8 @@ import java.io.IOException;
import java.time.ZoneId;
import java.util.ArrayDeque;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Queue;
import java.util.stream.Collectors;
@@ -56,6 +58,9 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeHistoricalDataRegionTsFileExtractor.class);
+ private static final Map<Integer, Long>
DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP = new HashMap<>();
+ private static final long PIPE_MIN_FLUSH_INTERVAL_IN_MS = 2000;
+
private PipeTaskMeta pipeTaskMeta;
private ProgressIndex startIndex;
@@ -85,6 +90,9 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
startIndex = environment.getPipeTaskMeta().getProgressIndex();
dataRegionId = environment.getRegionId();
+ synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
+ DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.putIfAbsent(dataRegionId, 0L);
+ }
pattern = parameters.getStringOrDefault(EXTRACTOR_PATTERN_KEY,
EXTRACTOR_PATTERN_DEFAULT_VALUE);
@@ -133,7 +141,14 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
// If we don't invoke flushDataRegionAllTsFiles() in the realtime only
mode, the data generated
// between the creation time of the pipe the time when the pipe starts
will be lost.
if (historicalDataExtractionTimeLowerBound != Long.MIN_VALUE) {
- flushDataRegionAllTsFiles();
+ synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
+ final long lastFlushedByPipeTime =
+ DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.get(dataRegionId);
+ if (System.currentTimeMillis() - lastFlushedByPipeTime >=
PIPE_MIN_FLUSH_INTERVAL_IN_MS) {
+ flushDataRegionAllTsFiles();
+ DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.replace(dataRegionId,
System.currentTimeMillis());
+ }
+ }
}
}
@@ -163,7 +178,14 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
dataRegion.writeLock("Pipe: start to extract historical TsFile");
try {
- dataRegion.syncCloseAllWorkingTsFileProcessors();
+ synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) {
+ final long lastFlushedByPipeTime =
+ DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.get(dataRegionId);
+ if (System.currentTimeMillis() - lastFlushedByPipeTime >=
PIPE_MIN_FLUSH_INTERVAL_IN_MS) {
+ dataRegion.syncCloseAllWorkingTsFileProcessors();
+ DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.replace(dataRegionId,
System.currentTimeMillis());
+ }
+ }
final TsFileManager tsFileManager = dataRegion.getTsFileManager();
tsFileManager.readLock();
@@ -174,7 +196,10 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
tsFileManager.getTsFileList(true).stream()
.filter(
resource ->
-
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
+ // Some resource may be not closed due to the control
of
+ // PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore
them.
+ resource.isClosed()
+ &&
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
&&
isTsFileResourceOverlappedWithTimeRange(resource)
&&
isTsFileGeneratedAfterExtractionTimeLowerBound(resource))
.map(
@@ -193,7 +218,10 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
tsFileManager.getTsFileList(false).stream()
.filter(
resource ->
-
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
+ // Some resource may be not closed due to the control
of
+ // PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore
them.
+ resource.isClosed()
+ &&
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
&&
isTsFileResourceOverlappedWithTimeRange(resource)
&&
isTsFileGeneratedAfterExtractionTimeLowerBound(resource))
.map(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
index 08f1e7db2e1..4f6b4b1dde2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java
@@ -34,6 +34,9 @@ public class PipeTsFileResourceManager {
private final Map<String, Integer> hardlinkOrCopiedFileToReferenceMap = new
HashMap<>();
+ /** Cache the File objects here to avoid redundancy */
+ private final Map<String, File> fileNameToFileMap = new HashMap<>();
+
/**
* given a file, create a hardlink or copy it to pipe dir, maintain a
reference count for the
* hardlink or copied file, and return the hardlink or copied file.
@@ -63,13 +66,14 @@ public class PipeTsFileResourceManager {
// file in pipe dir. if so, increase reference count and return it
final File hardlinkOrCopiedFile = getHardlinkOrCopiedFileInPipeDir(file);
if (increaseReferenceIfExists(hardlinkOrCopiedFile.getPath())) {
- return hardlinkOrCopiedFile;
+ return fileNameToFileMap.get(hardlinkOrCopiedFile.getPath());
}
// if the file is not a hardlink or copied file, and there is no related
hardlink or copied
// file in pipe dir, create a hardlink or copy it to pipe dir, maintain a
reference count for
// the hardlink or copied file, and return the hardlink or copied file.
hardlinkOrCopiedFileToReferenceMap.put(hardlinkOrCopiedFile.getPath(), 1);
+ fileNameToFileMap.put(hardlinkOrCopiedFile.getPath(),
hardlinkOrCopiedFile);
// if the file is a tsfile, create a hardlink in pipe dir and return it.
// otherwise, copy the file (.mod or .resource) to pipe dir and return it.
return isTsFile
@@ -161,6 +165,7 @@ public class PipeTsFileResourceManager {
if (updatedReference != null && updatedReference == 0) {
Files.deleteIfExists(hardlinkOrCopiedFile.toPath());
hardlinkOrCopiedFileToReferenceMap.remove(hardlinkOrCopiedFile.getPath());
+ fileNameToFileMap.remove(hardlinkOrCopiedFile.getPath());
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 80589eacb39..0ba27aac9c6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -1206,7 +1206,9 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
SchemaEngine.getInstance().updateAndFillSchemaCountMap(req.schemaQuotaCount,
resp);
// Update pipe meta if necessary
- PipeAgent.task().collectPipeMetaList(req, resp);
+ if (req.isNeedPipeMetaList()) {
+ PipeAgent.task().collectPipeMetaList(resp);
+ }
return resp;
}