This is an automated email from the ASF dual-hosted git repository. zouxinyi pushed a commit to branch resource_ctx in repository https://gitbox.apache.org/repos/asf/doris.git
commit bc558cde8e21417c656df4639e292ef7aa3d1876
Author: yiguolei <[email protected]>
AuthorDate: Mon Dec 2 20:25:55 2024 +0800

    first pr for resource context
---
 be/src/runtime/workload_management/cpu_context.h | 61 ++++++++++++++
 be/src/runtime/workload_management/io_context.h | 90 +++++++++++++++++++++
 .../runtime/workload_management/memory_context.h | 93 +++++++++++++++++++++
 .../runtime/workload_management/resource_context.h | 94 ++++++++++++++++++++++
 4 files changed, 338 insertions(+)

diff --git a/be/src/runtime/workload_management/cpu_context.h b/be/src/runtime/workload_management/cpu_context.h
new file mode 100644
index 00000000000..ba6681074eb
--- /dev/null
+++ b/be/src/runtime/workload_management/cpu_context.h
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include <atomic>
+#include <memory>
+#include <queue>
+#include <shared_mutex>
+#include <string>
+
+#include "common/status.h"
+
+namespace doris {
+
+class CPUContext : public std::enable_shared_from_this<CPUContext> {
+    ENABLE_FACTORY_CREATOR(CPUContext);
+
+public:
+    // Used to collect cpu execution stats.
+    // The stats are not thread safe.
+    // For example, use a separate object for every scanner, then merge and reset.
+    class CPUStats {
+    public:
+        // TODO: add cpu stats related methods here.
+        void reset();
+        void merge(const CPUStats& stats);
+        std::string debug_string() const;
+    };
+
+public:
+    CPUContext() = default;
+    virtual ~CPUContext() = default;
+    // Bind the current thread to a cgroup; only some load threads should do this.
+    void bind_workload_group() {
+        // TODO: call workload group method to bind current thread to cgroup.
+    }
+    CPUStats* cpu_stats() { return &stats_; }
+
+private:
+    CPUStats stats_;
+};
+
+} // namespace doris
diff --git a/be/src/runtime/workload_management/io_context.h b/be/src/runtime/workload_management/io_context.h
new file mode 100644
index 00000000000..9d34b1811db
--- /dev/null
+++ b/be/src/runtime/workload_management/io_context.h
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include <atomic>
+#include <memory>
+#include <queue>
+#include <shared_mutex>
+#include <string>
+
+#include "common/status.h"
+
+namespace doris {
+
+class IOThrottle;
+
+class IOContext : public std::enable_shared_from_this<IOContext> {
+    ENABLE_FACTORY_CREATOR(IOContext);
+
+public:
+    // Used to collect io execution stats. Not thread safe; incr_* return the updated value.
+    class IOStats {
+    public:
+        IOStats() = default;
+        virtual ~IOStats() = default;
+        void merge(const IOStats& stats);
+        void reset();
+        std::string debug_string() const;
+        int64_t scan_rows() const { return scan_rows_; }
+        int64_t scan_bytes() const { return scan_bytes_; }
+        int64_t scan_bytes_from_local_storage() const { return scan_bytes_from_local_storage_; }
+        int64_t scan_bytes_from_remote_storage() const { return scan_bytes_from_remote_storage_; }
+        int64_t returned_rows() const { return returned_rows_; }
+        int64_t shuffle_send_bytes() const { return shuffle_send_bytes_; }
+        int64_t shuffle_send_rows() const { return shuffle_send_rows_; }
+
+        int64_t incr_scan_rows(int64_t delta) { return scan_rows_ += delta; }
+        int64_t incr_scan_bytes(int64_t delta) { return scan_bytes_ += delta; }
+        int64_t incr_scan_bytes_from_local_storage(int64_t delta) {
+            return scan_bytes_from_local_storage_ += delta;
+        }
+        int64_t incr_scan_bytes_from_remote_storage(int64_t delta) {
+            return scan_bytes_from_remote_storage_ += delta;
+        }
+        int64_t incr_returned_rows(int64_t delta) { return returned_rows_ += delta; }
+        int64_t incr_shuffle_send_bytes(int64_t delta) { return shuffle_send_bytes_ += delta; }
+        int64_t incr_shuffle_send_rows(int64_t delta) { return shuffle_send_rows_ += delta; }
+
+    private:
+        int64_t scan_rows_ = 0;
+        int64_t scan_bytes_ = 0;
+        int64_t scan_bytes_from_local_storage_ = 0;
+        int64_t scan_bytes_from_remote_storage_ = 0;
+        // Number of rows returned by the query.
+        // Only set once by result sink when closing.
+        int64_t returned_rows_ = 0;
+        int64_t shuffle_send_bytes_ = 0;
+        int64_t shuffle_send_rows_ = 0;
+    };
+
+public:
+    IOContext() = default;
+    virtual ~IOContext() = default;
+    // TODO: get io throttle from workload group; returns nullptr until implemented.
+    IOThrottle* io_throttle() { return nullptr; }
+    IOStats* stats() { return &stats_; }
+
+private:
+    IOStats stats_;
+};
+
+} // namespace doris
diff --git a/be/src/runtime/workload_management/memory_context.h b/be/src/runtime/workload_management/memory_context.h
new file mode 100644
index 00000000000..3e3a067e840
--- /dev/null
+++ b/be/src/runtime/workload_management/memory_context.h
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include <atomic>
+#include <memory>
+#include <queue>
+#include <shared_mutex>
+#include <string>
+#include <utility>
+
+#include "common/status.h"
+
+namespace doris {
+
+class MemTrackerLimiter;
+
+class MemoryContext : public std::enable_shared_from_this<MemoryContext> {
+    ENABLE_FACTORY_CREATOR(MemoryContext);
+
+public:
+    // Used to collect memory execution stats.
+    // The stats class is not thread safe; do not modify it concurrently.
+    class MemoryStats {
+    public:
+        MemoryStats() = default;
+        virtual ~MemoryStats() = default;
+        void merge(const MemoryStats& stats);
+        void reset();
+        std::string debug_string() const;
+        int64_t revoke_attempts() const { return revoke_attempts_; }
+        int64_t revoke_wait_time_ms() const { return revoke_wait_time_ms_; }
+        int64_t revoked_bytes() const { return revoked_bytes_; }
+        int64_t max_peak_memory_bytes() const { return max_peak_memory_bytes_; }
+        int64_t current_used_memory_bytes() const { return current_used_memory_bytes_; }
+
+    private:
+        // Maximum memory peak for all backends.
+        // Only set once by result sink when closing.
+        int64_t max_peak_memory_bytes_ = 0;
+        int64_t current_used_memory_bytes_ = 0;
+        // The total number of times that the revoke method is called.
+        int64_t revoke_attempts_ = 0;
+        // Time spent waiting for revoke to finish, in milliseconds.
+        int64_t revoke_wait_time_ms_ = 0;
+        // Bytes released by revoke.
+        int64_t revoked_bytes_ = 0;
+    };
+
+public:
+    explicit MemoryContext(std::shared_ptr<MemTrackerLimiter> memtracker = nullptr)
+            : memtracker_limiter_(std::move(memtracker)) {}
+    virtual ~MemoryContext() = default;
+
+    MemTrackerLimiter* memtracker_limiter() const { return memtracker_limiter_.get(); }
+    MemoryStats* stats() { return &stats_; }
+
+    // The following methods are related to spilling to disk.
+    // Returns the number of bytes that could be released.
+    virtual int64_t revokable_bytes() { return 0; }
+
+    virtual bool ready_do_revoke() { return true; }
+
+    // Begins a revoke-memory task; presumably `bytes` is the amount to release (TODO confirm).
+    virtual Status revoke(int64_t bytes) { return Status::OK(); }
+
+    virtual Status enter_arbitration(Status reason) { return Status::OK(); }
+    virtual Status leave_arbitration(Status reason) { return Status::OK(); }
+
+private:
+    MemoryStats stats_;
+    std::shared_ptr<MemTrackerLimiter> memtracker_limiter_;
+};
+
+} // namespace doris
diff --git a/be/src/runtime/workload_management/resource_context.h b/be/src/runtime/workload_management/resource_context.h
new file mode 100644
index 00000000000..359564d4a7c
--- /dev/null
+++ b/be/src/runtime/workload_management/resource_context.h
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include <atomic>
+#include <memory>
+#include <queue>
+#include <shared_mutex>
+#include <string>
+
+#include "common/status.h"
+#include "runtime/workload_management/cpu_context.h"
+#include "runtime/workload_management/io_context.h"
+#include "runtime/workload_management/memory_context.h"
+
+namespace doris {
+
+// Any task that allows cancellation should implement this class.
+class TaskController {
+    ENABLE_FACTORY_CREATOR(TaskController);
+
+public:
+    virtual ~TaskController() = default;
+    // Cancels the task with the given reason. The default implementation does nothing.
+    virtual Status cancel(Status cancel_reason) { return Status::OK(); }
+    // Reports the running time in milliseconds via *running_time_msecs (0 by default).
+    virtual Status running_time(int64_t* running_time_msecs) {
+        *running_time_msecs = 0;
+        return Status::OK();
+    }
+};
+
+class WorkloadGroup;
+
+// Every task should have its own resource context, and BE may adjust the resource
+// context while the task is running.
+// The workload group holds the resource context and does some control work through it.
+class ResourceContext : public std::enable_shared_from_this<ResourceContext> {
+    ENABLE_FACTORY_CREATOR(ResourceContext);
+
+public:
+    // Start with default contexts; they may be replaced through the setters below.
+    ResourceContext()
+            : cpu_context_(std::make_shared<CPUContext>()),
+              memory_context_(std::make_shared<MemoryContext>(nullptr)),
+              io_context_(std::make_shared<IOContext>()) {}
+    virtual ~ResourceContext() = default;
+
+    // Returns raw pointers; callers should not hold them, as the contexts may be replaced.
+    CPUContext* cpu_context() const { return cpu_context_.get(); }
+    MemoryContext* memory_context() const { return memory_context_.get(); }
+    IOContext* io_context() const { return io_context_.get(); }
+    TaskController* task_controller() const { return task_controller_.get(); }
+
+    void set_cpu_context(std::shared_ptr<CPUContext> cpu_context) { cpu_context_ = cpu_context; }
+    void set_memory_context(std::shared_ptr<MemoryContext> memory_context) {
+        memory_context_ = memory_context;
+    }
+    void set_io_context(std::shared_ptr<IOContext> io_context) { io_context_ = io_context; }
+    void set_task_controller(std::shared_ptr<TaskController> task_controller) {
+        task_controller_ = task_controller;
+    }
+
+    // TODO: propagate wg to the child contexts; currently it is only stored here.
+    void set_workload_group(std::shared_ptr<WorkloadGroup> wg) { workload_group_ = wg; }
+    std::shared_ptr<WorkloadGroup> workload_group() const { return workload_group_.lock(); }
+
+private:
+    // task_controller_ is nullptr until set; a nullptr controller is ignored.
+    std::shared_ptr<CPUContext> cpu_context_ = nullptr;
+    std::shared_ptr<MemoryContext> memory_context_ = nullptr;
+    std::shared_ptr<IOContext> io_context_ = nullptr;
+    std::shared_ptr<TaskController> task_controller_ = nullptr;
+    // The workload group owns the resource context, so only a weak_ptr is kept here.
+    // TODO: should use atomic weak ptr to avoid the concurrent modification of the pointer.
+    std::weak_ptr<WorkloadGroup> workload_group_;
+};
+
+} // namespace doris

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
