yiguolei commented on code in PR #30746:
URL: https://github.com/apache/doris/pull/30746#discussion_r1477222377
##########
be/src/vec/exec/scan/scanner_context.cpp:
##########
@@ -31,64 +29,56 @@
#include "common/status.h"
#include "pipeline/exec/scan_operator.h"
#include "runtime/descriptors.h"
-#include "runtime/exec_env.h"
-#include "runtime/query_context.h"
#include "runtime/runtime_state.h"
-#include "util/pretty_printer.h"
#include "util/uid_util.h"
#include "vec/core/block.h"
-#include "vec/exec/scan/scanner_scheduler.h"
#include "vec/exec/scan/vscan_node.h"
-#include "vec/exec/scan/vscanner.h"
namespace doris::vectorized {
using namespace std::chrono_literals;
-static bvar::Status<int64_t>
g_bytes_in_scanner_queue("doris_bytes_in_scanner_queue", 0);
-static bvar::Status<int64_t>
g_num_running_scanners("doris_num_running_scanners", 0);
-
ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor*
output_tuple_desc,
const RowDescriptor* output_row_descriptor,
const
std::list<std::shared_ptr<ScannerDelegate>>& scanners,
int64_t limit_, int64_t
max_bytes_in_blocks_queue,
const int num_parallel_instances,
- pipeline::ScanLocalStateBase* local_state,
- std::shared_ptr<pipeline::ScanDependency>
dependency)
+ pipeline::ScanLocalStateBase* local_state)
: HasTaskExecutionCtx(state),
_state(state),
- _parent(nullptr),
_local_state(local_state),
_output_tuple_desc(output_row_descriptor
?
output_row_descriptor->tuple_descriptors().front()
: output_tuple_desc),
_output_row_descriptor(output_row_descriptor),
- _process_status(Status::OK()),
_batch_size(state->batch_size()),
limit(limit_),
_max_bytes_in_queue(std::max(max_bytes_in_blocks_queue,
(int64_t)1024) *
num_parallel_instances),
_scanner_scheduler(state->exec_env()->scanner_scheduler()),
- _scanners(scanners.begin(), scanners.end()),
_all_scanners(scanners.begin(), scanners.end()),
- _num_parallel_instances(num_parallel_instances),
- _dependency(dependency) {
+ _num_parallel_instances(num_parallel_instances) {
DCHECK(_output_row_descriptor == nullptr ||
_output_row_descriptor->tuple_descriptors().size() == 1);
_query_id = _state->get_query_ctx()->query_id();
ctx_id = UniqueId::gen_uid().to_string();
- if (_scanners.empty()) {
+ // Provide more memory for wide tables, increase proportionally by
multiples of 300
+ _max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1;
+ if (scanners.empty()) {
_is_finished = true;
_set_scanner_done();
}
+ _scanners.enqueue_bulk(scanners.begin(), scanners.size());
if (limit < 0) {
limit = -1;
+ } else if (limit < _batch_size) {
+ _batch_size = limit;
Review Comment:
Please remove this logic and keep the behavior the same as the original: we do
not know the actual reason the batch size was capped at the limit, so it is
safer not to change it in this PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]