This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c75e63a2a5c [Improvement](scan) Use scanner to do projection of scan
node (#29124)
c75e63a2a5c is described below
commit c75e63a2a5c7a1caad7e55befeb7987db8d1ef13
Author: Gabriel <[email protected]>
AuthorDate: Wed Dec 27 16:00:52 2023 +0800
[Improvement](scan) Use scanner to do projection of scan node (#29124)
---
be/src/exec/exec_node.h | 2 +-
be/src/pipeline/exec/scan_operator.cpp | 8 +--
be/src/pipeline/exec/scan_operator.h | 4 ++
be/src/pipeline/pipeline_x/operator.cpp | 10 ++--
be/src/pipeline/pipeline_x/operator.h | 10 +++-
be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 2 +-
be/src/vec/exec/scan/pip_scanner_context.h | 11 +++--
be/src/vec/exec/scan/scanner_context.cpp | 16 +++++-
be/src/vec/exec/scan/scanner_context.h | 3 ++
be/src/vec/exec/scan/scanner_scheduler.cpp | 2 +-
be/src/vec/exec/scan/vscan_node.cpp | 7 +--
be/src/vec/exec/scan/vscan_node.h | 14 ++++++
be/src/vec/exec/scan/vscanner.cpp | 68 +++++++++++++++++++++++++-
be/src/vec/exec/scan/vscanner.h | 6 +++
14 files changed, 138 insertions(+), 25 deletions(-)
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index eeed37907f9..69f7771767f 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -103,7 +103,7 @@ public:
// TODO: AggregationNode and HashJoinNode cannot be "re-opened" yet.
[[nodiscard]] virtual Status get_next(RuntimeState* state,
vectorized::Block* block, bool* eos);
// new interface to compatible new optimizers in FE
- [[nodiscard]] Status get_next_after_projects(
+ [[nodiscard]] virtual Status get_next_after_projects(
RuntimeState* state, vectorized::Block* block, bool* eos,
const std::function<Status(RuntimeState*, vectorized::Block*,
bool*)>& fn,
bool clear_data = true);
diff --git a/be/src/pipeline/exec/scan_operator.cpp
b/be/src/pipeline/exec/scan_operator.cpp
index af9529798a3..0c0ccd42410 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -1239,10 +1239,10 @@ template <typename Derived>
Status ScanLocalState<Derived>::_start_scanners(
const std::list<vectorized::VScannerSPtr>& scanners) {
auto& p = _parent->cast<typename Derived::Parent>();
- _scanner_ctx = PipScannerContext::create_shared(state(), this,
p._output_tuple_desc, scanners,
- p.limit(),
state()->scan_queue_mem_limit(),
- p._col_distribute_ids, 1,
_scan_dependency,
- _finish_dependency);
+ _scanner_ctx = PipScannerContext::create_shared(
+ state(), this, p._output_tuple_desc, p.output_row_descriptor(),
scanners, p.limit(),
+ state()->scan_queue_mem_limit(), p._col_distribute_ids, 1,
_scan_dependency,
+ _finish_dependency);
return Status::OK();
}
diff --git a/be/src/pipeline/exec/scan_operator.h
b/be/src/pipeline/exec/scan_operator.h
index afbcaea0a63..833ca55f095 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -427,6 +427,10 @@ public:
Status open(RuntimeState* state) override;
Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) override;
+ Status get_block_after_projects(RuntimeState* state, vectorized::Block*
block,
+ SourceState& source_state) override {
+ return get_block(state, block, source_state);
+ }
[[nodiscard]] bool is_source() const override { return true; }
const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override {
diff --git a/be/src/pipeline/pipeline_x/operator.cpp
b/be/src/pipeline/pipeline_x/operator.cpp
index b90c9551426..e763a6eee1d 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -193,8 +193,8 @@ Status OperatorXBase::do_projections(RuntimeState* state,
vectorized::Block* ori
return Status::OK();
}
-Status OperatorXBase::get_next_after_projects(RuntimeState* state,
vectorized::Block* block,
- SourceState& source_state) {
+Status OperatorXBase::get_block_after_projects(RuntimeState* state,
vectorized::Block* block,
+ SourceState& source_state) {
auto local_state = state->get_local_state(operator_id());
if (_output_row_descriptor) {
local_state->clear_origin_block();
@@ -461,8 +461,8 @@ Status
PipelineXSinkLocalState<DependencyType>::close(RuntimeState* state, Statu
template <typename LocalStateType>
Status StreamingOperatorX<LocalStateType>::get_block(RuntimeState* state,
vectorized::Block* block,
SourceState&
source_state) {
-
RETURN_IF_ERROR(OperatorX<LocalStateType>::_child_x->get_next_after_projects(state,
block,
-
source_state));
+
RETURN_IF_ERROR(OperatorX<LocalStateType>::_child_x->get_block_after_projects(state,
block,
+
source_state));
return pull(state, block, source_state);
}
@@ -473,7 +473,7 @@ Status
StatefulOperatorX<LocalStateType>::get_block(RuntimeState* state, vectori
->template cast<LocalStateType>();
if (need_more_input_data(state)) {
local_state._child_block->clear_column_data();
-
RETURN_IF_ERROR(OperatorX<LocalStateType>::_child_x->get_next_after_projects(
+
RETURN_IF_ERROR(OperatorX<LocalStateType>::_child_x->get_block_after_projects(
state, local_state._child_block.get(),
local_state._child_source_state));
source_state = local_state._child_source_state;
if (local_state._child_block->rows() == 0 &&
diff --git a/be/src/pipeline/pipeline_x/operator.h
b/be/src/pipeline/pipeline_x/operator.h
index c4e0a7e94a0..ec2500acf33 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -271,10 +271,15 @@ public:
return _output_row_descriptor ? *_output_row_descriptor :
_row_descriptor;
}
+ [[nodiscard]] const RowDescriptor* output_row_descriptor() {
+ return _output_row_descriptor.get();
+ }
+
[[nodiscard]] bool is_source() const override { return false; }
- Status get_next_after_projects(RuntimeState* state, vectorized::Block*
block,
- SourceState& source_state);
+ [[nodiscard]] virtual Status get_block_after_projects(RuntimeState* state,
+ vectorized::Block*
block,
+ SourceState&
source_state);
/// Only use in vectorized exec engine try to do projections to trans
_row_desc -> _output_row_desc
Status do_projections(RuntimeState* state, vectorized::Block* origin_block,
@@ -286,6 +291,7 @@ protected:
template <typename Dependency>
friend class PipelineXLocalState;
friend class PipelineXLocalStateBase;
+ friend class VScanner;
const int _operator_id;
const int _node_id; // unique w/in single plan tree
TPlanNodeType::type _type;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index be62fcac213..1a0c0749e8b 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -272,7 +272,7 @@ Status PipelineXTask::execute(bool* eos) {
if (!_dry_run) {
SCOPED_TIMER(_get_block_timer);
_get_block_counter->update(1);
- RETURN_IF_ERROR(_root->get_next_after_projects(_state, block,
_data_state));
+ RETURN_IF_ERROR(_root->get_block_after_projects(_state, block,
_data_state));
} else {
_data_state = SourceState::FINISHED;
}
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h
b/be/src/vec/exec/scan/pip_scanner_context.h
index 42726cb17fe..fbf59fffab2 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -31,23 +31,26 @@ class PipScannerContext : public vectorized::ScannerContext
{
public:
PipScannerContext(RuntimeState* state, vectorized::VScanNode* parent,
const TupleDescriptor* output_tuple_desc,
+ const RowDescriptor* output_row_descriptor,
const std::list<vectorized::VScannerSPtr>& scanners,
int64_t limit_,
int64_t max_bytes_in_blocks_queue, const
std::vector<int>& col_distribute_ids,
const int num_parallel_instances)
- : vectorized::ScannerContext(state, parent, output_tuple_desc,
scanners, limit_,
- max_bytes_in_blocks_queue,
num_parallel_instances),
+ : vectorized::ScannerContext(state, parent, output_tuple_desc,
output_row_descriptor,
+ scanners, limit_,
max_bytes_in_blocks_queue,
+ num_parallel_instances),
_col_distribute_ids(col_distribute_ids),
_need_colocate_distribute(!_col_distribute_ids.empty()) {}
PipScannerContext(RuntimeState* state, ScanLocalStateBase* local_state,
const TupleDescriptor* output_tuple_desc,
+ const RowDescriptor* output_row_descriptor,
const std::list<vectorized::VScannerSPtr>& scanners,
int64_t limit_,
int64_t max_bytes_in_blocks_queue, const
std::vector<int>& col_distribute_ids,
const int num_parallel_instances,
std::shared_ptr<pipeline::ScanDependency> dependency,
std::shared_ptr<pipeline::Dependency> finish_dependency)
- : vectorized::ScannerContext(state, output_tuple_desc, scanners,
limit_,
- max_bytes_in_blocks_queue,
num_parallel_instances,
+ : vectorized::ScannerContext(state, output_tuple_desc,
output_row_descriptor, scanners,
+ limit_, max_bytes_in_blocks_queue,
num_parallel_instances,
local_state, dependency,
finish_dependency),
_need_colocate_distribute(false) {}
diff --git a/be/src/vec/exec/scan/scanner_context.cpp
b/be/src/vec/exec/scan/scanner_context.cpp
index 4d3820d7634..954c294574f 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -46,6 +46,7 @@ namespace doris::vectorized {
using namespace std::chrono_literals;
ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor*
output_tuple_desc,
+ const RowDescriptor* output_row_descriptor,
const std::list<VScannerSPtr>& scanners,
int64_t limit_,
int64_t max_bytes_in_blocks_queue, const int
num_parallel_instances,
pipeline::ScanLocalStateBase* local_state,
@@ -54,7 +55,10 @@ ScannerContext::ScannerContext(RuntimeState* state, const
TupleDescriptor* outpu
: _state(state),
_parent(nullptr),
_local_state(local_state),
- _output_tuple_desc(output_tuple_desc),
+ _output_tuple_desc(output_row_descriptor
+ ?
output_row_descriptor->tuple_descriptors().front()
+ : output_tuple_desc),
+ _output_row_descriptor(output_row_descriptor),
_process_status(Status::OK()),
_batch_size(state->batch_size()),
limit(limit_),
@@ -66,6 +70,8 @@ ScannerContext::ScannerContext(RuntimeState* state, const
TupleDescriptor* outpu
_num_parallel_instances(num_parallel_instances),
_dependency(dependency),
_finish_dependency(finish_dependency) {
+ DCHECK(_output_row_descriptor == nullptr ||
+ _output_row_descriptor->tuple_descriptors().size() == 1);
// Use the task exec context as a lock between scanner threads and
fragment exection threads
_task_exec_ctx = _state->get_task_execution_context();
_query_id = _state->get_query_ctx()->query_id();
@@ -92,13 +98,17 @@ ScannerContext::ScannerContext(RuntimeState* state, const
TupleDescriptor* outpu
ScannerContext::ScannerContext(doris::RuntimeState* state,
doris::vectorized::VScanNode* parent,
const doris::TupleDescriptor* output_tuple_desc,
+ const RowDescriptor* output_row_descriptor,
const std::list<VScannerSPtr>& scanners,
int64_t limit_,
int64_t max_bytes_in_blocks_queue, const int
num_parallel_instances,
pipeline::ScanLocalStateBase* local_state)
: _state(state),
_parent(parent),
_local_state(local_state),
- _output_tuple_desc(output_tuple_desc),
+ _output_tuple_desc(output_row_descriptor
+ ?
output_row_descriptor->tuple_descriptors().front()
+ : output_tuple_desc),
+ _output_row_descriptor(output_row_descriptor),
_process_status(Status::OK()),
_batch_size(state->batch_size()),
limit(limit_),
@@ -108,6 +118,8 @@ ScannerContext::ScannerContext(doris::RuntimeState* state,
doris::vectorized::VS
_scanners(scanners),
_scanners_ref(scanners.begin(), scanners.end()),
_num_parallel_instances(num_parallel_instances) {
+ DCHECK(_output_row_descriptor == nullptr ||
+ _output_row_descriptor->tuple_descriptors().size() == 1);
// Use the task exec context as a lock between scanner threads and
fragment exection threads
_task_exec_ctx = _state->get_task_execution_context();
_query_id = _state->get_query_ctx()->query_id();
diff --git a/be/src/vec/exec/scan/scanner_context.h
b/be/src/vec/exec/scan/scanner_context.h
index ba9c1fdee10..6a3e8553f8f 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -70,6 +70,7 @@ class ScannerContext : public
std::enable_shared_from_this<ScannerContext> {
public:
ScannerContext(RuntimeState* state, VScanNode* parent, const
TupleDescriptor* output_tuple_desc,
+ const RowDescriptor* output_row_descriptor,
const std::list<VScannerSPtr>& scanners, int64_t limit_,
int64_t max_bytes_in_blocks_queue, const int
num_parallel_instances = 1,
pipeline::ScanLocalStateBase* local_state = nullptr);
@@ -187,6 +188,7 @@ private:
protected:
ScannerContext(RuntimeState* state_, const TupleDescriptor*
output_tuple_desc,
+ const RowDescriptor* output_row_descriptor,
const std::list<VScannerSPtr>& scanners_, int64_t limit_,
int64_t max_bytes_in_blocks_queue_, const int
num_parallel_instances,
pipeline::ScanLocalStateBase* local_state,
@@ -203,6 +205,7 @@ protected:
// the comment of same fields in VScanNode
const TupleDescriptor* _output_tuple_desc = nullptr;
+ const RowDescriptor* _output_row_descriptor = nullptr;
// _transfer_lock is used to protect the critical section
// where the ScanNode and ScannerScheduler interact.
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 84f56813c34..6ec83e8bd6a 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -351,7 +351,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler*
scheduler,
BlockUPtr block = ctx->get_free_block();
- status = scanner->get_block(state, block.get(), &eos);
+ status = scanner->get_block_after_projects(state, block.get(), &eos);
// The VFileScanner for external table may try to open not exist files,
// Because FE file cache for external table may out of date.
// So, NOT_FOUND for VFileScanner is not a fail case.
diff --git a/be/src/vec/exec/scan/vscan_node.cpp
b/be/src/vec/exec/scan/vscan_node.cpp
index 5176d7900b3..c45468e0514 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -323,10 +323,11 @@ Status VScanNode::_start_scanners(const
std::list<VScannerSPtr>& scanners,
if (_is_pipeline_scan) {
int max_queue_size = _shared_scan_opt ?
std::max(query_parallel_instance_num, 1) : 1;
_scanner_ctx = pipeline::PipScannerContext::create_shared(
- _state, this, _output_tuple_desc, scanners, limit(),
_state->scan_queue_mem_limit(),
- _col_distribute_ids, max_queue_size);
+ _state, this, _output_tuple_desc,
_output_row_descriptor.get(), scanners, limit(),
+ _state->scan_queue_mem_limit(), _col_distribute_ids,
max_queue_size);
} else {
- _scanner_ctx = ScannerContext::create_shared(_state, this,
_output_tuple_desc, scanners,
+ _scanner_ctx = ScannerContext::create_shared(_state, this,
_output_tuple_desc,
+
_output_row_descriptor.get(), scanners,
limit(),
_state->scan_queue_mem_limit());
}
return Status::OK();
diff --git a/be/src/vec/exec/scan/vscan_node.h
b/be/src/vec/exec/scan/vscan_node.h
index 5917d0ff46b..7cd8b15991e 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -177,6 +177,20 @@ public:
RuntimeProfile* scanner_profile() { return _scanner_profile.get(); }
+ Status get_next_after_projects(
+ RuntimeState* state, vectorized::Block* block, bool* eos,
+ const std::function<Status(RuntimeState*, vectorized::Block*,
bool*)>& fn,
+ bool clear_data = true) override {
+ Defer defer([block, this]() {
+ if (block && !block->empty()) {
+ COUNTER_UPDATE(_output_bytes_counter,
block->allocated_bytes());
+ COUNTER_UPDATE(_block_count_counter, 1);
+ }
+ });
+ _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
+ return get_next(state, block, eos);
+ }
+
protected:
// Different data sources register different profiles by implementing this
method
virtual Status _init_profile();
diff --git a/be/src/vec/exec/scan/vscanner.cpp
b/be/src/vec/exec/scan/vscanner.cpp
index a7c0c3c4062..0a7a8c9c019 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -35,7 +35,8 @@ VScanner::VScanner(RuntimeState* state, VScanNode* parent,
int64_t limit, Runtim
_local_state(nullptr),
_limit(limit),
_profile(profile),
- _output_tuple_desc(parent->output_tuple_desc()) {
+ _output_tuple_desc(parent->output_tuple_desc()),
+ _output_row_descriptor(_parent->_output_row_descriptor.get()) {
_total_rf_num = _parent->runtime_filter_num();
}
@@ -46,7 +47,8 @@ VScanner::VScanner(RuntimeState* state,
pipeline::ScanLocalStateBase* local_stat
_local_state(local_state),
_limit(limit),
_profile(profile),
- _output_tuple_desc(_local_state->output_tuple_desc()) {
+ _output_tuple_desc(_local_state->output_tuple_desc()),
+
_output_row_descriptor(_local_state->_parent->output_row_descriptor()) {
_total_rf_num = _local_state->runtime_filter_num();
}
@@ -58,9 +60,30 @@ Status VScanner::prepare(RuntimeState* state, const
VExprContextSPtrs& conjuncts
}
}
+ const auto& projections = _parent ? _parent->_projections :
_local_state->_projections;
+ if (!projections.empty()) {
+ _projections.resize(projections.size());
+ for (size_t i = 0; i != projections.size(); ++i) {
+ RETURN_IF_ERROR(projections[i]->clone(state, _projections[i]));
+ }
+ }
+
return Status::OK();
}
+Status VScanner::get_block_after_projects(RuntimeState* state,
vectorized::Block* block,
+ bool* eos) {
+ auto& row_descriptor =
+ _parent ? _parent->_row_descriptor :
_local_state->_parent->row_descriptor();
+ if (_output_row_descriptor) {
+
_origin_block.clear_column_data(row_descriptor.num_materialized_slots());
+ auto status = get_block(state, &_origin_block, eos);
+ if (UNLIKELY(!status.ok())) return status;
+ return _do_projections(&_origin_block, block);
+ }
+ return get_block(state, block, eos);
+}
+
Status VScanner::get_block(RuntimeState* state, Block* block, bool* eof) {
// only empty block should be here
DCHECK(block->rows() == 0);
@@ -138,6 +161,47 @@ Status VScanner::_filter_output_block(Block* block) {
return st;
}
+Status VScanner::_do_projections(vectorized::Block* origin_block,
vectorized::Block* output_block) {
+ auto projection_timer = _parent ? _parent->_projection_timer :
_local_state->_projection_timer;
+ auto exec_timer = _parent ? _parent->_exec_timer :
_local_state->_exec_timer;
+ SCOPED_TIMER(exec_timer);
+ SCOPED_TIMER(projection_timer);
+
+ MutableBlock mutable_block =
+ VectorizedUtils::build_mutable_mem_reuse_block(output_block,
*_output_row_descriptor);
+ auto rows = origin_block->rows();
+
+ if (rows != 0) {
+ auto& mutable_columns = mutable_block.mutable_columns();
+
+ if (mutable_columns.size() != _projections.size()) {
+ return Status::InternalError(
+ "Logical error in scanner, output of projections {}
mismatches with "
+ "scanner output {}",
+ _projections.size(), mutable_columns.size());
+ }
+
+ for (int i = 0; i < mutable_columns.size(); ++i) {
+ auto result_column_id = -1;
+ RETURN_IF_ERROR(_projections[i]->execute(origin_block,
&result_column_id));
+ auto column_ptr = origin_block->get_by_position(result_column_id)
+
.column->convert_to_full_column_if_const();
+ //TODO: this is a quick fix, we need a new function like
"change_to_nullable" to do it
+ if (mutable_columns[i]->is_nullable() xor
column_ptr->is_nullable()) {
+ DCHECK(mutable_columns[i]->is_nullable() &&
!column_ptr->is_nullable());
+ reinterpret_cast<ColumnNullable*>(mutable_columns[i].get())
+ ->insert_range_from_not_nullable(*column_ptr, 0, rows);
+ } else {
+ mutable_columns[i]->insert_range_from(*column_ptr, 0, rows);
+ }
+ }
+ DCHECK(mutable_block.rows() == rows);
+ output_block->set_columns(std::move(mutable_columns));
+ }
+
+ return Status::OK();
+}
+
Status VScanner::try_append_late_arrival_runtime_filter() {
if (_applied_rf_num == _total_rf_num) {
return Status::OK();
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 29daf9a68c5..b00c3f348f4 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -67,6 +67,7 @@ public:
virtual Status open(RuntimeState* state) { return Status::OK(); }
Status get_block(RuntimeState* state, Block* block, bool* eos);
+ Status get_block_after_projects(RuntimeState* state, vectorized::Block*
block, bool* eos);
virtual Status close(RuntimeState* state);
@@ -89,6 +90,8 @@ protected:
// Filter the output block finally.
Status _filter_output_block(Block* block);
+ Status _do_projections(vectorized::Block* origin_block, vectorized::Block*
output_block);
+
// Not virtual, all child will call this method explictly
Status prepare(RuntimeState* state, const VExprContextSPtrs& conjuncts);
@@ -172,6 +175,7 @@ protected:
RuntimeProfile* _profile = nullptr;
const TupleDescriptor* _output_tuple_desc = nullptr;
+ const RowDescriptor* _output_row_descriptor = nullptr;
// If _input_tuple_desc is set, the scanner will read data into
// this _input_block first, then convert to the output block.
@@ -189,6 +193,8 @@ protected:
// Cloned from _conjuncts of scan node.
// It includes predicate in SQL and runtime filters.
VExprContextSPtrs _conjuncts;
+ VExprContextSPtrs _projections;
+ vectorized::Block _origin_block;
VExprContextSPtrs _common_expr_ctxs_push_down;
// Late arriving runtime filters will update _conjuncts.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]