This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 266da0869a2 branch-4.0: [refactor](scanner) remove scanner scheduler
class since it is already split into SimplifiedScheduler #58310 (#58346)
266da0869a2 is described below
commit 266da0869a2b25bceb3fccd92276bcbcd7cf83e5
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Nov 26 09:39:43 2025 +0800
branch-4.0: [refactor](scanner) remove scanner scheduler class since it is
already split into SimplifiedScheduler #58310 (#58346)
Cherry-picked from #58310
Co-authored-by: yiguolei <[email protected]>
---
be/src/exec/rowid_fetcher.cpp | 4 +-
be/src/runtime/exec_env.h | 3 -
be/src/runtime/exec_env_init.cpp | 5 --
be/src/runtime/query_context.h | 8 +--
be/src/runtime/workload_group/workload_group.cpp | 8 +--
be/src/runtime/workload_group/workload_group.h | 10 ++--
be/src/service/internal_service.cpp | 4 +-
be/src/vec/exec/scan/scanner_context.cpp | 5 +-
be/src/vec/exec/scan/scanner_context.h | 9 +--
be/src/vec/exec/scan/scanner_scheduler.cpp | 22 +------
be/src/vec/exec/scan/scanner_scheduler.h | 74 ++++++++++--------------
be/test/scan/mock_scanner_scheduler.h | 30 ----------
be/test/scan/scanner_context_test.cpp | 12 +---
13 files changed, 53 insertions(+), 141 deletions(-)
diff --git a/be/src/exec/rowid_fetcher.cpp b/be/src/exec/rowid_fetcher.cpp
index 4e09c9ac36c..6134fa1fd25 100644
--- a/be/src/exec/rowid_fetcher.cpp
+++ b/be/src/exec/rowid_fetcher.cpp
@@ -831,8 +831,8 @@ Status RowIdStorageReader::read_batch_external_row(
workload_group_ids.emplace_back(workload_group_id);
auto wg =
ExecEnv::GetInstance()->workload_group_mgr()->get_group(workload_group_ids);
doris::pipeline::TaskScheduler* exec_sched = nullptr;
- vectorized::SimplifiedScanScheduler* scan_sched = nullptr;
- vectorized::SimplifiedScanScheduler* remote_scan_sched = nullptr;
+ vectorized::ScannerScheduler* scan_sched = nullptr;
+ vectorized::ScannerScheduler* remote_scan_sched = nullptr;
wg->get_query_scheduler(&exec_sched, &scan_sched, &remote_scan_sched);
DCHECK(remote_scan_sched);
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 7ce356520b7..6cd71f8e542 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -50,7 +50,6 @@ class MemoryPool;
namespace doris {
namespace vectorized {
class VDataStreamMgr;
-class ScannerScheduler;
class SpillStreamManager;
class DeltaWriterV2Pool;
class DictionaryFactory;
@@ -276,7 +275,6 @@ public:
StreamLoadExecutor* stream_load_executor() { return
_stream_load_executor.get(); }
RoutineLoadTaskExecutor* routine_load_task_executor() { return
_routine_load_task_executor; }
HeartbeatFlags* heartbeat_flags() { return _heartbeat_flags; }
- vectorized::ScannerScheduler* scanner_scheduler() { return
_scanner_scheduler; }
FileMetaCache* file_meta_cache() { return _file_meta_cache; }
MemTableMemoryLimiter* memtable_memory_limiter() { return
_memtable_memory_limiter.get(); }
WalManager* wal_mgr() { return _wal_manager.get(); }
@@ -483,7 +481,6 @@ private:
RoutineLoadTaskExecutor* _routine_load_task_executor = nullptr;
SmallFileMgr* _small_file_mgr = nullptr;
HeartbeatFlags* _heartbeat_flags = nullptr;
- vectorized::ScannerScheduler* _scanner_scheduler = nullptr;
// To save meta info of external file, such as parquet footer.
FileMetaCache* _file_meta_cache = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index b21d63280fd..04d3f85dbce 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -300,7 +300,6 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths,
_init_runtime_filter_timer_queue();
_workload_group_manager = new WorkloadGroupMgr();
- _scanner_scheduler = new doris::vectorized::ScannerScheduler();
_fragment_mgr = new FragmentMgr(this);
_result_cache = new ResultCache(config::query_cache_max_size_mb,
@@ -354,7 +353,6 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths,
}
_broker_mgr->init();
static_cast<void>(_small_file_mgr->init());
- status = _scanner_scheduler->init(this);
if (!status.ok()) {
LOG(ERROR) << "Scanner scheduler init failed. " << status;
return status;
@@ -753,7 +751,6 @@ void ExecEnv::destroy() {
SAFE_STOP(_wal_manager);
_wal_manager.reset();
SAFE_STOP(_load_channel_mgr);
- SAFE_STOP(_scanner_scheduler);
SAFE_STOP(_broker_mgr);
SAFE_STOP(_load_path_mgr);
SAFE_STOP(_result_mgr);
@@ -813,8 +810,6 @@ void ExecEnv::destroy() {
SAFE_DELETE(_tablet_schema_cache);
SAFE_DELETE(_tablet_column_object_pool);
- // _scanner_scheduler must be desotried before _storage_page_cache
- SAFE_DELETE(_scanner_scheduler);
// _storage_page_cache must be destoried before _cache_manager
SAFE_DELETE(_storage_page_cache);
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index ccec66fd992..7e879e6221e 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -195,9 +195,9 @@ public:
TUniqueId query_id() const { return _query_id; }
- vectorized::SimplifiedScanScheduler* get_scan_scheduler() { return
_scan_task_scheduler; }
+ vectorized::ScannerScheduler* get_scan_scheduler() { return
_scan_task_scheduler; }
- vectorized::SimplifiedScanScheduler* get_remote_scan_scheduler() {
+ vectorized::ScannerScheduler* get_remote_scan_scheduler() {
return _remote_scan_task_scheduler;
}
@@ -317,8 +317,8 @@ private:
AtomicStatus _exec_status;
doris::pipeline::TaskScheduler* _task_scheduler = nullptr;
- vectorized::SimplifiedScanScheduler* _scan_task_scheduler = nullptr;
- vectorized::SimplifiedScanScheduler* _remote_scan_task_scheduler = nullptr;
+ vectorized::ScannerScheduler* _scan_task_scheduler = nullptr;
+ vectorized::ScannerScheduler* _remote_scan_task_scheduler = nullptr;
// This dependency indicates if the 2nd phase RPC received from FE.
std::unique_ptr<pipeline::Dependency> _execution_dependency;
// This dependency indicates if memory is sufficient to execute.
diff --git a/be/src/runtime/workload_group/workload_group.cpp
b/be/src/runtime/workload_group/workload_group.cpp
index bcb1daf818e..406554c211f 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -550,7 +550,7 @@ Status
WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
}
if (_scan_task_sched == nullptr) {
- std::unique_ptr<vectorized::SimplifiedScanScheduler> scan_scheduler;
+ std::unique_ptr<vectorized::ScannerScheduler> scan_scheduler;
if (config::enable_task_executor_in_internal_table) {
scan_scheduler =
std::make_unique<vectorized::TaskExecutorSimplifiedScanScheduler>(
"ls_" + wg_name, cg_cpu_ctl_ptr, wg_name);
@@ -572,7 +572,7 @@ Status
WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
if (_remote_scan_task_sched == nullptr) {
int remote_scan_thread_queue_size =
vectorized::ScannerScheduler::get_remote_scan_thread_queue_size();
- std::unique_ptr<vectorized::SimplifiedScanScheduler>
remote_scan_scheduler;
+ std::unique_ptr<vectorized::ScannerScheduler> remote_scan_scheduler;
if (config::enable_task_executor_in_external_table) {
remote_scan_scheduler =
std::make_unique<vectorized::TaskExecutorSimplifiedScanScheduler>(
@@ -646,8 +646,8 @@ Status
WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* wg_info) {
}
void WorkloadGroup::get_query_scheduler(doris::pipeline::TaskScheduler**
exec_sched,
- vectorized::SimplifiedScanScheduler**
scan_sched,
- vectorized::SimplifiedScanScheduler**
remote_scan_sched) {
+ vectorized::ScannerScheduler**
scan_sched,
+ vectorized::ScannerScheduler**
remote_scan_sched) {
std::shared_lock<std::shared_mutex> rlock(_task_sched_lock);
*exec_sched = _task_sched.get();
*scan_sched = _scan_task_sched.get();
diff --git a/be/src/runtime/workload_group/workload_group.h
b/be/src/runtime/workload_group/workload_group.h
index cfdbaa8b4b8..6a563cc9a68 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -45,7 +45,7 @@ class IOThrottle;
class ResourceContext;
namespace vectorized {
-class SimplifiedScanScheduler;
+class ScannerScheduler;
}
namespace pipeline {
@@ -167,8 +167,8 @@ public:
Status upsert_task_scheduler(WorkloadGroupInfo* tg_info);
virtual void get_query_scheduler(doris::pipeline::TaskScheduler**
exec_sched,
- vectorized::SimplifiedScanScheduler**
scan_sched,
- vectorized::SimplifiedScanScheduler**
remote_scan_sched);
+ vectorized::ScannerScheduler** scan_sched,
+ vectorized::ScannerScheduler**
remote_scan_sched);
void try_stop_schedulers();
@@ -250,8 +250,8 @@ private:
// so it should be shared ptr;
std::shared_ptr<CgroupCpuCtl> _cgroup_cpu_ctl {nullptr};
std::unique_ptr<doris::pipeline::TaskScheduler> _task_sched {nullptr};
- std::unique_ptr<vectorized::SimplifiedScanScheduler> _scan_task_sched
{nullptr};
- std::unique_ptr<vectorized::SimplifiedScanScheduler>
_remote_scan_task_sched {nullptr};
+ std::unique_ptr<vectorized::ScannerScheduler> _scan_task_sched {nullptr};
+ std::unique_ptr<vectorized::ScannerScheduler> _remote_scan_task_sched
{nullptr};
std::unique_ptr<ThreadPool> _memtable_flush_pool {nullptr};
std::map<std::string, std::shared_ptr<IOThrottle>> _scan_io_throttle_map;
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index fd67863cd46..03aef2669d5 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -2117,8 +2117,8 @@ void
PInternalService::multiget_data_v2(google::protobuf::RpcController* control
}
doris::pipeline::TaskScheduler* exec_sched = nullptr;
- vectorized::SimplifiedScanScheduler* scan_sched = nullptr;
- vectorized::SimplifiedScanScheduler* remote_scan_sched = nullptr;
+ vectorized::ScannerScheduler* scan_sched = nullptr;
+ vectorized::ScannerScheduler* remote_scan_sched = nullptr;
wg->get_query_scheduler(&exec_sched, &scan_sched, &remote_scan_sched);
DCHECK(remote_scan_sched);
diff --git a/be/src/vec/exec/scan/scanner_context.cpp
b/be/src/vec/exec/scan/scanner_context.cpp
index e7b8ada5983..beaf1f5f5cd 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -66,7 +66,6 @@ ScannerContext::ScannerContext(
_output_row_descriptor(output_row_descriptor),
_batch_size(state->batch_size()),
limit(limit_),
- _scanner_scheduler_global(state->exec_env()->scanner_scheduler()),
_all_scanners(scanners.begin(), scanners.end()),
_parallism_of_scan_operator(parallism_of_scan_operator),
_min_scan_concurrency_of_scan_scheduler(_state->min_scan_concurrency_of_scan_scheduler()),
@@ -266,7 +265,7 @@ Status
ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task,
// if submit succeed, it will be also added back by
ScannerContext::push_back_scan_task
// see ScannerScheduler::_scanner_scan.
_num_scheduled_scanners++;
- return _scanner_scheduler_global->submit(shared_from_this(), scan_task);
+ return _scanner_scheduler->submit(shared_from_this(), scan_task);
}
void ScannerContext::clear_free_blocks() {
@@ -523,7 +522,7 @@ int32_t
ScannerContext::_get_margin(std::unique_lock<std::mutex>& transfer_lock,
// This function must be called with:
// 1. _transfer_lock held.
-// 2. SimplifiedScanScheduler::_lock held.
+// 2. ScannerScheduler::_lock held.
Status ScannerContext::schedule_scan_task(std::shared_ptr<ScanTask>
current_scan_task,
std::unique_lock<std::mutex>&
transfer_lock,
std::unique_lock<std::shared_mutex>&
scheduler_lock) {
diff --git a/be/src/vec/exec/scan/scanner_context.h
b/be/src/vec/exec/scan/scanner_context.h
index 1d93237e4c6..143a4f12dfd 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -56,7 +56,7 @@ namespace vectorized {
class Scanner;
class ScannerDelegate;
class ScannerScheduler;
-class SimplifiedScanScheduler;
+class ScannerScheduler;
class TaskExecutor;
class TaskHandle;
@@ -116,7 +116,7 @@ public:
class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
public HasTaskExecutionCtx {
ENABLE_FACTORY_CREATOR(ScannerContext);
- friend class SimplifiedScanScheduler;
+ friend class ScannerScheduler;
public:
ScannerContext(RuntimeState* state, pipeline::ScanLocalStateBase*
local_state,
@@ -165,8 +165,6 @@ public:
RuntimeState* state() { return _state; }
- SimplifiedScanScheduler* get_scan_scheduler() { return _scanner_scheduler;
}
-
void stop_scanners(RuntimeState* state);
int batch_size() const { return _batch_size; }
@@ -230,8 +228,7 @@ protected:
int64_t limit;
int64_t _max_bytes_in_queue = 0;
- doris::vectorized::ScannerScheduler* _scanner_scheduler_global = nullptr;
- SimplifiedScanScheduler* _scanner_scheduler = nullptr;
+ ScannerScheduler* _scanner_scheduler = nullptr;
// Using stack so that we can resubmit scanner in a LIFO order, maybe more
cache friendly
std::stack<std::shared_ptr<ScanTask>> _pending_scanners;
// Scanner that is submitted to the scheduler.
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 7381405ac95..0bd27114d72 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -52,25 +52,6 @@
namespace doris::vectorized {
-ScannerScheduler::ScannerScheduler() = default;
-
-ScannerScheduler::~ScannerScheduler() = default;
-
-void ScannerScheduler::stop() {
- if (!_is_init) {
- return;
- }
-
- _is_closed = true;
-
- LOG(INFO) << "ScannerScheduler stopped";
-}
-
-Status ScannerScheduler::init(ExecEnv* env) {
- _is_init = true;
- return Status::OK();
-}
-
Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
std::shared_ptr<ScanTask> scan_task) {
if (ctx->done()) {
@@ -90,7 +71,6 @@ Status
ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
scanner_delegate->_scanner->start_wait_worker_timer();
TabletStorageType type = scanner_delegate->_scanner->get_storage_type();
auto sumbit_task = [&]() {
- SimplifiedScanScheduler* scan_sched = ctx->get_scan_scheduler();
auto work_func = [scanner_ref = scan_task, ctx]() {
auto status = [&] {
RETURN_IF_CATCH_EXCEPTION(_scanner_scan(ctx, scanner_ref));
@@ -105,7 +85,7 @@ Status
ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
return scanner_ref->is_eos();
};
SimplifiedScanTask simple_scan_task = {work_func, ctx, scan_task};
- return scan_sched->submit_scan_task(simple_scan_task);
+ return this->submit_scan_task(simple_scan_task);
};
Status submit_status = sumbit_task();
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h
b/be/src/vec/exec/scan/scanner_scheduler.h
index f4cc3be2acf..7dc374a9168 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -44,47 +44,7 @@ namespace doris::vectorized {
class ScannerDelegate;
class ScanTask;
class ScannerContext;
-class SimplifiedScanScheduler;
-
-// Responsible for the scheduling and execution of all Scanners of a BE node.
-// Execution thread pool
-// When a ScannerContext is launched, it will submit the running scanners
to this scheduler.
-// The scheduling thread will submit the running scanner and its
ScannerContext
-// to the execution thread pool to do the actual scan task.
-// Each Scanner will act as a producer, read the next block and put it into
-// the corresponding block queue.
-// The corresponding ScanNode will act as a consumer to consume blocks
from the block queue.
-// After the block is consumed, the unfinished scanner will resubmit to
this scheduler.
-class ScannerScheduler {
-public:
- ScannerScheduler();
- virtual ~ScannerScheduler();
-
- [[nodiscard]] Status init(ExecEnv* env);
-
- MOCK_FUNCTION Status submit(std::shared_ptr<ScannerContext> ctx,
- std::shared_ptr<ScanTask> scan_task);
-
- void stop();
-
- int remote_thread_pool_max_thread_num() const { return
_remote_thread_pool_max_thread_num; }
-
- static int get_remote_scan_thread_num();
-
- static int get_remote_scan_thread_queue_size();
-
-private:
- static void _scanner_scan(std::shared_ptr<ScannerContext> ctx,
- std::shared_ptr<ScanTask> scan_task);
-
- static void _make_sure_virtual_col_is_materialized(const
std::shared_ptr<Scanner>& scanner,
- vectorized::Block*
block);
-
- // true is the scheduler is closed.
- std::atomic_bool _is_closed = {false};
- bool _is_init = false;
- int _remote_thread_pool_max_thread_num;
-};
+class ScannerScheduler;
struct SimplifiedScanTask {
SimplifiedScanTask() = default;
@@ -131,9 +91,26 @@ private:
};
// Abstract interface for scan scheduler
-class SimplifiedScanScheduler {
+
+// Responsible for the scheduling and execution of all Scanners of a BE node.
+// Execution thread pool
+// When a ScannerContext is launched, it will submit the running scanners
to this scheduler.
+// The scheduling thread will submit the running scanner and its
ScannerContext
+// to the execution thread pool to do the actual scan task.
+// Each Scanner will act as a producer, read the next block and put it into
+// the corresponding block queue.
+// The corresponding ScanNode will act as a consumer to consume blocks
from the block queue.
+// After the block is consumed, the unfinished scanner will resubmit to
this scheduler.
+
+class ScannerScheduler {
public:
- virtual ~SimplifiedScanScheduler() {}
+ virtual ~ScannerScheduler() {}
+
+ Status submit(std::shared_ptr<ScannerContext> ctx,
std::shared_ptr<ScanTask> scan_task);
+
+ static int get_remote_scan_thread_num();
+
+ static int get_remote_scan_thread_queue_size();
virtual Status start(int max_thread_num, int min_thread_num, int
queue_size) = 0;
virtual void stop() = 0;
@@ -153,9 +130,16 @@ public:
virtual Status schedule_scan_task(std::shared_ptr<ScannerContext>
scanner_ctx,
std::shared_ptr<ScanTask>
current_scan_task,
std::unique_lock<std::mutex>&
transfer_lock) = 0;
+
+private:
+ static void _scanner_scan(std::shared_ptr<ScannerContext> ctx,
+ std::shared_ptr<ScanTask> scan_task);
+
+ static void _make_sure_virtual_col_is_materialized(const
std::shared_ptr<Scanner>& scanner,
+ vectorized::Block*
block);
};
-class ThreadPoolSimplifiedScanScheduler : public SimplifiedScanScheduler {
+class ThreadPoolSimplifiedScanScheduler : public ScannerScheduler {
public:
ThreadPoolSimplifiedScanScheduler(std::string sched_name,
std::shared_ptr<CgroupCpuCtl>
cgroup_cpu_ctl,
@@ -277,7 +261,7 @@ private:
std::shared_mutex _lock;
};
-class TaskExecutorSimplifiedScanScheduler : public SimplifiedScanScheduler {
+class TaskExecutorSimplifiedScanScheduler : public ScannerScheduler {
public:
TaskExecutorSimplifiedScanScheduler(std::string sched_name,
std::shared_ptr<CgroupCpuCtl>
cgroup_cpu_ctl,
diff --git a/be/test/scan/mock_scanner_scheduler.h
b/be/test/scan/mock_scanner_scheduler.h
deleted file mode 100644
index 2033a105b81..00000000000
--- a/be/test/scan/mock_scanner_scheduler.h
+++ /dev/null
@@ -1,30 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <gmock/gmock.h>
-
-#include "common/status.h"
-#include "vec/exec/scan/scanner_scheduler.h"
-
-namespace doris::vectorized {
-class MockScannerScheduler : ScannerScheduler {
-public:
- MockScannerScheduler() = default;
-
- MOCK_METHOD2(submit, Status(std::shared_ptr<ScannerContext>,
std::shared_ptr<ScanTask>));
-};
-} // namespace doris::vectorized
diff --git a/be/test/scan/scanner_context_test.cpp
b/be/test/scan/scanner_context_test.cpp
index 4ff5db0ba46..5cf127b804d 100644
--- a/be/test/scan/scanner_context_test.cpp
+++ b/be/test/scan/scanner_context_test.cpp
@@ -29,7 +29,6 @@
#include <tuple>
#include "common/object_pool.h"
-#include "mock_scanner_scheduler.h"
#include "mock_simplified_scan_scheduler.h"
#include "pipeline/dependency.h"
#include "pipeline/exec/olap_scan_operator.h"
@@ -121,7 +120,7 @@ private:
std::shared_ptr<pipeline::Dependency> scan_dependency =
pipeline::Dependency::create_shared(0, 0, "TestScanDependency");
std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl =
std::make_shared<CgroupV2CpuCtl>(1);
- std::unique_ptr<SimplifiedScanScheduler> scan_scheduler =
+ std::unique_ptr<ScannerScheduler> scan_scheduler =
std::make_unique<ThreadPoolSimplifiedScanScheduler>("ForTest",
cgroup_cpu_ctl);
};
@@ -545,12 +544,6 @@ TEST_F(ScannerContextTest, schedule_scan_task) {
EXPECT_CALL(*scheduler,
get_active_threads()).WillRepeatedly(testing::Return(0));
EXPECT_CALL(*scheduler,
get_queue_size()).WillRepeatedly(testing::Return(0));
- std::unique_ptr<MockScannerScheduler> scanner_scheduler =
- std::make_unique<MockScannerScheduler>();
- EXPECT_CALL(*scanner_scheduler, submit(testing::_, testing::_))
- .WillRepeatedly(testing::Return(Status::OK()));
-
- scanner_context->_scanner_scheduler_global = scanner_scheduler.get();
scanner_context->_scanner_scheduler = scheduler.get();
scanner_context->_max_scan_concurrency = 1;
scanner_context->_max_scan_concurrency = 1;
@@ -571,7 +564,6 @@ TEST_F(ScannerContextTest, schedule_scan_task) {
state.get(), olap_scan_local_state.get(), output_tuple_desc,
output_row_descriptor,
scanners, limit, scan_dependency, parallel_tasks);
- scanner_context->_scanner_scheduler_global = scanner_scheduler.get();
scanner_context->_scanner_scheduler = scheduler.get();
scanner_context->_max_scan_concurrency = 100;
@@ -593,7 +585,6 @@ TEST_F(ScannerContextTest, schedule_scan_task) {
state.get(), olap_scan_local_state.get(), output_tuple_desc,
output_row_descriptor,
scanners, limit, scan_dependency, parallel_tasks);
- scanner_context->_scanner_scheduler_global = scanner_scheduler.get();
scanner_context->_scanner_scheduler = scheduler.get();
scanner_context->_max_scan_concurrency = 1;
@@ -610,7 +601,6 @@ TEST_F(ScannerContextTest, schedule_scan_task) {
state.get(), olap_scan_local_state.get(), output_tuple_desc,
output_row_descriptor,
scanners, limit, scan_dependency, parallel_tasks);
- scanner_context->_scanner_scheduler_global = scanner_scheduler.get();
scanner_context->_scanner_scheduler = scheduler.get();
scanner_context->_max_scan_concurrency = 1;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]