This is an automated email from the ASF dual-hosted git repository.

zouxinyi pushed a commit to branch resource_ctx
in repository https://gitbox.apache.org/repos/asf/doris.git

commit bc558cde8e21417c656df4639e292ef7aa3d1876
Author: yiguolei <[email protected]>
AuthorDate: Mon Dec 2 20:25:55 2024 +0800

    first pr for resource context
---
 be/src/runtime/workload_management/cpu_context.h   | 61 ++++++++++++++
 be/src/runtime/workload_management/io_context.h    | 90 +++++++++++++++++++++
 .../runtime/workload_management/memory_context.h   | 93 +++++++++++++++++++++
 .../runtime/workload_management/resource_context.h | 94 ++++++++++++++++++++++
 4 files changed, 338 insertions(+)

diff --git a/be/src/runtime/workload_management/cpu_context.h b/be/src/runtime/workload_management/cpu_context.h
new file mode 100644
index 00000000000..ba6681074eb
--- /dev/null
+++ b/be/src/runtime/workload_management/cpu_context.h
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include <atomic>
+#include <memory>
+#include <queue>
+#include <shared_mutex>
+#include <string>
+
+#include "common/status.h"
+
+namespace doris {
+
+// Holds the CPU execution statistics and the hook used to bind a thread to a cgroup.
+class CPUContext : public std::enable_shared_from_this<CPUContext> {
+    ENABLE_FACTORY_CREATOR(CPUContext);
+
+public:
+    // Collects CPU execution statistics.
+    // Not thread safe: use a separate CPUStats object for every scanner (for example),
+    // then merge it into the shared one and reset it.
+    class CPUStats {
+    public:
+        // TODO: add the CPU-stats related methods here.
+        std::string debug_string();
+        void merge(CPUStats& stats);
+        void reset();
+    };
+
+    CPUContext() {}
+    virtual ~CPUContext() = default;
+
+    // Returns a non-owning pointer to the stats object stored inside this context.
+    CPUStats* cpu_stats() { return &stats_; }
+
+    // Binds the current thread to the cgroup; only some load threads should do this.
+    void bind_workload_group() {
+        // Placeholder: call the workload group method that binds the current
+        // thread to the cgroup.
+    }
+
+private:
+    CPUStats stats_;
+};
+
+} // namespace doris
diff --git a/be/src/runtime/workload_management/io_context.h b/be/src/runtime/workload_management/io_context.h
new file mode 100644
index 00000000000..9d34b1811db
--- /dev/null
+++ b/be/src/runtime/workload_management/io_context.h
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include <atomic>
+#include <memory>
+#include <queue>
+#include <shared_mutex>
+#include <string>
+
+#include "common/status.h"
+
+namespace doris {
+
+class IOContext : public std::enable_shared_from_this<IOContext> {
+    ENABLE_FACTORY_CREATOR(IOContext);
+
+public:
+    // Used to collect io execution stats.
+    class IOStats {
+    public:
+        IOStats() = default;
+        virtual ~IOStats() = default;
+        void merge(IOStats stats);
+        void reset();
+        std::string debug_string();
+        int64_t scan_rows() { return scan_rows_; }
+        int64_t scan_bytes() { return scan_bytes_; }
+        int64_t scan_bytes_from_local_storage() { return 
scan_bytes_from_local_storage_; }
+        int64_t scan_bytes_from_remote_storage() { return 
scan_bytes_from_remote_storage_; }
+        int64_t returned_rows() { return returned_rows_; }
+        int64_t shuffle_send_bytes() { return shuffle_send_bytes_; }
+        int64_t shuffle_send_rows() { return shuffle_send_rows_; }
+
+        int64_t incr_scan_rows(int64_t delta) { return scan_rows_ + delta; }
+        int64_t incr_scan_bytes(int64_t delta) { return scan_bytes_ + delta; }
+        int64_t incr_scan_bytes_from_local_storage(int64_t delta) {
+            return scan_bytes_from_local_storage_ + delta;
+        }
+        int64_t incr_scan_bytes_from_remote_storage(int64_t delta) {
+            return scan_bytes_from_remote_storage_ + delta;
+        }
+        int64_t incr_returned_rows(int64_t delta) { return returned_rows_ + 
delta; }
+        int64_t incr_shuffle_send_bytes(int64_t delta) { return 
shuffle_send_bytes_ + delta; }
+        int64_t incr_shuffle_send_rows(int64_t delta) { return 
shuffle_send_rows_ + delta; }
+        std::string debug_string();
+
+    private:
+        int64_t scan_rows_ = 0;
+        int64_t scan_bytes_ = 0;
+        int64_t scan_bytes_from_local_storage_ = 0;
+        int64_t scan_bytes_from_remote_storage_ = 0;
+        // number rows returned by query.
+        // only set once by result sink when closing.
+        int64_t returned_rows_ = 0;
+        int64_t shuffle_send_bytes_ = 0;
+        int64_t shuffle_send_rows_ = 0;
+    };
+
+public:
+    IOContext() {}
+    virtual ~IOContext() = default;
+    IOThrottle* io_throttle() {
+        // get io throttle from workload group
+    }
+    IOStats* stats() { return &stats_; }
+
+private:
+    IOStats stats_;
+};
+
+} // namespace doris
diff --git a/be/src/runtime/workload_management/memory_context.h b/be/src/runtime/workload_management/memory_context.h
new file mode 100644
index 00000000000..3e3a067e840
--- /dev/null
+++ b/be/src/runtime/workload_management/memory_context.h
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include <atomic>
+#include <memory>
+#include <queue>
+#include <shared_mutex>
+#include <string>
+
+#include "common/status.h"
+
+namespace doris {
+
+// Keeps the memory tracker, the memory statistics and the revoke/arbitration hooks
+// (virtual, so subclasses may override them).
+class MemoryContext : public std::enable_shared_from_this<MemoryContext> {
+    ENABLE_FACTORY_CREATOR(MemoryContext);
+
+public:
+    // Collects memory execution statistics.
+    // Not thread safe: one instance must not be modified concurrently.
+    class MemoryStats {
+    public:
+        MemoryStats() = default;
+        virtual ~MemoryStats() = default;
+
+        std::string debug_string();
+        void reset();
+        void merge(MemoryStats& stats);
+
+        // Accessors returning the current value of each counter.
+        int64_t current_used_memory_bytes() { return current_used_memory_bytes_; }
+        int64_t max_peak_memory_bytes() { return max_peak_memory_bytes_; }
+        int64_t revoked_bytes() { return revoked_bytes_; }
+        int64_t revoke_wait_time_ms() { return revoke_wait_time_ms_; }
+        int64_t revoke_attempts() { return revoke_attempts_; }
+
+    private:
+        // Maximum memory peak for all backends.
+        // Only set once by result sink when closing.
+        int64_t max_peak_memory_bytes_ = 0;
+        int64_t current_used_memory_bytes_ = 0;
+        // Total number of times the revoke method is called.
+        int64_t revoke_attempts_ = 0;
+        // Time spent waiting for revoke to finish.
+        int64_t revoke_wait_time_ms_ = 0;
+        // Number of bytes revoked.
+        int64_t revoked_bytes_ = 0;
+    };
+
+    // Stores a shared reference to the given memory tracker.
+    MemoryContext(std::shared_ptr<MemtrackerLimiter> memtracker)
+            : memtracker_limiter_(memtracker) {}
+
+    virtual ~MemoryContext() = default;
+
+    // Non-owning pointers to the stats object and to the tracker held by this context.
+    MemoryStats* stats() { return &stats_; }
+    MemtrackerLimiter* memtracker_limiter() { return memtracker_limiter_.get(); }
+
+    // The methods below are related to spilling to disk. The default implementations
+    // do nothing.
+
+    // Number of bytes that could be released; 0 by default.
+    virtual int64_t revokable_bytes() { return 0; }
+
+    // Whether a revoke may be started; true by default.
+    virtual bool ready_do_revoke() { return true; }
+
+    // Begins the revoke memory task; the default returns OK without doing anything.
+    virtual Status revoke(int64_t bytes) { return Status::OK(); }
+
+    // Arbitration enter/leave hooks; the defaults return OK.
+    virtual Status enter_arbitration(Status reason) { return Status::OK(); }
+
+    virtual Status leave_arbitration(Status reason) { return Status::OK(); }
+
+private:
+    MemoryStats stats_;
+    std::shared_ptr<MemtrackerLimiter> memtracker_limiter_;
+};
+
+} // namespace doris
diff --git a/be/src/runtime/workload_management/resource_context.h b/be/src/runtime/workload_management/resource_context.h
new file mode 100644
index 00000000000..359564d4a7c
--- /dev/null
+++ b/be/src/runtime/workload_management/resource_context.h
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include <atomic>
+#include <memory>
+#include <queue>
+#include <shared_mutex>
+#include <string>
+
+#include "common/status.h"
+#include "runtime/workload_management/cpu_context.h"
+#include "runtime/workload_management/io_context.h"
+#include "runtime/workload_management/memory_context.h"
+
+namespace doris {
+
+// Any task that allows cancellation should implement this class.
+class TaskController {
+    ENABLE_FACTORY_CREATOR(TaskController);
+
+public:
+    // The class has virtual functions and is meant to be subclassed, so the destructor
+    // is virtual: destroying a derived controller through a TaskController pointer must
+    // be well defined.
+    virtual ~TaskController() = default;
+
+    // Cancels the task for `cancel_reason`.
+    // The default implementation does nothing and returns OK.
+    virtual Status cancel(Status cancel_reason) { return Status::OK(); }
+
+    // Writes the task's running time, in milliseconds, to `*running_time_msecs`.
+    // `running_time_msecs` must not be null: it is dereferenced unconditionally.
+    // The default implementation reports 0 and returns OK.
+    virtual Status running_time(int64_t* running_time_msecs) {
+        *running_time_msecs = 0;
+        return Status::OK();
+    }
+};
+
+// Forward declaration: this header only uses std::shared_ptr / std::weak_ptr of
+// WorkloadGroup, which do not need the complete type.
+// NOTE(review): assumes WorkloadGroup is declared in namespace doris -- confirm.
+class WorkloadGroup;
+
+// Every task should have its own resource context, and BE may adjust the resource
+// context while the task is running.
+// The workload group holds the resource context and does some control work.
+class ResourceContext : public std::enable_shared_from_this<ResourceContext> {
+    ENABLE_FACTORY_CREATOR(ResourceContext);
+
+public:
+    // Creates default cpu, memory and io contexts. The task controller and the
+    // workload group stay unset until the corresponding setter is called.
+    ResourceContext() {
+        // These are all default values; they may be replaced through the setters.
+        cpu_context_ = std::make_shared<CPUContext>();
+        // MemoryContext's only constructor takes a memory tracker, so the default
+        // memory context is created with an empty (null) tracker.
+        memory_context_ = std::make_shared<MemoryContext>(nullptr);
+        io_context_ = std::make_shared<IOContext>();
+    }
+    virtual ~ResourceContext() = default;
+
+    // The accessors below return raw, non-owning pointers; the caller should not hold
+    // on to them. task_controller() returns nullptr until set_task_controller() is called.
+    CPUContext* cpu_context() { return cpu_context_.get(); }
+    MemoryContext* memory_context() { return memory_context_.get(); }
+    IOContext* io_context() { return io_context_.get(); }
+    TaskController* task_controller() { return task_controller_.get(); }
+
+    // Each setter replaces the corresponding context with the given one.
+    void set_cpu_context(std::shared_ptr<CPUContext> cpu_context) { cpu_context_ = cpu_context; }
+    void set_memory_context(std::shared_ptr<MemoryContext> memory_context) {
+        memory_context_ = memory_context;
+    }
+    void set_io_context(std::shared_ptr<IOContext> io_context) { io_context_ = io_context; }
+    void set_task_controller(std::shared_ptr<TaskController> task_controller) {
+        task_controller_ = task_controller;
+    }
+
+    // Records the workload group as a weak reference.
+    void set_workload_group(std::shared_ptr<WorkloadGroup> wg) {
+        // TODO: update all child contexts' workload group property.
+        workload_group_ = wg;
+    }
+
+    // Returns the workload group, or an empty shared_ptr if none was set or it has
+    // already been destroyed.
+    std::shared_ptr<WorkloadGroup> workload_group() { return workload_group_.lock(); }
+
+private:
+    // The cpu, memory and io contexts are replaced with defaults by the constructor.
+    // The task controller stays nullptr until it is set, which means the resource
+    // context will ignore this controller.
+    std::shared_ptr<CPUContext> cpu_context_ = nullptr;
+    std::shared_ptr<MemoryContext> memory_context_ = nullptr;
+    std::shared_ptr<IOContext> io_context_ = nullptr;
+    std::shared_ptr<TaskController> task_controller_ = nullptr;
+    // The workload group owns the resource context, so the resource context only keeps
+    // a weak ptr to the workload group. It is default-constructed (empty) because
+    // std::weak_ptr cannot be initialized from nullptr.
+    // TODO: should use atomic weak ptr to avoid the concurrent modification of the pointer.
+    std::weak_ptr<WorkloadGroup> workload_group_;
+};
+
+} // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to