This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3235b636cc [refactor](remove unused code) remove thread pool manager (#16179)
3235b636cc is described below
commit 3235b636cc7caff5e3ad126c36a4f12748cd4426
Author: yiguolei <[email protected]>
AuthorDate: Sun Jan 29 13:03:08 2023 +0800
[refactor](remove unused code) remove thread pool manager (#16179)
* remove thread resource manager
* remove string buffer
---------
Co-authored-by: yiguolei <[email protected]>
---
be/src/olap/memtable.h | 4 -
be/src/runtime/CMakeLists.txt | 1 -
be/src/runtime/exec_env.h | 3 -
be/src/runtime/exec_env_init.cpp | 3 -
be/src/runtime/runtime_state.h | 6 -
be/src/runtime/string_buffer.hpp | 108 ---------
be/src/runtime/thread_resource_mgr.cpp | 107 ---------
be/src/runtime/thread_resource_mgr.h | 247 ---------------------
be/src/util/CMakeLists.txt | 1 -
be/src/util/tuple_row_zorder_compare.cpp | 28 ---
be/src/util/tuple_row_zorder_compare.h | 34 ---
be/test/CMakeLists.txt | 2 -
be/test/http/stream_load_test.cpp | 4 -
be/test/runtime/external_scan_context_mgr_test.cpp | 4 -
be/test/runtime/string_buffer_test.cpp | 74 ------
be/test/runtime/test_env.cc | 2 -
be/test/runtime/thread_resource_mgr_test.cpp | 66 ------
be/test/vec/exec/vtablet_sink_test.cpp | 3 -
18 files changed, 697 deletions(-)
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 80bcb26c0c..0cd504ca3f 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -24,7 +24,6 @@
#include "olap/skiplist.h"
#include "olap/tablet.h"
#include "runtime/memory/mem_tracker.h"
-#include "util/tuple_row_zorder_compare.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/common/string_ref.h"
#include "vec/core/block.h"
@@ -117,9 +116,6 @@ private:
Schema* _schema;
const TabletSchema* _tablet_schema;
- // TODO: change to unique_ptr of comparator
- std::shared_ptr<RowComparator> _row_comparator;
-
std::shared_ptr<RowInBlockComparator> _vec_row_comparator;
// `_insert_manual_mem_tracker` manually records the memory value of
memtable insert()
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 7dc25bda7a..ada78fc9d0 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -42,7 +42,6 @@ set(RUNTIME_FILES
runtime_predicate.cpp
jsonb_value.cpp
thread_context.cpp
- thread_resource_mgr.cpp
threadlocal.cc
decimalv2_value.cpp
large_int_value.cpp
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index f90cb98ba6..c6123d7fbe 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -58,7 +58,6 @@ class ResultBufferMgr;
class ResultQueueMgr;
class TMasterInfo;
class LoadChannelMgr;
-class ThreadResourceMgr;
class TmpFileMgr;
class WebPageHandler;
class StreamLoadExecutor;
@@ -124,7 +123,6 @@ public:
std::shared_ptr<MemTrackerLimiter> orphan_mem_tracker() { return
_orphan_mem_tracker; }
MemTrackerLimiter* orphan_mem_tracker_raw() { return
_orphan_mem_tracker_raw; }
MemTrackerLimiter* experimental_mem_tracker() { return
_experimental_mem_tracker.get(); }
- ThreadResourceMgr* thread_mgr() { return _thread_mgr; }
ThreadPool* send_batch_thread_pool() { return
_send_batch_thread_pool.get(); }
ThreadPool* download_cache_thread_pool() { return
_download_cache_thread_pool.get(); }
void set_serial_download_cache_thread_token() {
@@ -204,7 +202,6 @@ private:
ClientCache<BackendServiceClient>* _backend_client_cache = nullptr;
ClientCache<FrontendServiceClient>* _frontend_client_cache = nullptr;
ClientCache<TPaloBrokerServiceClient>* _broker_client_cache = nullptr;
- ThreadResourceMgr* _thread_mgr = nullptr;
// The default tracker consumed by mem hook. If the thread does not attach
other trackers,
// by default all consumption will be passed to the process tracker
through the orphan tracker.
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 0764082e08..ef88739bf4 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -46,7 +46,6 @@
#include "runtime/stream_load/load_stream_mgr.h"
#include "runtime/stream_load/new_load_stream_mgr.h"
#include "runtime/stream_load/stream_load_executor.h"
-#include "runtime/thread_resource_mgr.h"
#include "runtime/tmp_file_mgr.h"
#include "util/bfd_parser.h"
#include "util/brpc_client_cache.h"
@@ -95,7 +94,6 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths) {
_backend_client_cache = new
BackendServiceClientCache(config::max_client_cache_size_per_host);
_frontend_client_cache = new
FrontendServiceClientCache(config::max_client_cache_size_per_host);
_broker_client_cache = new
BrokerServiceClientCache(config::max_client_cache_size_per_host);
- _thread_mgr = new ThreadResourceMgr();
ThreadPoolBuilder("SendBatchThreadPool")
.set_min_threads(config::send_batch_thread_pool_thread_num)
@@ -313,7 +311,6 @@ void ExecEnv::_destroy() {
SAFE_DELETE(_fragment_mgr);
SAFE_DELETE(_pipeline_task_scheduler);
SAFE_DELETE(_cgroups_mgr);
- SAFE_DELETE(_thread_mgr);
SAFE_DELETE(_broker_client_cache);
SAFE_DELETE(_frontend_client_cache);
SAFE_DELETE(_backend_client_cache);
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 83dae14d50..30608333ae 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -28,7 +28,6 @@
#include "gen_cpp/PaloInternalService_types.h" // for TQueryOptions
#include "gen_cpp/Types_types.h" // for TUniqueId
#include "runtime/query_fragments_ctx.h"
-#include "runtime/thread_resource_mgr.h"
#include "util/runtime_profile.h"
#include "util/telemetry/telemetry.h"
@@ -105,7 +104,6 @@ public:
const TUniqueId& fragment_instance_id() const { return
_fragment_instance_id; }
ExecEnv* exec_env() { return _exec_env; }
std::shared_ptr<MemTrackerLimiter> query_mem_tracker() { return
_query_mem_tracker; }
- ThreadResourceMgr::ResourcePool* resource_pool() { return _resource_pool; }
void set_fragment_root_id(PlanNodeId id) {
DCHECK(_root_node_id == -1) << "Should not set this twice.";
@@ -468,10 +466,6 @@ private:
TQueryOptions _query_options;
ExecEnv* _exec_env = nullptr;
- // Thread resource management object for this fragment's execution. The
runtime
- // state is responsible for returning this pool to the thread mgr.
- ThreadResourceMgr::ResourcePool* _resource_pool;
-
// if true, execution should stop with a CANCELLED status
std::atomic<bool> _is_cancelled;
diff --git a/be/src/runtime/string_buffer.hpp b/be/src/runtime/string_buffer.hpp
deleted file mode 100644
index e54473d4e9..0000000000
--- a/be/src/runtime/string_buffer.hpp
+++ /dev/null
@@ -1,108 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include "runtime/mem_pool.h"
-#include "vec/common/string_ref.h"
-
-namespace doris {
-
-// Dynamic-sizable string (similar to std::string) but without as many
-// copies and allocations.
-// StringBuffer wraps a StringRef object with a pool and memory buffer length.
-// It supports a subset of the std::string functionality but will only allocate
-// bigger string buffers as necessary. std::string tries to be immutable and
will
-// reallocate very often. std::string should be avoided in all hot paths.
-class StringBuffer {
-public:
- // C'tor for StringBuffer. Memory backing the string will be allocated
from
- // the pool as necessary. Can optionally be initialized from a StringRef.
- StringBuffer(MemPool* pool, StringRef* str) : _pool(pool), _buffer_size(0)
{
- if (str != NULL) {
- _string_value = *str;
- _buffer_size = str->size;
- }
- }
-
- StringBuffer(MemPool* pool) : _pool(pool), _buffer_size(0) {}
-
- virtual ~StringBuffer() {}
-
- // append 'str' to the current string, allocating a new buffer as
necessary.
- void append(const char* str, int len) {
- int new_len = len + _string_value.size;
-
- if (new_len > _buffer_size) {
- grow_buffer(new_len);
- }
-
- memcpy(const_cast<char*>(_string_value.data) + _string_value.size,
str, len);
- _string_value.size = new_len;
- }
-
- // TODO: switch everything to uint8_t?
- void append(const uint8_t* str, int len) { append(reinterpret_cast<const
char*>(str), len); }
-
- // Assigns contents to StringBuffer
- void assign(const char* str, int len) {
- clear();
- append(str, len);
- }
-
- // clear the underlying StringRef. The allocated buffer can be reused.
- void clear() { _string_value.size = 0; }
-
- // Clears the underlying buffer and StringRef
- void reset() {
- _string_value.size = 0;
- _buffer_size = 0;
- }
-
- // Returns whether the current string is empty
- bool empty() const { return _string_value.size == 0; }
-
- // Returns the length of the current string
- int size() const { return _string_value.size; }
-
- // Returns the underlying StringRef
- const StringRef& str() const { return _string_value; }
-
- // Returns the buffer size
- int buffer_size() const { return _buffer_size; }
-
-private:
- // Grows the buffer backing the string to be at least new_size, copying
- // over the previous string data into the new buffer.
- // TODO: some kind of doubling strategy?
- void grow_buffer(int new_len) {
- char* new_buffer = reinterpret_cast<char*>(_pool->allocate(new_len));
-
- if (_string_value.size > 0) {
- memcpy(new_buffer, _string_value.data, _string_value.size);
- }
-
- _string_value.data = new_buffer;
- _buffer_size = new_len;
- }
-
- MemPool* _pool;
- StringRef _string_value;
- int _buffer_size;
-};
-
-} // namespace doris
diff --git a/be/src/runtime/thread_resource_mgr.cpp
b/be/src/runtime/thread_resource_mgr.cpp
deleted file mode 100644
index fcf9e47068..0000000000
--- a/be/src/runtime/thread_resource_mgr.cpp
+++ /dev/null
@@ -1,107 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/thread_resource_mgr.h"
-
-#include <vector>
-
-#include "common/config.h"
-#include "common/logging.h"
-#include "util/cpu_info.h"
-
-namespace doris {
-
-ThreadResourceMgr::ThreadResourceMgr(int threads_quota) {
- DCHECK_GE(threads_quota, 0);
-
- if (threads_quota == 0) {
- _system_threads_quota = CpuInfo::num_cores() *
config::num_threads_per_core;
- } else {
- _system_threads_quota = threads_quota;
- }
-
- _per_pool_quota = 0;
-}
-
-ThreadResourceMgr::ThreadResourceMgr() {
- _system_threads_quota = CpuInfo::num_cores() *
config::num_threads_per_core;
- _per_pool_quota = 0;
-}
-
-ThreadResourceMgr::~ThreadResourceMgr() {
- for (auto pool : _free_pool_objs) {
- delete pool;
- }
- for (auto pool : _pools) {
- delete pool;
- }
-}
-
-ThreadResourceMgr::ResourcePool::ResourcePool(ThreadResourceMgr* parent) :
_parent(parent) {}
-
-void ThreadResourceMgr::ResourcePool::reset() {
- _num_threads = 0;
- _num_reserved_optional_threads = 0;
- _max_quota = INT_MAX;
-}
-
-void ThreadResourceMgr::ResourcePool::reserve_optional_tokens(int num) {
- DCHECK_GE(num, 0);
- _num_reserved_optional_threads = num;
-}
-
-ThreadResourceMgr::ResourcePool* ThreadResourceMgr::register_pool() {
- std::unique_lock<std::mutex> l(_lock);
- ResourcePool* pool = nullptr;
-
- if (_free_pool_objs.empty()) {
- pool = new ResourcePool(this);
- } else {
- pool = _free_pool_objs.front();
- _free_pool_objs.pop_front();
- }
-
- DCHECK(pool != nullptr);
- DCHECK(_pools.find(pool) == _pools.end());
- _pools.insert(pool);
- pool->reset();
-
- // Added a new pool, update the quotas for each pool.
- update_pool_quotas();
- return pool;
-}
-
-void ThreadResourceMgr::unregister_pool(ResourcePool* pool) {
- DCHECK(pool != nullptr);
- std::unique_lock<std::mutex> l(_lock);
- // this may be double unregistered after pr #3326 by LaiYingChun, so check
if the pool is already unregisted
- if (_pools.find(pool) != _pools.end()) {
- _pools.erase(pool);
- _free_pool_objs.push_back(pool);
- update_pool_quotas();
- }
-}
-
-void ThreadResourceMgr::update_pool_quotas() {
- if (_pools.empty()) {
- return;
- }
-
- _per_pool_quota = ceil(static_cast<double>(_system_threads_quota) /
_pools.size());
-}
-
-} // namespace doris
diff --git a/be/src/runtime/thread_resource_mgr.h
b/be/src/runtime/thread_resource_mgr.h
deleted file mode 100644
index b48a49ecaf..0000000000
--- a/be/src/runtime/thread_resource_mgr.h
+++ /dev/null
@@ -1,247 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <stdlib.h>
-
-#include <functional>
-#include <list>
-#include <mutex>
-#include <thread>
-
-#include "common/status.h"
-
-namespace doris {
-
-// Singleton object to manage CPU (aka thread) resources for the process.
-// Conceptually, there is a fixed pool of threads that are shared between
-// query fragments. If there is only one fragment running, it can use the
-// entire pool, spinning up the maximum number of threads to saturate the
-// hardware. If there are multiple fragments, the CPU pool must be shared
-// between them. Currently, the total system pool is split evenly between
-// all consumers. Each consumer gets ceil(total_system_threads /
num_consumers).
-//
-// Each fragment must register with the ThreadResourceMgr to request threads
-// (in the form of tokens). The fragment has required threads (it can't run
-// with fewer threads) and optional threads. If the fragment is running on its
-// own, it will be able to spin up more optional threads. When the system
-// is under load, the ThreadResourceMgr will stop giving out tokens for
optional
-// threads.
-// Pools should not use this for threads that are almost always idle (e.g.
-// periodic reporting threads).
-// Pools will temporarily go over the quota regularly and this is very
-// much by design. For example, if a pool is running on its own with
-// 4 required threads and 28 optional and another pool is added to the
-// system, the first pool's quota is then cut by half (16 total) and will
-// over time drop the optional threads.
-// This class is thread safe.
-// TODO: this is an initial simple version to improve the behavior with
-// concurrency. This will need to be expanded post GA. These include:
-// - More places where threads are optional (e.g. hash table build side,
-// data stream threads, etc).
-// - Admission control
-// - Integration with other nodes/statestore
-// - Priorities for different pools
-// If both the mgr and pool locks need to be taken, the mgr lock must
-// be taken first.
-class ThreadResourceMgr {
-public:
- class ResourcePool;
-
- // This function will be called whenever the pool has more threads it can
run on.
- // This can happen on ReleaseThreadToken or if the quota for this pool
increases.
- // This is a good place, for example, to wake up anything blocked on
available threads.
- // This callback must not block.
- // Note that this is not called once for each available thread or even
guaranteed that
- // when it is called, a thread is available (the quota could have changed
again in
- // between). It is simply that something might have happened (similar to
condition
- // variable semantics).
- // TODO: this is manageable now since it just needs to call into the io
- // mgr. What's the best model for something more general.
- typedef std::function<void(ResourcePool*)> thread_available_cb;
-
- // Pool abstraction for a single resource pool.
- // TODO: this is not quite sufficient going forward. We need a hierarchy
of pools,
- // one for the entire query, and a sub pool for each component that needs
threads,
- // all of which share a quota. Currently, the way state is tracked here,
it would
- // be impossible to have two components both want optional threads (e.g.
two things
- // that have 1+ thread usage).
- class ResourcePool {
- public:
- virtual ~ResourcePool() {};
- // Acquire a thread for the pool. This will always succeed; the
- // pool will go over the quota.
- // Pools should use this API to reserve threads they need in order
- // to make progress.
- void acquire_thread_token();
-
- // Try to acquire a thread for this pool. If the pool is at
- // the quota, this will return false and the pool should not run.
- // Pools should use this API for resources they can use but don't
- // need (e.g. scanner threads).
- bool try_acquire_thread_token();
-
- // Set a reserved optional number of threads for this pool. This can
be
- // used to implement that a component needs n+ number of threads. The
- // first 'num' threads are guaranteed to be acquirable (via
try_acquire_thread_token)
- // but anything beyond can fail.
- // This can also be done with:
- // if (pool->num_optional_threads() < num) acquire_thread_token();
- // else try_acquire_thread_token();
- // and similar tracking on the Release side but this is common enough
to
- // abstract it away.
- void reserve_optional_tokens(int num);
-
- // Release a thread for the pool. This must be called once for
- // each call to acquire_thread_token and each successful call to
try_acquire_thread_token
- // If the thread token is from acquire_thread_token, required must be
true; false
- // if from try_acquire_thread_token.
- // Must not be called from from thread_available_cb.
- void release_thread_token(bool required);
-
- // Returns the number of threads that are from acquire_thread_token.
- int num_required_threads() const { return _num_threads & 0xFFFFFFFF; }
-
- // Returns the number of thread resources returned by successful calls
- // to try_acquire_thread_token.
- int num_optional_threads() const { return _num_threads >> 32; }
-
- // Returns the total number of thread resources for this pool
- // (i.e. num_optional_threads + num_required_threads).
- int64_t num_threads() const { return num_required_threads() +
num_optional_threads(); }
-
- // Returns the number of optional threads that can still be used.
- int num_available_threads() const {
- int value = std::max(quota() - static_cast<int>(num_threads()),
- _num_reserved_optional_threads -
num_optional_threads());
- return std::max(0, value);
- }
-
- // Returns the quota for this pool. Note this changes dynamically
- // based on system load.
- int quota() const { return std::min(_max_quota,
_parent->_per_pool_quota); }
-
- // Sets the max thread quota for this pool. This is only used for
testing since
- // the dynamic values should be used normally. The actual quota is
the min of this
- // value and the dynamic quota.
- void set_max_quota(int quota) { _max_quota = quota; }
-
- private:
- friend class ThreadResourceMgr;
-
- ResourcePool(ThreadResourceMgr* parent);
-
- // Resets internal state.
- void reset();
-
- ThreadResourceMgr* _parent;
-
- int _max_quota;
- int _num_reserved_optional_threads;
-
- // A single 64 bit value to store both the number of optional and
- // required threads. This is combined to allow using compare and
- // swap operations. The number of required threads is the lower
- // 32 bits and the number of optional threads is the upper 32 bits.
- int64_t _num_threads;
- };
-
- // Create a thread mgr object. If threads_quota is non-zero, it will be
- // the number of threads for the system, otherwise it will be determined
- // based on the hardware.
- ThreadResourceMgr(int threads_quota);
- ThreadResourceMgr();
- ~ThreadResourceMgr();
-
- int system_threads_quota() const { return _system_threads_quota; }
-
- // Register a new pool with the thread mgr. Registering a pool
- // will update the quotas for all existing pools.
- ResourcePool* register_pool();
-
- // Unregisters the pool. 'pool' is no longer valid after this.
- // This updates the quotas for the remaining pools.
- void unregister_pool(ResourcePool* pool);
-
-private:
- // 'Optimal' number of threads for the entire process.
- int _system_threads_quota;
-
- // Lock for the entire object. Protects all fields below.
- std::mutex _lock;
-
- // Pools currently being managed
- typedef std::set<ResourcePool*> Pools;
- Pools _pools;
-
- // Each pool currently gets the same share. This is the ceil of the
- // system quota divided by the number of pools.
- int _per_pool_quota;
-
- // Recycled list of pool objects
- std::list<ResourcePool*> _free_pool_objs;
-
- void update_pool_quotas();
-};
-
-inline void ThreadResourceMgr::ResourcePool::acquire_thread_token() {
- __sync_fetch_and_add(&_num_threads, 1);
-}
-
-inline bool ThreadResourceMgr::ResourcePool::try_acquire_thread_token() {
- while (true) {
- int64_t previous_num_threads = _num_threads;
- int64_t new_optional_threads = (previous_num_threads >> 32) + 1;
- int64_t new_required_threads = previous_num_threads & 0xFFFFFFFF;
-
- if (new_optional_threads > _num_reserved_optional_threads &&
- new_optional_threads + new_required_threads > quota()) {
- return false;
- }
-
- int64_t new_value = new_optional_threads << 32 | new_required_threads;
-
- // Atomically swap the new value if no one updated _num_threads. We
do not
- // not care about the ABA problem here.
- if (__sync_bool_compare_and_swap(&_num_threads, previous_num_threads,
new_value)) {
- return true;
- }
- }
-}
-
-inline void ThreadResourceMgr::ResourcePool::release_thread_token(bool
required) {
- if (required) {
- DCHECK_GT(num_required_threads(), 0);
- __sync_fetch_and_add(&_num_threads, -1);
- } else {
- DCHECK_GT(num_optional_threads(), 0);
-
- while (true) {
- int64_t previous_num_threads = _num_threads;
- int64_t new_optional_threads = (previous_num_threads >> 32) - 1;
- int64_t new_required_threads = previous_num_threads & 0xFFFFFFFF;
- int64_t new_value = new_optional_threads << 32 |
new_required_threads;
-
- if (__sync_bool_compare_and_swap(&_num_threads,
previous_num_threads, new_value)) {
- break;
- }
- }
- }
-}
-
-} // namespace doris
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index 85ffc54135..33ca9a5e49 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -104,7 +104,6 @@ set(UTIL_FILES
hdfs_util.cpp
time_lut.cpp
cityhash102/city.cc
- tuple_row_zorder_compare.cpp
telemetry/telemetry.cpp
telemetry/brpc_carrier.cpp
telemetry/open_telemetry_scop_wrapper.hpp
diff --git a/be/src/util/tuple_row_zorder_compare.cpp
b/be/src/util/tuple_row_zorder_compare.cpp
deleted file mode 100644
index 25bb270eeb..0000000000
--- a/be/src/util/tuple_row_zorder_compare.cpp
+++ /dev/null
@@ -1,28 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "util/tuple_row_zorder_compare.h"
-
-namespace doris {
-
-RowComparator::RowComparator(Schema* schema) {}
-
-int RowComparator::operator()(const char* left, const char* right) const {
- return -1;
-}
-
-} // namespace doris
diff --git a/be/src/util/tuple_row_zorder_compare.h
b/be/src/util/tuple_row_zorder_compare.h
deleted file mode 100644
index 39775a3527..0000000000
--- a/be/src/util/tuple_row_zorder_compare.h
+++ /dev/null
@@ -1,34 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include "olap/row_cursor.h"
-#include "olap/schema.h"
-#include "runtime/descriptors.h"
-#include "runtime/raw_value.h"
-#include "runtime/tuple.h"
-
-namespace doris {
-class RowComparator {
-public:
- RowComparator() = default;
- RowComparator(Schema* schema);
- virtual ~RowComparator() = default;
- virtual int operator()(const char* left, const char* right) const;
-};
-} // namespace doris
diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt
index e1134269b0..f11962297e 100644
--- a/be/test/CMakeLists.txt
+++ b/be/test/CMakeLists.txt
@@ -139,10 +139,8 @@ set(RUNTIME_TEST_FILES
# runtime/datetime_value_test.cpp
# runtime/dpp_sink_test.cpp
# runtime/tmp_file_mgr_test.cpp
- # runtime/thread_resource_mgr_test.cpp
# runtime/export_task_mgr_test.cpp
runtime/mem_pool_test.cpp
- runtime/string_buffer_test.cpp
runtime/decimalv2_value_test.cpp
runtime/large_int_value_test.cpp
runtime/string_value_test.cpp
diff --git a/be/test/http/stream_load_test.cpp
b/be/test/http/stream_load_test.cpp
index 587bbfbd20..1609eedf80 100644
--- a/be/test/http/stream_load_test.cpp
+++ b/be/test/http/stream_load_test.cpp
@@ -29,7 +29,6 @@
#include "runtime/exec_env.h"
#include "runtime/stream_load/load_stream_mgr.h"
#include "runtime/stream_load/stream_load_executor.h"
-#include "runtime/thread_resource_mgr.h"
#include "util/brpc_client_cache.h"
#include "util/cpu_info.h"
@@ -71,7 +70,6 @@ public:
k_response_str = "";
config::streaming_load_max_mb = 1;
- _env._thread_mgr = new ThreadResourceMgr();
_env._master_info = new TMasterInfo();
_env._load_stream_mgr = new LoadStreamMgr();
_env._internal_client_cache = new
BrpcClientCache<PBackendService_Stub>();
@@ -89,8 +87,6 @@ public:
_env._load_stream_mgr = nullptr;
delete _env._master_info;
_env._master_info = nullptr;
- delete _env._thread_mgr;
- _env._thread_mgr = nullptr;
delete _env._stream_load_executor;
_env._stream_load_executor = nullptr;
diff --git a/be/test/runtime/external_scan_context_mgr_test.cpp
b/be/test/runtime/external_scan_context_mgr_test.cpp
index a9d1660f55..29a045234b 100644
--- a/be/test/runtime/external_scan_context_mgr_test.cpp
+++ b/be/test/runtime/external_scan_context_mgr_test.cpp
@@ -25,7 +25,6 @@
#include "common/status.h"
#include "runtime/fragment_mgr.h"
#include "runtime/result_queue_mgr.h"
-#include "runtime/thread_resource_mgr.h"
namespace doris {
@@ -33,15 +32,12 @@ class ExternalScanContextMgrTest : public testing::Test {
public:
ExternalScanContextMgrTest() {
FragmentMgr* fragment_mgr = new FragmentMgr(&_exec_env);
- ThreadResourceMgr* thread_mgr = new ThreadResourceMgr();
ResultQueueMgr* result_queue_mgr = new ResultQueueMgr();
_exec_env._fragment_mgr = fragment_mgr;
- _exec_env._thread_mgr = thread_mgr;
_exec_env._result_queue_mgr = result_queue_mgr;
}
virtual ~ExternalScanContextMgrTest() {
delete _exec_env._fragment_mgr;
- delete _exec_env._thread_mgr;
delete _exec_env._result_queue_mgr;
}
diff --git a/be/test/runtime/string_buffer_test.cpp
b/be/test/runtime/string_buffer_test.cpp
deleted file mode 100644
index 2e8d7a1b33..0000000000
--- a/be/test/runtime/string_buffer_test.cpp
+++ /dev/null
@@ -1,74 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/string_buffer.hpp"
-
-#include <gtest/gtest.h>
-
-#include <string>
-
-#include "runtime/mem_pool.h"
-
-namespace doris {
-
-void validate_string(const std::string& std_str, const StringBuffer& str) {
- EXPECT_EQ(std_str.empty(), str.empty());
- EXPECT_EQ((int)std_str.size(), str.size());
-
- if (std_str.size() > 0) {
- EXPECT_EQ(strncmp(std_str.c_str(), str.str().data, std_str.size()), 0);
- }
-}
-
-TEST(StringBufferTest, Basic) {
- MemPool pool;
- StringBuffer str(&pool);
- std::string std_str;
-
- // Empty string
- validate_string(std_str, str);
-
- // Clear empty string
- std_str.clear();
- str.clear();
- validate_string(std_str, str);
-
- // Append to empty
- std_str.append("Hello");
- str.append("Hello", strlen("Hello"));
- validate_string(std_str, str);
-
- // Append some more
- std_str.append("World");
- str.append("World", strlen("World"));
- validate_string(std_str, str);
-
- // Assign
- std_str.assign("foo");
- str.assign("foo", strlen("foo"));
- validate_string(std_str, str);
-
- // Clear
- std_str.clear();
- str.clear();
- validate_string(std_str, str);
-
- // Underlying buffer size should be the length of the max string during
the test.
- EXPECT_EQ(str.buffer_size(), strlen("HelloWorld"));
-}
-
-} // namespace doris
diff --git a/be/test/runtime/test_env.cc b/be/test/runtime/test_env.cc
index f33b6b5238..4551dfb330 100644
--- a/be/test/runtime/test_env.cc
+++ b/be/test/runtime/test_env.cc
@@ -32,7 +32,6 @@ namespace doris {
TestEnv::TestEnv() {
// Some code will use ExecEnv::GetInstance(), so init the global ExecEnv singleton
_exec_env = ExecEnv::GetInstance();
- _exec_env->_thread_mgr = new ThreadResourceMgr(2);
_exec_env->_result_queue_mgr = new ResultQueueMgr();
// TODO may need rpc support, etc.
}
@@ -49,7 +48,6 @@ void TestEnv::init_tmp_file_mgr(const std::vector<std::string>& tmp_dirs, bool o
TestEnv::~TestEnv() {
SAFE_DELETE(_exec_env->_result_queue_mgr);
- SAFE_DELETE(_exec_env->_thread_mgr);
if (_engine == StorageEngine::_s_instance) {
// the engine instance is created by this test env
diff --git a/be/test/runtime/thread_resource_mgr_test.cpp b/be/test/runtime/thread_resource_mgr_test.cpp
deleted file mode 100644
index 21951a8105..0000000000
--- a/be/test/runtime/thread_resource_mgr_test.cpp
+++ /dev/null
@@ -1,66 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/thread_resource_mgr.h"
-
-#include <gtest/gtest.h>
-
-#include <functional>
-#include <string>
-
-#include "util/cpu_info.h"
-
-namespace doris {
-
-TEST(ThreadResourceMgr, BasicTest) {
- ThreadResourceMgr mgr(5);
-
- ThreadResourceMgr::ResourcePool* c1 = mgr.register_pool();
- c1->acquire_thread_token();
- c1->acquire_thread_token();
- c1->acquire_thread_token();
- EXPECT_EQ(c1->num_threads(), 3);
- EXPECT_EQ(c1->num_required_threads(), 3);
- EXPECT_EQ(c1->num_optional_threads(), 0);
- c1->release_thread_token(true);
- EXPECT_EQ(c1->num_threads(), 2);
- EXPECT_EQ(c1->num_required_threads(), 2);
- EXPECT_EQ(c1->num_optional_threads(), 0);
- EXPECT_TRUE(c1->try_acquire_thread_token());
- EXPECT_TRUE(c1->try_acquire_thread_token());
- EXPECT_TRUE(c1->try_acquire_thread_token());
- EXPECT_FALSE(c1->try_acquire_thread_token());
- EXPECT_EQ(c1->num_threads(), 5);
- EXPECT_EQ(c1->num_required_threads(), 2);
- EXPECT_EQ(c1->num_optional_threads(), 3);
- c1->release_thread_token(true);
- c1->release_thread_token(false);
-
- // Register a new consumer, quota is cut in half
- ThreadResourceMgr::ResourcePool* c2 = mgr.register_pool();
- EXPECT_FALSE(c1->try_acquire_thread_token());
- EXPECT_EQ(c1->num_threads(), 3);
- c1->acquire_thread_token();
- EXPECT_EQ(c1->num_threads(), 4);
- EXPECT_EQ(c1->num_required_threads(), 2);
- EXPECT_EQ(c1->num_optional_threads(), 2);
-
- mgr.unregister_pool(c1);
- mgr.unregister_pool(c2);
-}
-
-} // namespace doris
diff --git a/be/test/vec/exec/vtablet_sink_test.cpp b/be/test/vec/exec/vtablet_sink_test.cpp
index 52706f57fa..05fca396dc 100644
--- a/be/test/vec/exec/vtablet_sink_test.cpp
+++ b/be/test/vec/exec/vtablet_sink_test.cpp
@@ -31,7 +31,6 @@
#include "runtime/result_queue_mgr.h"
#include "runtime/runtime_state.h"
#include "runtime/stream_load/load_stream_mgr.h"
-#include "runtime/thread_resource_mgr.h"
#include "runtime/types.h"
#include "service/brpc.h"
#include "util/brpc_client_cache.h"
@@ -344,7 +343,6 @@ public:
void SetUp() override {
k_add_batch_status = Status::OK();
_env = ExecEnv::GetInstance();
- _env->_thread_mgr = new ThreadResourceMgr();
_env->_master_info = new TMasterInfo();
_env->_load_stream_mgr = new LoadStreamMgr();
_env->_internal_client_cache = new BrpcClientCache<PBackendService_Stub>();
@@ -363,7 +361,6 @@ public:
SAFE_DELETE(_env->_function_client_cache);
SAFE_DELETE(_env->_load_stream_mgr);
SAFE_DELETE(_env->_master_info);
- SAFE_DELETE(_env->_thread_mgr);
if (_server) {
_server->Stop(100);
_server->Join();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]