This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
new 53e95200ff [fix](broker-load) fix error when using multi data description for same table in load stmt (#22666) (#22909)
53e95200ff is described below
commit 53e95200ff524f0a26b44032949915b7683960ff
Author: Mingyu Chen <[email protected]>
AuthorDate: Mon Aug 14 22:47:03 2023 +0800
[fix](broker-load) fix error when using multi data description for same table in load stmt (#22666) (#22909)
cherry pick #22666
---
be/src/vec/exec/scan/new_file_scan_node.cpp | 2 --
be/src/vec/exec/scan/scanner_context.cpp | 3 +--
be/src/vec/exec/scan/scanner_context.h | 7 +------
be/src/vec/exec/scan/vfile_scanner.cpp | 6 ++++++
be/src/vec/exec/scan/vfile_scanner.h | 7 +++++++
be/src/vec/exec/scan/vscan_node.cpp | 5 ++---
be/src/vec/exec/scan/vscan_node.h | 6 ------
be/src/vec/exec/scan/vscanner.cpp | 3 ---
be/src/vec/exec/scan/vscanner.h | 2 --
9 files changed, 17 insertions(+), 24 deletions(-)
diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp
b/be/src/vec/exec/scan/new_file_scan_node.cpp
index 59d3ca8453..be2a533242 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_file_scan_node.cpp
@@ -63,8 +63,6 @@ void NewFileScanNode::set_scan_ranges(const
std::vector<TScanRangeParams>& scan_
LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " <<
_scan_ranges.size();
}
if (scan_ranges.size() > 0) {
- _input_tuple_id =
-
scan_ranges[0].scan_range.ext_scan_range.file_scan_range.params.src_tuple_id;
_output_tuple_id =
scan_ranges[0].scan_range.ext_scan_range.file_scan_range.params.dest_tuple_id;
}
diff --git a/be/src/vec/exec/scan/scanner_context.cpp
b/be/src/vec/exec/scan/scanner_context.cpp
index 81da875d53..7f7341e116 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -28,7 +28,6 @@
namespace doris::vectorized {
Status ScannerContext::init() {
- _real_tuple_desc = _input_tuple_desc != nullptr ? _input_tuple_desc :
_output_tuple_desc;
// 1. Calculate max concurrency
// TODO: now the max thread num <=
config::doris_scanner_thread_pool_thread_num / 4
// should find a more reasonable value.
@@ -93,7 +92,7 @@ vectorized::Block* ScannerContext::get_free_block(bool*
get_free_block) {
*get_free_block = false;
COUNTER_UPDATE(_parent->_newly_create_free_blocks_num, 1);
- return new vectorized::Block(_real_tuple_desc->slots(),
_state->batch_size());
+ return new vectorized::Block(_output_tuple_desc->slots(),
_state->batch_size());
}
void ScannerContext::return_free_block(vectorized::Block* block) {
diff --git a/be/src/vec/exec/scan/scanner_context.h
b/be/src/vec/exec/scan/scanner_context.h
index f4c4a297d0..ce618f9457 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -46,12 +46,11 @@ class VScanNode;
// and submits the Scanners to the scanner thread pool for data scanning.
class ScannerContext {
public:
- ScannerContext(RuntimeState* state_, VScanNode* parent, const
TupleDescriptor* input_tuple_desc,
+ ScannerContext(RuntimeState* state_, VScanNode* parent,
const TupleDescriptor* output_tuple_desc, const
std::list<VScanner*>& scanners_,
int64_t limit_, int64_t max_bytes_in_blocks_queue_)
: _state(state_),
_parent(parent),
- _input_tuple_desc(input_tuple_desc),
_output_tuple_desc(output_tuple_desc),
_process_status(Status::OK()),
limit(limit_),
@@ -141,11 +140,7 @@ private:
VScanNode* _parent;
// the comment of same fields in VScanNode
- const TupleDescriptor* _input_tuple_desc;
const TupleDescriptor* _output_tuple_desc;
- // If _input_tuple_desc is not null, _real_tuple_desc point to
_input_tuple_desc,
- // otherwise, _real_tuple_desc point to _output_tuple_desc
- const TupleDescriptor* _real_tuple_desc;
// _transfer_lock is used to protect the critical section
// where the ScanNode and ScannerScheduler interact.
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp
b/be/src/vec/exec/scan/vfile_scanner.cpp
index 78d291f358..395de439bb 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -56,6 +56,12 @@ VFileScanner::VFileScanner(RuntimeState* state,
NewFileScanNode* parent, int64_t
if (scan_range.params.__isset.strict_mode) {
_strict_mode = scan_range.params.strict_mode;
}
+
+ // For load scanner, there are input and output tuple.
+ // For query scanner, there is only output tuple
+ _input_tuple_desc =
state->desc_tbl().get_tuple_descriptor(_params.src_tuple_id);
+ _real_tuple_desc = _input_tuple_desc == nullptr ? _output_tuple_desc :
_input_tuple_desc;
+ _is_load = (_input_tuple_desc != nullptr);
}
Status VFileScanner::prepare(
diff --git a/be/src/vec/exec/scan/vfile_scanner.h
b/be/src/vec/exec/scan/vfile_scanner.h
index 506908486d..71e526c378 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -127,6 +127,13 @@ private:
RuntimeProfile::Counter* _convert_to_output_block_timer = nullptr;
RuntimeProfile::Counter* _file_counter = nullptr;
+ // Only for load scan node.
+ const TupleDescriptor* _input_tuple_desc = nullptr;
+ // If _input_tuple_desc is set,
+ // the _real_tuple_desc will point to _input_tuple_desc,
+ // otherwise, point to _output_tuple_desc
+ const TupleDescriptor* _real_tuple_desc = nullptr;
+
private:
Status _init_expr_ctxes();
Status _init_src_block(Block* block);
diff --git a/be/src/vec/exec/scan/vscan_node.cpp
b/be/src/vec/exec/scan/vscan_node.cpp
index eabdfc7dcb..5b44c13e7f 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -81,7 +81,6 @@ Status VScanNode::prepare(RuntimeState* state) {
}
Status VScanNode::open(RuntimeState* state) {
- _input_tuple_desc =
state->desc_tbl().get_tuple_descriptor(_input_tuple_id);
_output_tuple_desc =
state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
START_AND_SCOPE_SPAN(state->get_tracer(), span, "VScanNode::open");
SCOPED_TIMER(_runtime_profile->total_time_counter());
@@ -185,8 +184,8 @@ Status VScanNode::_init_profile() {
}
Status VScanNode::_start_scanners(const std::list<VScanner*>& scanners) {
- _scanner_ctx.reset(new ScannerContext(_state, this, _input_tuple_desc,
_output_tuple_desc,
- scanners, limit(),
_state->scan_queue_mem_limit()));
+ _scanner_ctx.reset(new ScannerContext(_state, this, _output_tuple_desc,
scanners, limit(),
+ _state->scan_queue_mem_limit()));
RETURN_IF_ERROR(_scanner_ctx->init());
RETURN_IF_ERROR(_state->exec_env()->scanner_scheduler()->submit(_scanner_ctx.get()));
return Status::OK();
diff --git a/be/src/vec/exec/scan/vscan_node.h
b/be/src/vec/exec/scan/vscan_node.h
index 0653b55ead..d6be5adbe3 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -80,9 +80,7 @@ public:
int runtime_filter_num() const { return (int)_runtime_filter_ctxs.size(); }
- TupleId input_tuple_id() const { return _input_tuple_id; }
TupleId output_tuple_id() const { return _output_tuple_id; }
- const TupleDescriptor* input_tuple_desc() const { return
_input_tuple_desc; }
const TupleDescriptor* output_tuple_desc() const { return
_output_tuple_desc; }
enum class PushDownType {
@@ -159,11 +157,7 @@ protected:
protected:
RuntimeState* _state;
- // For load scan node, there should be both input and output tuple
descriptor.
- // For query scan node, there is only output_tuple_desc.
- TupleId _input_tuple_id = -1;
TupleId _output_tuple_id = -1;
- const TupleDescriptor* _input_tuple_desc = nullptr;
const TupleDescriptor* _output_tuple_desc = nullptr;
// These two values are from query_options
diff --git a/be/src/vec/exec/scan/vscanner.cpp
b/be/src/vec/exec/scan/vscanner.cpp
index 8679f37f89..fe938eb626 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -26,11 +26,8 @@ VScanner::VScanner(RuntimeState* state, VScanNode* parent,
int64_t limit, Runtim
_parent(parent),
_limit(limit),
_profile(profile),
- _input_tuple_desc(parent->input_tuple_desc()),
_output_tuple_desc(parent->output_tuple_desc()) {
- _real_tuple_desc = _input_tuple_desc != nullptr ? _input_tuple_desc :
_output_tuple_desc;
_total_rf_num = _parent->runtime_filter_num();
- _is_load = (_input_tuple_desc != nullptr);
}
Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) {
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 8920f29921..3910e78896 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -135,9 +135,7 @@ protected:
RuntimeProfile* _profile;
- const TupleDescriptor* _input_tuple_desc = nullptr;
const TupleDescriptor* _output_tuple_desc = nullptr;
- const TupleDescriptor* _real_tuple_desc = nullptr;
// If _input_tuple_desc is set, the scanner will read data into
// this _input_block first, then convert to the output block.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]