This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch beyyes/add_ram_usage_ut in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a2423c62ec266476726f0f53a5719f9757c61b08 Author: Beyyes <[email protected]> AuthorDate: Mon Dec 9 10:43:19 2024 +0800 add fe ram usage ut --- .../db/queryengine/common/MPPQueryContext.java | 4 +++ .../iotdb/db/queryengine/plan/Coordinator.java | 2 +- .../planner/memory/MemoryReservationManager.java | 5 +++ .../NotThreadSafeMemoryReservationManager.java | 13 ++++--- .../distribution/AggregationDistributionTest.java | 41 ++++++++++++++++++++++ 5 files changed, 60 insertions(+), 5 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java index f96f4e6b4f4..ee41cfc3c48 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java @@ -344,6 +344,10 @@ public class MPPQueryContext { this.memoryReservationManager.releaseMemoryCumulatively(bytes); } + public MemoryReservationManager getMemoryReservationManager() { + return this.memoryReservationManager; + } + // endregion public boolean isTableQuery() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java index c3896cd60fd..0904d34ebc7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java @@ -201,7 +201,7 @@ public class Coordinator { } execution.start(); ExecutionResult result = execution.getStatus(); - if (!execution.isQuery() && result.status != null && needRetry(result.status)) { + if (!execution.isQuery() && needRetry(result.status)) { // if it's write request and the result status needs to retry result.status.setNeedRetry(true); } diff 
--git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java index ba55d60b096..2661c21d9e1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java @@ -43,4 +43,9 @@ public interface MemoryReservationManager { * this manager ends, Or the memory to be released in the batch may not be released correctly. */ void releaseAllReservedMemory(); + + /** + * @return the total reserved memory + */ + long getReservedMemory(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java index 7d8d4b076c2..4b5561a962c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java @@ -30,7 +30,7 @@ public class NotThreadSafeMemoryReservationManager implements MemoryReservationM // bound for each batch. 
private static final long MEMORY_BATCH_THRESHOLD = 1024L * 1024L; - private final LocalExecutionPlanner LOCAL_EXECUTION_PLANNER = LocalExecutionPlanner.getInstance(); + private final LocalExecutionPlanner localExecutionPlanner = LocalExecutionPlanner.getInstance(); private final QueryId queryId; @@ -58,7 +58,7 @@ public class NotThreadSafeMemoryReservationManager implements MemoryReservationM @Override public void reserveMemoryImmediately() { if (bytesToBeReserved != 0) { - LOCAL_EXECUTION_PLANNER.reserveFromFreeMemoryForOperators( + localExecutionPlanner.reserveFromFreeMemoryForOperators( bytesToBeReserved, reservedBytesInTotal, queryId.getId(), contextHolder); reservedBytesInTotal += bytesToBeReserved; bytesToBeReserved = 0; @@ -75,7 +75,7 @@ public class NotThreadSafeMemoryReservationManager implements MemoryReservationM } else { bytesToRelease = bytesToBeReleased - bytesToBeReserved; bytesToBeReserved = 0; - LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(bytesToRelease); + localExecutionPlanner.releaseToFreeMemoryForOperators(bytesToRelease); reservedBytesInTotal -= bytesToRelease; } bytesToBeReleased = 0; @@ -85,10 +85,15 @@ public class NotThreadSafeMemoryReservationManager implements MemoryReservationM @Override public void releaseAllReservedMemory() { if (reservedBytesInTotal != 0) { - LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(reservedBytesInTotal); + localExecutionPlanner.releaseToFreeMemoryForOperators(reservedBytesInTotal); reservedBytesInTotal = 0; bytesToBeReserved = 0; bytesToBeReleased = 0; } } + + @Override + public long getReservedMemory() { + return reservedBytesInTotal; + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationDistributionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationDistributionTest.java index 75467533b8e..f795e837ebd 100644 --- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationDistributionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationDistributionTest.java @@ -946,4 +946,45 @@ public class AggregationDistributionTest { fragmentInstances.forEach( f -> verifyAggregationStep(expectedStep, f.getFragment().getPlanNodeTree())); } + + /* + * IdentitySinkNode-36 + * └──AggregationMergeSort-31 + * ├──DeviceView-21 + * │ └──ProjectNode-20 + * │ └──FullOuterTimeJoinNode-19 + * │ ├──SeriesAggregationScanNode-17:[SeriesPath: root.sg.d1.s2, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:1)] + * │ └──SeriesAggregationScanNode-18:[SeriesPath: root.sg.d1.s1, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:1)] + * ├──ExchangeNode-32: [SourceAddress:192.0.3.1/test_memory_fe.2.0/34] + * └──ExchangeNode-33: [SourceAddress:192.0.2.1/test_memory_fe.3.0/35] + * + * IdentitySinkNode-34 + * └──DeviceView-25 + * └──FullOuterTimeJoinNode-24 + * ├──SeriesAggregationScanNode-22:[SeriesPath: root.sg.d22.s1, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:3)] + * └──SeriesAggregationScanNode-23:[SeriesPath: root.sg.d22.s2, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:3)] + * + * IdentitySinkNode-35 + * └──DeviceView-30 + * └──ProjectNode-29 + * └──FullOuterTimeJoinNode-28 + * ├──SeriesAggregationScanNode-26:[SeriesPath: root.sg.d1.s2, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:2)] + * └──SeriesAggregationScanNode-27:[SeriesPath: root.sg.d1.s1, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:2)] + */ + @Test + public void feRamMemoryUsageTest() { + QueryId queryId 
= new QueryId("test_memory_fe"); + MPPQueryContext context = + new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint()); + String sql = "select count(s1) from root.sg.d1 align by device"; + Analysis analysis = Util.analyze(sql, context); + PlanNode logicalPlanNode = Util.genLogicalPlan(analysis, context); + DistributionPlanner planner = + new DistributionPlanner(analysis, new LogicalQueryPlan(context, logicalPlanNode)); + DistributedQueryPlan plan = planner.planFragments(); + context.getMemoryReservationManager().reserveMemoryImmediately(); + System.out.println(context.getMemoryReservationManager().getReservedMemory()); + List<FragmentInstance> fragmentInstances = plan.getInstances(); + System.out.println("===="); + } }
