This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch rc/1.2.2 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 658a6170d75d77ac314ee81e73936623c1228312 Author: HTHou <[email protected]> AuthorDate: Tue Sep 26 19:15:16 2023 +0800 Revert "Pipe: Improve performance for 10000+ pipes (#11021) (#11051)" This reverts commit a0f89ab9ecdcc7f27b1c176d46e6c25c4ba6885f. --- .../iotdb/db/pipe/agent/task/PipeTaskAgent.java | 9 +++++- .../PipeHistoricalDataRegionTsFileExtractor.java | 36 +++------------------- .../resource/tsfile/PipeTsFileResourceManager.java | 7 +---- .../impl/DataNodeInternalRPCServiceImpl.java | 4 +-- 4 files changed, 14 insertions(+), 42 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java index 34e99e09fb5..e54203f0f7c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java @@ -36,6 +36,7 @@ import org.apache.iotdb.db.pipe.task.PipeBuilder; import org.apache.iotdb.db.pipe.task.PipeTask; import org.apache.iotdb.db.pipe.task.PipeTaskBuilder; import org.apache.iotdb.db.pipe.task.PipeTaskManager; +import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq; import org.apache.iotdb.mpp.rpc.thrift.THeartbeatResp; import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq; import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp; @@ -776,7 +777,13 @@ public class PipeTaskAgent { ///////////////////////// Heartbeat ///////////////////////// - public synchronized void collectPipeMetaList(THeartbeatResp resp) throws TException { + public synchronized void collectPipeMetaList(THeartbeatReq req, THeartbeatResp resp) + throws TException { + // If the pipe heartbeat is separated from the cluster heartbeat, then the lock doesn't + // need to be acquired + if (!req.isNeedPipeMetaList()) { + return; + } // Try the lock instead of directly acquire it to prevent the block of the cluster heartbeat // 10s is the half of the HEARTBEAT_TIMEOUT_TIME defined in class BaseNodeCache in ConfigNode if (!tryReadLockWithTimeOut(10)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java index dc084a45d9c..8a971bf46cf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java @@ -42,8 +42,6 @@ import java.io.IOException; import java.time.ZoneId; import java.util.ArrayDeque; import java.util.Collection; -import java.util.HashMap; -import java.util.Map; import java.util.Queue; import java.util.stream.Collectors; @@ -58,9 +56,6 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa private static final Logger LOGGER = LoggerFactory.getLogger(PipeHistoricalDataRegionTsFileExtractor.class); - private static final Map<Integer, Long> DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP = new HashMap<>(); - private static final long PIPE_MIN_FLUSH_INTERVAL_IN_MS = 2000; - private PipeTaskMeta pipeTaskMeta; private ProgressIndex startIndex; @@ -90,9 +85,6 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa startIndex = environment.getPipeTaskMeta().getProgressIndex(); dataRegionId = environment.getRegionId(); - synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) { - DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.putIfAbsent(dataRegionId, 0L); - } pattern = parameters.getStringOrDefault(EXTRACTOR_PATTERN_KEY, EXTRACTOR_PATTERN_DEFAULT_VALUE); @@ -141,14 +133,7 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa // If we don't invoke flushDataRegionAllTsFiles() in the realtime only mode, the data generated // between the creation time of the pipe the time when the pipe starts will be lost. if (historicalDataExtractionTimeLowerBound != Long.MIN_VALUE) { - synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) { - final long lastFlushedByPipeTime = - DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.get(dataRegionId); - if (System.currentTimeMillis() - lastFlushedByPipeTime >= PIPE_MIN_FLUSH_INTERVAL_IN_MS) { - flushDataRegionAllTsFiles(); - DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.replace(dataRegionId, System.currentTimeMillis()); - } - } + flushDataRegionAllTsFiles(); } } @@ -178,14 +163,7 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa dataRegion.writeLock("Pipe: start to extract historical TsFile"); try { - synchronized (DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP) { - final long lastFlushedByPipeTime = - DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.get(dataRegionId); - if (System.currentTimeMillis() - lastFlushedByPipeTime >= PIPE_MIN_FLUSH_INTERVAL_IN_MS) { - dataRegion.syncCloseAllWorkingTsFileProcessors(); - DATA_REGION_ID_TO_PIPE_FLUSHED_TIME_MAP.replace(dataRegionId, System.currentTimeMillis()); - } - } + dataRegion.syncCloseAllWorkingTsFileProcessors(); final TsFileManager tsFileManager = dataRegion.getTsFileManager(); tsFileManager.readLock(); @@ -196,10 +174,7 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa tsFileManager.getTsFileList(true).stream() .filter( resource -> - // Some resource may be not closed due to the control of - // PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore them. - resource.isClosed() - && !startIndex.isAfter(resource.getMaxProgressIndexAfterClose()) + !startIndex.isAfter(resource.getMaxProgressIndexAfterClose()) && isTsFileResourceOverlappedWithTimeRange(resource) && isTsFileGeneratedAfterExtractionTimeLowerBound(resource)) .map( @@ -218,10 +193,7 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa tsFileManager.getTsFileList(false).stream() .filter( resource -> - // Some resource may be not closed due to the control of - // PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore them. - resource.isClosed() - && !startIndex.isAfter(resource.getMaxProgressIndexAfterClose()) + !startIndex.isAfter(resource.getMaxProgressIndexAfterClose()) && isTsFileResourceOverlappedWithTimeRange(resource) && isTsFileGeneratedAfterExtractionTimeLowerBound(resource)) .map( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java index 4f6b4b1dde2..08f1e7db2e1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java @@ -34,9 +34,6 @@ public class PipeTsFileResourceManager { private final Map<String, Integer> hardlinkOrCopiedFileToReferenceMap = new HashMap<>(); - /** Cache the File objects here to avoid redundancy */ - private final Map<String, File> fileNameToFileMap = new HashMap<>(); - /** * given a file, create a hardlink or copy it to pipe dir, maintain a reference count for the * hardlink or copied file, and return the hardlink or copied file. @@ -66,14 +63,13 @@ public class PipeTsFileResourceManager { // file in pipe dir. if so, increase reference count and return it final File hardlinkOrCopiedFile = getHardlinkOrCopiedFileInPipeDir(file); if (increaseReferenceIfExists(hardlinkOrCopiedFile.getPath())) { - return fileNameToFileMap.get(hardlinkOrCopiedFile.getPath()); + return hardlinkOrCopiedFile; } // if the file is not a hardlink or copied file, and there is no related hardlink or copied // file in pipe dir, create a hardlink or copy it to pipe dir, maintain a reference count for // the hardlink or copied file, and return the hardlink or copied file. hardlinkOrCopiedFileToReferenceMap.put(hardlinkOrCopiedFile.getPath(), 1); - fileNameToFileMap.put(hardlinkOrCopiedFile.getPath(), hardlinkOrCopiedFile); // if the file is a tsfile, create a hardlink in pipe dir and return it. // otherwise, copy the file (.mod or .resource) to pipe dir and return it. return isTsFile @@ -165,7 +161,6 @@ public class PipeTsFileResourceManager { if (updatedReference != null && updatedReference == 0) { Files.deleteIfExists(hardlinkOrCopiedFile.toPath()); hardlinkOrCopiedFileToReferenceMap.remove(hardlinkOrCopiedFile.getPath()); - fileNameToFileMap.remove(hardlinkOrCopiedFile.getPath()); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index cd3193f1abb..30dacbd6233 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -1206,9 +1206,7 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface SchemaEngine.getInstance().updateAndFillSchemaCountMap(req.schemaQuotaCount, resp); // Update pipe meta if necessary - if (req.isNeedPipeMetaList()) { - PipeAgent.task().collectPipeMetaList(resp); - } + PipeAgent.task().collectPipeMetaList(req, resp); return resp; }
