Gabriel39 commented on code in PR #61271:
URL: https://github.com/apache/doris/pull/61271#discussion_r3014007704


##########
be/src/exec/scan/scanner_context.h:
##########
@@ -52,32 +52,147 @@ class Dependency;
 class Scanner;
 class ScannerDelegate;
 class ScannerScheduler;
-class ScannerScheduler;
 class TaskExecutor;
 class TaskHandle;
+struct MemLimiter;
+
+// Query-level memory arbitrator that distributes memory fairly across all 
scan contexts
+struct MemShareArbitrator {
+    ENABLE_FACTORY_CREATOR(MemShareArbitrator)
+    TUniqueId query_id;
+    int64_t query_mem_limit = 0;
+    int64_t mem_limit = 0;
+    std::atomic<int64_t> total_mem_bytes = 0;
+
+    MemShareArbitrator(const TUniqueId& qid, int64_t query_mem_limit, double 
max_scan_ratio);
+
+    // Update memory allocation when scanner memory usage changes
+    // Returns new scan memory limit for this context
+    int64_t update_mem_bytes(int64_t old_value, int64_t new_value);
+    void register_scan_node();
+    std::string debug_string() const {
+        return fmt::format("query_id: {}, query_mem_limit: {}, mem_limit: {}", 
print_id(query_id),
+                           query_mem_limit, mem_limit);
+    }
+};
+
+// Scan-context-level memory limiter that controls scanner concurrency based 
on memory
+struct MemLimiter {
+private:
+    TUniqueId query_id;
+    mutable std::mutex lock;
+    // Parallelism of the scan operator
+    const int64_t parallelism = 0;
+    const bool serial_operator = false;
+    const int64_t operator_mem_limit;
+    std::atomic<int64_t> running_tasks_count = 0;
+
+    std::atomic<int64_t> estimated_block_mem_bytes = 0;
+    int64_t estimated_block_mem_bytes_update_count = 0;
+    int64_t arb_mem_bytes = 0;
+    std::atomic<int64_t> open_tasks_count = 0;
+
+    // Memory limit for this scan node (shared by all instances), updated by 
memory share arbitrator
+    std::atomic<int64_t> mem_limit = 0;
+
+public:
+    ENABLE_FACTORY_CREATOR(MemLimiter)
+    MemLimiter(const TUniqueId& qid, int64_t parallelism, bool 
serial_operator_, int64_t mem_limit)
+            : query_id(qid),
+              parallelism(parallelism),
+              serial_operator(serial_operator_),
+              operator_mem_limit(mem_limit) {}
+    ~MemLimiter() { DCHECK_EQ(open_tasks_count, 0); }
+

Review Comment:
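   `open_tasks_count` should be less than or equal to 0 here, so the destructor check should be `DCHECK_LE(open_tasks_count, 0)` rather than `DCHECK_EQ(open_tasks_count, 0)`.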



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to