This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch potential-leak in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 29804ff25169f05d98f99d3591056b20a17836d8 Author: Caideyipi <[email protected]> AuthorDate: Thu May 14 15:33:50 2026 +0800 leak_fix --- .../overview/PipeDataNodeSinglePipeMetrics.java | 135 ++++++++++--------- .../overview/PipeTsFileToTabletsMetrics.java | 18 ++- .../commons/pipe/agent/task/PipeTaskAgent.java | 102 +++++++++++--- .../commons/pipe/agent/task/PipeTaskAgentTest.java | 149 +++++++++++++++++++++ 4 files changed, 318 insertions(+), 86 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java index 6535d371a91..b9d90fd8a68 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeSinglePipeMetrics.java @@ -139,12 +139,14 @@ public class PipeDataNodeSinglePipeMetrics implements IMetricSet { } private void removeMetrics(final String pipeID) { - removeAutoGauge(pipeID); + final PipeDataNodeRemainingEventAndTimeOperator operator = + remainingEventAndTimeOperatorMap.remove(pipeID); + if (Objects.nonNull(operator) && Objects.nonNull(metricService)) { + removeAutoGauge(operator); + } } - private void removeAutoGauge(final String pipeID) { - final PipeDataNodeRemainingEventAndTimeOperator operator = - remainingEventAndTimeOperatorMap.get(pipeID); + private void removeAutoGauge(final PipeDataNodeRemainingEventAndTimeOperator operator) { metricService.remove( MetricType.AUTO_GAUGE, Metric.PIPE_DATANODE_REMAINING_EVENT_COUNT.toString(), @@ -190,14 +192,31 @@ public class PipeDataNodeSinglePipeMetrics implements IMetricSet { Metric.PIPE_TSFILE_EVENT_TRANSFER_TIME.toString(), Tag.NAME.toString(), operator.getPipeName()); - remainingEventAndTimeOperatorMap.remove(pipeID); } //////////////////////////// register & deregister (pipe integration) //////////////////////////// + private static String generatePipeID(final String pipeName, final long creationTime) { + return pipeName + "_" + creationTime; + } + + private boolean isPipeAlive(final String pipeName, final long creationTime) { + return PipeDataNodeAgent.task().getPipeCreationTime(pipeName) == creationTime; + } + + private PipeDataNodeRemainingEventAndTimeOperator getOrCreateOperatorIfPipeAlive( + final String pipeName, final long creationTime) { + if (!isPipeAlive(pipeName, creationTime)) { + return null; + } + return remainingEventAndTimeOperatorMap.computeIfAbsent( + generatePipeID(pipeName, creationTime), + key -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime)); + } + public void register(final IoTDBDataRegionSource source) { // The metric is global thus the regionId is omitted - final String pipeID = source.getPipeName() + "_" + source.getCreationTime(); + final String pipeID = generatePipeID(source.getPipeName(), source.getCreationTime()); remainingEventAndTimeOperatorMap.computeIfAbsent( pipeID, k -> @@ -210,7 +229,7 @@ public class PipeDataNodeSinglePipeMetrics implements IMetricSet { public void register(final IoTDBSchemaRegionSource source) { // The metric is global thus the regionId is omitted - final String pipeID = source.getPipeName() + "_" + source.getCreationTime(); + final String pipeID = generatePipeID(source.getPipeName(), source.getCreationTime()); remainingEventAndTimeOperatorMap .computeIfAbsent( pipeID, @@ -224,19 +243,20 @@ public class PipeDataNodeSinglePipeMetrics implements IMetricSet { } public void increaseInsertNodeEventCount(final String pipeName, final long creationTime) { - remainingEventAndTimeOperatorMap - .computeIfAbsent( - pipeName + "_" + creationTime, - k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime)) - .increaseInsertNodeEventCount(); + final PipeDataNodeRemainingEventAndTimeOperator operator = + getOrCreateOperatorIfPipeAlive(pipeName, creationTime); + if (Objects.nonNull(operator)) { + operator.increaseInsertNodeEventCount(); + } } public void decreaseInsertNodeEventCount( final String pipeName, final long creationTime, final long transferTime) { final PipeDataNodeRemainingEventAndTimeOperator operator = - remainingEventAndTimeOperatorMap.computeIfAbsent( - pipeName + "_" + creationTime, - k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime)); + remainingEventAndTimeOperatorMap.get(generatePipeID(pipeName, creationTime)); + if (Objects.isNull(operator)) { + return; + } operator.decreaseInsertNodeEventCount(); @@ -247,46 +267,44 @@ public class PipeDataNodeSinglePipeMetrics implements IMetricSet { public void updateInsertNodeTransferTimer( final String pipeName, final long creationTime, final long transferTime) { - if (transferTime > 0) { - remainingEventAndTimeOperatorMap - .computeIfAbsent( - pipeName + "_" + creationTime, - k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime)) - .getInsertNodeTransferTimer() - .update(transferTime, TimeUnit.NANOSECONDS); + final PipeDataNodeRemainingEventAndTimeOperator operator = + remainingEventAndTimeOperatorMap.get(generatePipeID(pipeName, creationTime)); + if (Objects.nonNull(operator) && transferTime > 0) { + operator.getInsertNodeTransferTimer().update(transferTime, TimeUnit.NANOSECONDS); } } public void increaseRawTabletEventCount(final String pipeName, final long creationTime) { - remainingEventAndTimeOperatorMap - .computeIfAbsent( - pipeName + "_" + creationTime, - k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime)) - .increaseRawTabletEventCount(); + final PipeDataNodeRemainingEventAndTimeOperator operator = + getOrCreateOperatorIfPipeAlive(pipeName, creationTime); + if (Objects.nonNull(operator)) { + operator.increaseRawTabletEventCount(); + } } public void decreaseRawTabletEventCount(final String pipeName, final long creationTime) { - remainingEventAndTimeOperatorMap - .computeIfAbsent( - pipeName + "_" + creationTime, - k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime)) - .decreaseRawTabletEventCount(); + final PipeDataNodeRemainingEventAndTimeOperator operator = + remainingEventAndTimeOperatorMap.get(generatePipeID(pipeName, creationTime)); + if (Objects.nonNull(operator)) { + operator.decreaseRawTabletEventCount(); + } } public void increaseTsFileEventCount(final String pipeName, final long creationTime) { - remainingEventAndTimeOperatorMap - .computeIfAbsent( - pipeName + "_" + creationTime, - k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime)) - .increaseTsFileEventCount(); + final PipeDataNodeRemainingEventAndTimeOperator operator = + getOrCreateOperatorIfPipeAlive(pipeName, creationTime); + if (Objects.nonNull(operator)) { + operator.increaseTsFileEventCount(); + } } public void decreaseTsFileEventCount( final String pipeName, final long creationTime, final long transferTime) { final PipeDataNodeRemainingEventAndTimeOperator operator = - remainingEventAndTimeOperatorMap.computeIfAbsent( - pipeName + "_" + creationTime, - k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime)); + remainingEventAndTimeOperatorMap.get(generatePipeID(pipeName, creationTime)); + if (Objects.isNull(operator)) { + return; + } operator.decreaseTsFileEventCount(); @@ -297,30 +315,27 @@ public class PipeDataNodeSinglePipeMetrics implements IMetricSet { public void updateTsFileTransferTimer( final String pipeName, final long creationTime, final long transferTime) { - if (transferTime > 0) { - remainingEventAndTimeOperatorMap - .computeIfAbsent( - pipeName + "_" + creationTime, - k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime)) - .getTsFileTransferTimer() - .update(transferTime, TimeUnit.NANOSECONDS); + final PipeDataNodeRemainingEventAndTimeOperator operator = + remainingEventAndTimeOperatorMap.get(generatePipeID(pipeName, creationTime)); + if (Objects.nonNull(operator) && transferTime > 0) { + operator.getTsFileTransferTimer().update(transferTime, TimeUnit.NANOSECONDS); } } public void increaseHeartbeatEventCount(final String pipeName, final long creationTime) { - remainingEventAndTimeOperatorMap - .computeIfAbsent( - pipeName + "_" + creationTime, - k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime)) - .increaseHeartbeatEventCount(); + final PipeDataNodeRemainingEventAndTimeOperator operator = + getOrCreateOperatorIfPipeAlive(pipeName, creationTime); + if (Objects.nonNull(operator)) { + operator.increaseHeartbeatEventCount(); + } } public void decreaseHeartbeatEventCount(final String pipeName, final long creationTime) { - remainingEventAndTimeOperatorMap - .computeIfAbsent( - pipeName + "_" + creationTime, - k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime)) - .decreaseHeartbeatEventCount(); + final PipeDataNodeRemainingEventAndTimeOperator operator = + remainingEventAndTimeOperatorMap.get(generatePipeID(pipeName, creationTime)); + if (Objects.nonNull(operator)) { + operator.decreaseHeartbeatEventCount(); + } } public void thawRate(final String pipeID) { @@ -350,9 +365,7 @@ public class PipeDataNodeSinglePipeMetrics implements IMetricSet { pipeID); return; } - if (Objects.nonNull(metricService)) { - removeMetrics(pipeID); - } + removeMetrics(pipeID); } public void markRegionCommit(final String pipeID, final boolean isDataRegion) { @@ -395,7 +408,7 @@ public class PipeDataNodeSinglePipeMetrics implements IMetricSet { final String pipeName, final long creationTime) { final PipeDataNodeRemainingEventAndTimeOperator operator = remainingEventAndTimeOperatorMap.computeIfAbsent( - pipeName + "_" + creationTime, + generatePipeID(pipeName, creationTime), k -> new PipeDataNodeRemainingEventAndTimeOperator(pipeName, creationTime)); return new Pair<>(operator.getRemainingNonHeartbeatEvents(), operator.getRemainingTime()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java index f9436377bb3..90260fd17fc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeTsFileToTabletsMetrics.java @@ -110,39 +110,47 @@ public class PipeTsFileToTabletsMetrics implements IMetricSet { } private void removeMetrics(final String pipeID) { + pipeTimerMap.remove(pipeID); metricService.remove( MetricType.TIMER, Metric.PIPE_TSFILE_TO_TABLETS_TIME.toString(), Tag.NAME.toString(), pipeID); - pipeTimerMap.remove(pipeID); + pipeRateMap.remove(pipeID); metricService.remove( MetricType.RATE, Metric.PIPE_TSFILE_TO_TABLETS_RATE.toString(), Tag.NAME.toString(), pipeID); - pipeRateMap.remove(pipeID); + pipeTabletCountMap.remove(pipeID); metricService.remove( MetricType.COUNTER, Metric.PIPE_TSFILE_TO_TABLETS_COUNT.toString(), Tag.NAME.toString(), pipeID); - pipeTabletCountMap.remove(pipeID); + pipeTabletMemoryMap.remove(pipeID); metricService.remove( MetricType.COUNTER, Metric.PIPE_TSFILE_TO_TABLETS_TOTAL_MEMORY.toString(), Tag.NAME.toString(), pipeID); - pipeTabletMemoryMap.remove(pipeID); + pipeParseFileCountMap.remove(pipeID); metricService.remove( MetricType.COUNTER, Metric.PIPE_TSFILE_PARSE_FILE_COUNT.toString(), Tag.NAME.toString(), pipeID); + } + + private void removePipeStats(final String pipeID) { + pipeTimerMap.remove(pipeID); + pipeRateMap.remove(pipeID); + pipeTabletCountMap.remove(pipeID); + pipeTabletMemoryMap.remove(pipeID); pipeParseFileCountMap.remove(pipeID); } @@ -165,6 +173,8 @@ public class PipeTsFileToTabletsMetrics implements IMetricSet { try { if (Objects.nonNull(metricService)) { removeMetrics(pipeID); + } else { + removePipeStats(pipeID); } } finally { pipe.remove(pipeID); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java index 99d45298794..41efbef9c04 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java @@ -60,6 +60,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -104,10 +105,14 @@ public abstract class PipeTaskAgent { protected final PipeMetaKeeper pipeMetaKeeper; protected final PipeTaskManager pipeTaskManager; + private final Map<String, AtomicLong> pipeNameWithCreationTime2FloatingMemoryUsageInByteMap; + private final Map<String, Set<Long>> pipeName2CreationTimeSetMap; protected PipeTaskAgent() { pipeMetaKeeper = new PipeMetaKeeper(); pipeTaskManager = new PipeTaskManager(); + pipeNameWithCreationTime2FloatingMemoryUsageInByteMap = new ConcurrentHashMap<>(); + pipeName2CreationTimeSetMap = new ConcurrentHashMap<>(); // Help PipeEndPointRateLimiter to check if the pipe is still alive PipeEndPointRateLimiter.setTaskAgent(this); @@ -601,6 +606,7 @@ public abstract class PipeTaskAgent { // Remove pipe meta from pipe meta keeper pipeMetaKeeper.removePipeMeta(pipeName); + cleanupFloatingMemoryUsageCounterIfNecessary(pipeName, creationTime); return true; } @@ -639,6 +645,8 @@ public abstract class PipeTaskAgent { // Remove pipe meta from pipe meta keeper pipeMetaKeeper.removePipeMeta(pipeName); + cleanupFloatingMemoryUsageCounterIfNecessary( + pipeName, existedPipeMeta.getStaticMeta().getCreationTime()); return true; } @@ -1170,6 +1178,57 @@ public abstract class PipeTaskAgent { ///////////////////////// Maintain meta info ///////////////////////// + private static String generatePipeNameWithCreationTime( + final String pipeName, final long creationTime) { + return pipeName + "_" + creationTime; + } + + private AtomicLong getOrCreateFloatingMemoryUsageCounter( + final String pipeName, final long creationTime) { + pipeName2CreationTimeSetMap + .computeIfAbsent(pipeName, key -> ConcurrentHashMap.newKeySet()) + .add(creationTime); + return pipeNameWithCreationTime2FloatingMemoryUsageInByteMap.computeIfAbsent( + generatePipeNameWithCreationTime(pipeName, creationTime), key -> new AtomicLong(0)); + } + + private void tryCleanupFloatingMemoryUsageCounter( + final String pipeName, final long creationTime, final AtomicLong counter) { + if (counter.get() != 0) { + return; + } + + final PipeMeta currentPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName); + if (Objects.nonNull(currentPipeMeta) + && currentPipeMeta.getStaticMeta().getCreationTime() == creationTime) { + return; + } + + final String pipeNameWithCreationTime = + generatePipeNameWithCreationTime(pipeName, creationTime); + pipeNameWithCreationTime2FloatingMemoryUsageInByteMap.remove(pipeNameWithCreationTime, counter); + + if (!pipeNameWithCreationTime2FloatingMemoryUsageInByteMap.containsKey( + pipeNameWithCreationTime)) { + pipeName2CreationTimeSetMap.computeIfPresent( + pipeName, + (key, creationTimes) -> { + creationTimes.remove(creationTime); + return creationTimes.isEmpty() ? null : creationTimes; + }); + } + } + + private void cleanupFloatingMemoryUsageCounterIfNecessary( + final String pipeName, final long creationTime) { + final AtomicLong counter = + pipeNameWithCreationTime2FloatingMemoryUsageInByteMap.get( + generatePipeNameWithCreationTime(pipeName, creationTime)); + if (Objects.nonNull(counter)) { + tryCleanupFloatingMemoryUsageCounter(pipeName, creationTime, counter); + } + } + public long getPipeCreationTime(final String pipeName) { final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName); return pipeMeta == null ? 0 : pipeMeta.getStaticMeta().getCreationTime(); @@ -1178,7 +1237,7 @@ public abstract class PipeTaskAgent { public String getPipeNameWithCreationTime(final String pipeName, final long creationTime) { final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName); return pipeMeta == null - ? pipeName + "_" + creationTime + ? generatePipeNameWithCreationTime(pipeName, creationTime) : ((PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta()).getPipeNameWithCreationTime(); } @@ -1192,22 +1251,23 @@ public abstract class PipeTaskAgent { } public long getAllFloatingMemoryUsageInByte() { - final AtomicLong bytes = new AtomicLong(0); - pipeMetaKeeper - .getPipeMetaList() - .forEach( - pipeMeta -> - bytes.addAndGet( - ((PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta()) - .getFloatingMemoryUsageInByte())); - return bytes.get(); + return pipeNameWithCreationTime2FloatingMemoryUsageInByteMap.values().stream() + .mapToLong(AtomicLong::get) + .sum(); } public long getFloatingMemoryUsageInByte(final String pipeName) { - final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName); - return pipeMeta == null - ? 0 - : ((PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta()).getFloatingMemoryUsageInByte(); + final Set<Long> creationTimes = pipeName2CreationTimeSetMap.get(pipeName); + if (Objects.isNull(creationTimes)) { + return 0; + } + + return creationTimes.stream() + .map(creationTime -> generatePipeNameWithCreationTime(pipeName, creationTime)) + .map(pipeNameWithCreationTime2FloatingMemoryUsageInByteMap::get) + .filter(Objects::nonNull) + .mapToLong(AtomicLong::get) + .sum(); } public void addFloatingMemoryUsageInByte( @@ -1215,18 +1275,18 @@ public abstract class PipeTaskAgent { final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName); // To avoid stale pipe before alter if (Objects.nonNull(pipeMeta) && pipeMeta.getStaticMeta().getCreationTime() == creationTime) { - ((PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta()) - .addFloatingMemoryUsageInByte(sizeInByte); + getOrCreateFloatingMemoryUsageCounter(pipeName, creationTime).addAndGet(sizeInByte); } } public void decreaseFloatingMemoryUsageInByte( final String pipeName, final long creationTime, final long sizeInByte) { - final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName); - // To avoid stale pipe before alter - if (Objects.nonNull(pipeMeta) && pipeMeta.getStaticMeta().getCreationTime() == creationTime) { - ((PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta()) - .decreaseFloatingMemoryUsageInByte(sizeInByte); + final AtomicLong counter = + pipeNameWithCreationTime2FloatingMemoryUsageInByteMap.get( + generatePipeNameWithCreationTime(pipeName, creationTime)); + if (Objects.nonNull(counter)) { + counter.addAndGet(-sizeInByte); + tryCleanupFloatingMemoryUsageCounter(pipeName, creationTime, counter); } } diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgentTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgentTest.java new file mode 100644 index 00000000000..50a485df9d2 --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgentTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.agent.task; + +import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp; +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; +import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq; + +import org.apache.thrift.TException; +import org.junit.Assert; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; + +public class PipeTaskAgentTest { + + @Test + public void testFloatingMemoryUsageSurvivesDropUntilLateRelease() throws IllegalPathException { + final DummyPipeTaskAgent agent = new DummyPipeTaskAgent(); + + agent.createPipeForTest(generatePipeMeta("pipe", 1L)); + agent.addFloatingMemoryUsageInByte("pipe", 1L, 100L); + Assert.assertEquals(100L, agent.getAllFloatingMemoryUsageInByte()); + Assert.assertEquals(100L, agent.getFloatingMemoryUsageInByte("pipe")); + + agent.dropPipeForTest("pipe", 1L); + Assert.assertEquals(100L, agent.getAllFloatingMemoryUsageInByte()); + Assert.assertEquals(100L, agent.getFloatingMemoryUsageInByte("pipe")); + + agent.createPipeForTest(generatePipeMeta("pipe", 2L)); + agent.addFloatingMemoryUsageInByte("pipe", 2L, 20L); + Assert.assertEquals(120L, agent.getAllFloatingMemoryUsageInByte()); + Assert.assertEquals(120L, agent.getFloatingMemoryUsageInByte("pipe")); + + agent.decreaseFloatingMemoryUsageInByte("pipe", 1L, 100L); + Assert.assertEquals(20L, agent.getAllFloatingMemoryUsageInByte()); + Assert.assertEquals(20L, agent.getFloatingMemoryUsageInByte("pipe")); + + agent.dropPipeForTest("pipe", 2L); + agent.decreaseFloatingMemoryUsageInByte("pipe", 2L, 20L); + Assert.assertEquals(0L, agent.getAllFloatingMemoryUsageInByte()); + Assert.assertEquals(0L, agent.getFloatingMemoryUsageInByte("pipe")); + } + + @Test + public void testZeroFloatingMemoryUsageCounterIsCleanedAfterDrop() throws Exception { + final DummyPipeTaskAgent agent = new DummyPipeTaskAgent(); + + agent.createPipeForTest(generatePipeMeta("pipe", 1L)); + agent.addFloatingMemoryUsageInByte("pipe", 1L, 100L); + agent.decreaseFloatingMemoryUsageInByte("pipe", 1L, 100L); + agent.dropPipeForTest("pipe", 1L); + + Assert.assertTrue( + getMapField(agent, "pipeNameWithCreationTime2FloatingMemoryUsageInByteMap").isEmpty()); + Assert.assertTrue(getMapField(agent, "pipeName2CreationTimeSetMap").isEmpty()); + } + + @SuppressWarnings("unchecked") + private static Map<?, ?> getMapField(final PipeTaskAgent agent, final String fieldName) + throws NoSuchFieldException, IllegalAccessException { + final Field field = PipeTaskAgent.class.getDeclaredField(fieldName); + field.setAccessible(true); + return (Map<?, ?>) field.get(agent); + } + + private static PipeMeta generatePipeMeta(final String pipeName, final long creationTime) { + return new PipeMeta( + new PipeStaticMeta( + pipeName, creationTime, new HashMap<>(), new HashMap<>(), new HashMap<>()), + new PipeRuntimeMeta(new ConcurrentHashMap<>())); + } + + private static class DummyPipeTaskAgent extends PipeTaskAgent { + + private boolean createPipeForTest(final PipeMeta pipeMeta) throws IllegalPathException { + return createPipe(pipeMeta); + } + + private boolean dropPipeForTest(final String pipeName, final long creationTime) { + return dropPipe(pipeName, creationTime); + } + + @Override + protected boolean isShutdown() { + return false; + } + + @Override + protected void thawRate(final String pipeName, final long creationTime) { + // Do nothing + } + + @Override + protected void freezeRate(final String pipeName, final long creationTime) { + // Do nothing + } + + @Override + protected Map<Integer, PipeTask> buildPipeTasks(final PipeMeta pipeMetaFromCoordinator) { + return new HashMap<>(); + } + + @Override + protected void createPipeTask( + final int consensusGroupId, + final PipeStaticMeta pipeStaticMeta, + final org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta pipeTaskMeta) { + // Do nothing + } + + @Override + protected void collectPipeMetaListInternal( + final TPipeHeartbeatReq req, final TPipeHeartbeatResp resp) throws TException { + // Do nothing + } + + @Override + public void runPipeTasks( + final Collection<PipeTask> pipeTasks, final Consumer<PipeTask> runSingle) { + pipeTasks.forEach(runSingle); + } + } +}
