This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/add_ram_usage_ut
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a2423c62ec266476726f0f53a5719f9757c61b08
Author: Beyyes <[email protected]>
AuthorDate: Mon Dec 9 10:43:19 2024 +0800

    add fe ram usage ut
---
 .../db/queryengine/common/MPPQueryContext.java     |  4 +++
 .../iotdb/db/queryengine/plan/Coordinator.java     |  2 +-
 .../planner/memory/MemoryReservationManager.java   |  5 +++
 .../NotThreadSafeMemoryReservationManager.java     | 13 ++++---
 .../distribution/AggregationDistributionTest.java  | 41 ++++++++++++++++++++++
 5 files changed, 60 insertions(+), 5 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
index f96f4e6b4f4..ee41cfc3c48 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
@@ -344,6 +344,10 @@ public class MPPQueryContext {
     this.memoryReservationManager.releaseMemoryCumulatively(bytes);
   }
 
+  public MemoryReservationManager getMemoryReservationManager() {
+    return this.memoryReservationManager;
+  }
+
   // endregion
 
   public boolean isTableQuery() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index c3896cd60fd..0904d34ebc7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -201,7 +201,7 @@ public class Coordinator {
       }
       execution.start();
       ExecutionResult result = execution.getStatus();
-      if (!execution.isQuery() && result.status != null && 
needRetry(result.status)) {
+      if (!execution.isQuery() && needRetry(result.status)) {
         // if it's write request and the result status needs to retry
         result.status.setNeedRetry(true);
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java
index ba55d60b096..2661c21d9e1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java
@@ -43,4 +43,9 @@ public interface MemoryReservationManager {
    * this manager ends, Or the memory to be released in the batch may not be 
released correctly.
    */
   void releaseAllReservedMemory();
+
+  /**
+   * @return the total reserved memory
+   */
+  long getReservedMemory();
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java
index 7d8d4b076c2..4b5561a962c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java
@@ -30,7 +30,7 @@ public class NotThreadSafeMemoryReservationManager implements 
MemoryReservationM
   // bound for each batch.
   private static final long MEMORY_BATCH_THRESHOLD = 1024L * 1024L;
 
-  private final LocalExecutionPlanner LOCAL_EXECUTION_PLANNER = 
LocalExecutionPlanner.getInstance();
+  private final LocalExecutionPlanner localExecutionPlanner = 
LocalExecutionPlanner.getInstance();
 
   private final QueryId queryId;
 
@@ -58,7 +58,7 @@ public class NotThreadSafeMemoryReservationManager implements 
MemoryReservationM
   @Override
   public void reserveMemoryImmediately() {
     if (bytesToBeReserved != 0) {
-      LOCAL_EXECUTION_PLANNER.reserveFromFreeMemoryForOperators(
+      localExecutionPlanner.reserveFromFreeMemoryForOperators(
           bytesToBeReserved, reservedBytesInTotal, queryId.getId(), 
contextHolder);
       reservedBytesInTotal += bytesToBeReserved;
       bytesToBeReserved = 0;
@@ -75,7 +75,7 @@ public class NotThreadSafeMemoryReservationManager implements 
MemoryReservationM
       } else {
         bytesToRelease = bytesToBeReleased - bytesToBeReserved;
         bytesToBeReserved = 0;
-        
LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(bytesToRelease);
+        localExecutionPlanner.releaseToFreeMemoryForOperators(bytesToRelease);
         reservedBytesInTotal -= bytesToRelease;
       }
       bytesToBeReleased = 0;
@@ -85,10 +85,15 @@ public class NotThreadSafeMemoryReservationManager 
implements MemoryReservationM
   @Override
   public void releaseAllReservedMemory() {
     if (reservedBytesInTotal != 0) {
-      
LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(reservedBytesInTotal);
+      
localExecutionPlanner.releaseToFreeMemoryForOperators(reservedBytesInTotal);
       reservedBytesInTotal = 0;
       bytesToBeReserved = 0;
       bytesToBeReleased = 0;
     }
   }
+
+  @Override
+  public long getReservedMemory() {
+    return reservedBytesInTotal;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationDistributionTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationDistributionTest.java
index 75467533b8e..f795e837ebd 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationDistributionTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationDistributionTest.java
@@ -946,4 +946,45 @@ public class AggregationDistributionTest {
     fragmentInstances.forEach(
         f -> verifyAggregationStep(expectedStep, 
f.getFragment().getPlanNodeTree()));
   }
+
+  /*
+   * IdentitySinkNode-36
+   *   └──AggregationMergeSort-31
+   *       ├──DeviceView-21
+   *       │   └──ProjectNode-20
+   *       │       └──FullOuterTimeJoinNode-19
+   *       │           ├──SeriesAggregationScanNode-17:[SeriesPath: 
root.sg.d1.s2, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:1)]
+   *       │           └──SeriesAggregationScanNode-18:[SeriesPath: 
root.sg.d1.s1, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:1)]
+   *       ├──ExchangeNode-32: [SourceAddress:192.0.3.1/test_memory_fe.2.0/34]
+   *       └──ExchangeNode-33: [SourceAddress:192.0.2.1/test_memory_fe.3.0/35]
+   *
+   *  IdentitySinkNode-34
+   *   └──DeviceView-25
+   *       └──FullOuterTimeJoinNode-24
+   *           ├──SeriesAggregationScanNode-22:[SeriesPath: root.sg.d22.s1, 
Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:3)]
+   *           └──SeriesAggregationScanNode-23:[SeriesPath: root.sg.d22.s2, 
Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:3)]
+   *
+   *  IdentitySinkNode-35
+   *   └──DeviceView-30
+   *       └──ProjectNode-29
+   *           └──FullOuterTimeJoinNode-28
+   *               ├──SeriesAggregationScanNode-26:[SeriesPath: root.sg.d1.s2, 
Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:2)]
+   *               └──SeriesAggregationScanNode-27:[SeriesPath: root.sg.d1.s1, 
Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: 
TConsensusGroupId(type:DataRegion, id:2)]
+   */
+  @Test
+  public void feRamMemoryUsageTest() {
+    QueryId queryId = new QueryId("test_memory_fe");
+    MPPQueryContext context =
+        new MPPQueryContext("", queryId, null, new TEndPoint(), new 
TEndPoint());
+    String sql = "select count(s1) from root.sg.d1 align by device";
+    Analysis analysis = Util.analyze(sql, context);
+    PlanNode logicalPlanNode = Util.genLogicalPlan(analysis, context);
+    DistributionPlanner planner =
+        new DistributionPlanner(analysis, new LogicalQueryPlan(context, 
logicalPlanNode));
+    DistributedQueryPlan plan = planner.planFragments();
+    context.getMemoryReservationManager().reserveMemoryImmediately();
+    
System.out.println(context.getMemoryReservationManager().getReservedMemory());
+    List<FragmentInstance> fragmentInstances = plan.getInstances();
+    System.out.println("====");
+  }
 }

Reply via email to