github-actions[bot] commented on code in PR #24850:
URL: https://github.com/apache/doris/pull/24850#discussion_r1335347210
##########
be/src/pipeline/exec/schema_scan_operator.cpp:
##########
@@ -41,4 +42,230 @@ Status SchemaScanOperator::close(RuntimeState* state) {
return Status::OK();
}
+Status SchemaScanLocalState::init(RuntimeState* state, LocalStateInfo& info) {
+ RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
+
+ SCOPED_TIMER(profile()->total_time_counter());
+ SCOPED_TIMER(_open_timer);
+ auto& p = _parent->cast<SchemaScanOperatorX>();
+ _scanner_param.common_param = p._common_scanner_param;
+ // init schema scanner profile
+ _scanner_param.profile.reset(new RuntimeProfile("SchemaScanner"));
+ profile()->add_child(_scanner_param.profile.get(), true, nullptr);
+
+ // get src tuple desc
+ const SchemaTableDescriptor* schema_table =
Review Comment:
warning: variable 'schema_table' is not initialized
[cppcoreguidelines-init-variables]
```suggestion
const SchemaTableDescriptor* schema_table = nullptr =
```
##########
be/src/exec/schema_scanner.h:
##########
@@ -54,18 +63,14 @@ struct SchemaScannerParam {
int32_t port; // frontend thrift port
int64_t thread_id;
const std::string* catalog;
+};
+
+// scanner parameter from frontend
+struct SchemaScannerParam {
+ std::shared_ptr<SchemaScannerCommonParam> common_param;
std::unique_ptr<RuntimeProfile> profile;
- SchemaScannerParam()
- : db(nullptr),
- table(nullptr),
- wild(nullptr),
- user(nullptr),
- user_ip(nullptr),
- current_user_ident(nullptr),
- ip(nullptr),
- port(0),
- catalog(nullptr) {}
+ SchemaScannerParam() : common_param(new SchemaScannerCommonParam()) {}
Review Comment:
warning: use '= default' to define a trivial default constructor
[modernize-use-equals-default]
```suggestion
SchemaScannerParam() : common_param(new SchemaScannerCommonParam()) =
default;
```
##########
be/src/pipeline/exec/schema_scan_operator.cpp:
##########
@@ -41,4 +42,230 @@
return Status::OK();
}
+Status SchemaScanLocalState::init(RuntimeState* state, LocalStateInfo& info) {
+ RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
+
+ SCOPED_TIMER(profile()->total_time_counter());
+ SCOPED_TIMER(_open_timer);
+ auto& p = _parent->cast<SchemaScanOperatorX>();
+ _scanner_param.common_param = p._common_scanner_param;
+ // init schema scanner profile
+ _scanner_param.profile.reset(new RuntimeProfile("SchemaScanner"));
+ profile()->add_child(_scanner_param.profile.get(), true, nullptr);
+
+ // get src tuple desc
+ const SchemaTableDescriptor* schema_table =
+ static_cast<const
SchemaTableDescriptor*>(p._dest_tuple_desc->table_desc());
+ // new one scanner
+ _schema_scanner = SchemaScanner::create(schema_table->schema_table_type());
+
+ if (nullptr == _schema_scanner) {
+ return Status::InternalError("schema scanner get nullptr pointer.");
+ }
+
+ RETURN_IF_ERROR(_schema_scanner->init(&_scanner_param, state->obj_pool()));
+ return _schema_scanner->start(state);
+}
+
+SchemaScanOperatorX::SchemaScanOperatorX(ObjectPool* pool, const TPlanNode&
tnode,
+ const DescriptorTbl& descs)
+ : Base(pool, tnode, descs),
+ _table_name(tnode.schema_scan_node.table_name),
+ _common_scanner_param(new SchemaScannerCommonParam()),
+ _tuple_id(tnode.schema_scan_node.tuple_id),
+ _dest_tuple_desc(nullptr),
+ _tuple_idx(0),
+ _slot_num(0) {}
+
+Status SchemaScanOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
+ RETURN_IF_ERROR(Base::init(tnode, state));
+
+ if (tnode.schema_scan_node.__isset.db) {
+ _common_scanner_param->db =
+ state->obj_pool()->add(new
std::string(tnode.schema_scan_node.db));
+ }
+
+ if (tnode.schema_scan_node.__isset.table) {
+ _common_scanner_param->table =
+ state->obj_pool()->add(new
std::string(tnode.schema_scan_node.table));
+ }
+
+ if (tnode.schema_scan_node.__isset.wild) {
+ _common_scanner_param->wild =
+ state->obj_pool()->add(new
std::string(tnode.schema_scan_node.wild));
+ }
+
+ if (tnode.schema_scan_node.__isset.current_user_ident) {
+ _common_scanner_param->current_user_ident = state->obj_pool()->add(
+ new TUserIdentity(tnode.schema_scan_node.current_user_ident));
+ } else {
+ if (tnode.schema_scan_node.__isset.user) {
+ _common_scanner_param->user =
+ state->obj_pool()->add(new
std::string(tnode.schema_scan_node.user));
+ }
+ if (tnode.schema_scan_node.__isset.user_ip) {
+ _common_scanner_param->user_ip =
+ state->obj_pool()->add(new
std::string(tnode.schema_scan_node.user_ip));
+ }
+ }
+
+ if (tnode.schema_scan_node.__isset.ip) {
+ _common_scanner_param->ip =
+ state->obj_pool()->add(new
std::string(tnode.schema_scan_node.ip));
+ }
+ if (tnode.schema_scan_node.__isset.port) {
+ _common_scanner_param->port = tnode.schema_scan_node.port;
+ }
+
+ if (tnode.schema_scan_node.__isset.thread_id) {
+ _common_scanner_param->thread_id = tnode.schema_scan_node.thread_id;
+ }
+
+ if (tnode.schema_scan_node.__isset.catalog) {
+ _common_scanner_param->catalog =
+ state->obj_pool()->add(new
std::string(tnode.schema_scan_node.catalog));
+ }
+ return Status::OK();
+}
+
+Status SchemaScanOperatorX::open(RuntimeState* state) {
+ RETURN_IF_ERROR(Base::open(state));
+
+ if (_common_scanner_param->user) {
+ TSetSessionParams param;
Review Comment:
warning: variable 'param' is not initialized
[cppcoreguidelines-init-variables]
```suggestion
TSetSessionParams param = 0;
```
##########
be/src/pipeline/exec/schema_scan_operator.cpp:
##########
@@ -41,4 +42,230 @@
return Status::OK();
}
+Status SchemaScanLocalState::init(RuntimeState* state, LocalStateInfo& info) {
+ RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
+
+ SCOPED_TIMER(profile()->total_time_counter());
+ SCOPED_TIMER(_open_timer);
+ auto& p = _parent->cast<SchemaScanOperatorX>();
+ _scanner_param.common_param = p._common_scanner_param;
+ // init schema scanner profile
+ _scanner_param.profile.reset(new RuntimeProfile("SchemaScanner"));
+ profile()->add_child(_scanner_param.profile.get(), true, nullptr);
+
+ // get src tuple desc
+ const SchemaTableDescriptor* schema_table =
+ static_cast<const
SchemaTableDescriptor*>(p._dest_tuple_desc->table_desc());
+ // new one scanner
+ _schema_scanner = SchemaScanner::create(schema_table->schema_table_type());
+
+ if (nullptr == _schema_scanner) {
+ return Status::InternalError("schema scanner get nullptr pointer.");
+ }
+
+ RETURN_IF_ERROR(_schema_scanner->init(&_scanner_param, state->obj_pool()));
+ return _schema_scanner->start(state);
+}
+
+SchemaScanOperatorX::SchemaScanOperatorX(ObjectPool* pool, const TPlanNode&
tnode,
+ const DescriptorTbl& descs)
+ : Base(pool, tnode, descs),
+ _table_name(tnode.schema_scan_node.table_name),
+ _common_scanner_param(new SchemaScannerCommonParam()),
+ _tuple_id(tnode.schema_scan_node.tuple_id),
+ _dest_tuple_desc(nullptr),
+ _tuple_idx(0),
+ _slot_num(0) {}
+
+Status SchemaScanOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
+ RETURN_IF_ERROR(Base::init(tnode, state));
+
+ if (tnode.schema_scan_node.__isset.db) {
+ _common_scanner_param->db =
+ state->obj_pool()->add(new
std::string(tnode.schema_scan_node.db));
+ }
+
+ if (tnode.schema_scan_node.__isset.table) {
+ _common_scanner_param->table =
+ state->obj_pool()->add(new
std::string(tnode.schema_scan_node.table));
+ }
+
+ if (tnode.schema_scan_node.__isset.wild) {
+ _common_scanner_param->wild =
+ state->obj_pool()->add(new
std::string(tnode.schema_scan_node.wild));
+ }
+
+ if (tnode.schema_scan_node.__isset.current_user_ident) {
+ _common_scanner_param->current_user_ident = state->obj_pool()->add(
+ new TUserIdentity(tnode.schema_scan_node.current_user_ident));
+ } else {
+ if (tnode.schema_scan_node.__isset.user) {
+ _common_scanner_param->user =
+ state->obj_pool()->add(new
std::string(tnode.schema_scan_node.user));
+ }
+ if (tnode.schema_scan_node.__isset.user_ip) {
+ _common_scanner_param->user_ip =
+ state->obj_pool()->add(new
std::string(tnode.schema_scan_node.user_ip));
+ }
+ }
+
+ if (tnode.schema_scan_node.__isset.ip) {
+ _common_scanner_param->ip =
+ state->obj_pool()->add(new
std::string(tnode.schema_scan_node.ip));
+ }
+ if (tnode.schema_scan_node.__isset.port) {
+ _common_scanner_param->port = tnode.schema_scan_node.port;
+ }
+
+ if (tnode.schema_scan_node.__isset.thread_id) {
+ _common_scanner_param->thread_id = tnode.schema_scan_node.thread_id;
+ }
+
+ if (tnode.schema_scan_node.__isset.catalog) {
+ _common_scanner_param->catalog =
+ state->obj_pool()->add(new
std::string(tnode.schema_scan_node.catalog));
+ }
+ return Status::OK();
+}
+
+Status SchemaScanOperatorX::open(RuntimeState* state) {
+ RETURN_IF_ERROR(Base::open(state));
+
+ if (_common_scanner_param->user) {
+ TSetSessionParams param;
+ param.__set_user(*_common_scanner_param->user);
+ //TStatus t_status;
+ //RETURN_IF_ERROR(SchemaJniHelper::set_session(param, &t_status));
+ //RETURN_IF_ERROR(Status(t_status));
+ }
+
+ return Status::OK();
+}
+
+Status SchemaScanOperatorX::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(Base::prepare(state));
+
+ // get dest tuple desc
+ _dest_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
+
+ if (nullptr == _dest_tuple_desc) {
+ return Status::InternalError("Failed to get tuple descriptor.");
+ }
+
+ _slot_num = _dest_tuple_desc->slots().size();
+ // get src tuple desc
+ const SchemaTableDescriptor* schema_table =
Review Comment:
warning: variable 'schema_table' is not initialized
[cppcoreguidelines-init-variables]
```suggestion
const SchemaTableDescriptor* schema_table = nullptr =
```
##########
be/src/pipeline/exec/schema_scan_operator.cpp:
##########
@@ -41,4 +42,230 @@
return Status::OK();
}
+Status SchemaScanLocalState::init(RuntimeState* state, LocalStateInfo& info) {
+ RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
+
+ SCOPED_TIMER(profile()->total_time_counter());
+ SCOPED_TIMER(_open_timer);
+ auto& p = _parent->cast<SchemaScanOperatorX>();
+ _scanner_param.common_param = p._common_scanner_param;
+ // init schema scanner profile
+ _scanner_param.profile.reset(new RuntimeProfile("SchemaScanner"));
+ profile()->add_child(_scanner_param.profile.get(), true, nullptr);
+
+ // get src tuple desc
+ const SchemaTableDescriptor* schema_table =
+ static_cast<const
SchemaTableDescriptor*>(p._dest_tuple_desc->table_desc());
+ // new one scanner
+ _schema_scanner = SchemaScanner::create(schema_table->schema_table_type());
+
+ if (nullptr == _schema_scanner) {
+ return Status::InternalError("schema scanner get nullptr pointer.");
+ }
+
+ RETURN_IF_ERROR(_schema_scanner->init(&_scanner_param, state->obj_pool()));
+ return _schema_scanner->start(state);
+}
+
+SchemaScanOperatorX::SchemaScanOperatorX(ObjectPool* pool, const TPlanNode&
tnode,
+ const DescriptorTbl& descs)
+ : Base(pool, tnode, descs),
+ _table_name(tnode.schema_scan_node.table_name),
+ _common_scanner_param(new SchemaScannerCommonParam()),
+ _tuple_id(tnode.schema_scan_node.tuple_id),
+ _dest_tuple_desc(nullptr),
+ _tuple_idx(0),
+ _slot_num(0) {}
+
+Status SchemaScanOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
+ RETURN_IF_ERROR(Base::init(tnode, state));
+
+ if (tnode.schema_scan_node.__isset.db) {
+ _common_scanner_param->db =
+ state->obj_pool()->add(new
std::string(tnode.schema_scan_node.db));
+ }
+
+ if (tnode.schema_scan_node.__isset.table) {
+ _common_scanner_param->table =
+ state->obj_pool()->add(new
std::string(tnode.schema_scan_node.table));
+ }
+
+ if (tnode.schema_scan_node.__isset.wild) {
+ _common_scanner_param->wild =
+ state->obj_pool()->add(new
std::string(tnode.schema_scan_node.wild));
+ }
+
+ if (tnode.schema_scan_node.__isset.current_user_ident) {
+ _common_scanner_param->current_user_ident = state->obj_pool()->add(
+ new TUserIdentity(tnode.schema_scan_node.current_user_ident));
+ } else {
+ if (tnode.schema_scan_node.__isset.user) {
+ _common_scanner_param->user =
+ state->obj_pool()->add(new
std::string(tnode.schema_scan_node.user));
+ }
+ if (tnode.schema_scan_node.__isset.user_ip) {
+ _common_scanner_param->user_ip =
+ state->obj_pool()->add(new
std::string(tnode.schema_scan_node.user_ip));
+ }
+ }
+
+ if (tnode.schema_scan_node.__isset.ip) {
+ _common_scanner_param->ip =
+ state->obj_pool()->add(new
std::string(tnode.schema_scan_node.ip));
+ }
+ if (tnode.schema_scan_node.__isset.port) {
+ _common_scanner_param->port = tnode.schema_scan_node.port;
+ }
+
+ if (tnode.schema_scan_node.__isset.thread_id) {
+ _common_scanner_param->thread_id = tnode.schema_scan_node.thread_id;
+ }
+
+ if (tnode.schema_scan_node.__isset.catalog) {
+ _common_scanner_param->catalog =
+ state->obj_pool()->add(new
std::string(tnode.schema_scan_node.catalog));
+ }
+ return Status::OK();
+}
+
+Status SchemaScanOperatorX::open(RuntimeState* state) {
+ RETURN_IF_ERROR(Base::open(state));
+
+ if (_common_scanner_param->user) {
+ TSetSessionParams param;
+ param.__set_user(*_common_scanner_param->user);
+ //TStatus t_status;
+ //RETURN_IF_ERROR(SchemaJniHelper::set_session(param, &t_status));
+ //RETURN_IF_ERROR(Status(t_status));
+ }
+
+ return Status::OK();
+}
+
+Status SchemaScanOperatorX::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(Base::prepare(state));
+
+ // get dest tuple desc
+ _dest_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
+
+ if (nullptr == _dest_tuple_desc) {
+ return Status::InternalError("Failed to get tuple descriptor.");
+ }
+
+ _slot_num = _dest_tuple_desc->slots().size();
+ // get src tuple desc
+ const SchemaTableDescriptor* schema_table =
+ static_cast<const
SchemaTableDescriptor*>(_dest_tuple_desc->table_desc());
+
+ if (nullptr == schema_table) {
+ return Status::InternalError("Failed to get schema table descriptor.");
+ }
+
+ // new one scanner
+ _schema_scanner = SchemaScanner::create(schema_table->schema_table_type());
+
+ if (nullptr == _schema_scanner) {
+ return Status::InternalError("schema scanner get nullptr pointer.");
+ }
+
+ const std::vector<SchemaScanner::ColumnDesc>&
columns_desc(_schema_scanner->get_column_desc());
+
+ // if src columns size is zero, it's the dummy slots.
+ if (0 == columns_desc.size()) {
+ _slot_num = 0;
+ }
+
+ // check if type is ok.
+ for (int i = 0; i < _slot_num; ++i) {
+ int j = 0;
+ for (; j < columns_desc.size(); ++j) {
+ if (boost::iequals(_dest_tuple_desc->slots()[i]->col_name(),
columns_desc[j].name)) {
+ break;
+ }
+ }
+
+ if (j >= columns_desc.size()) {
+ LOG(WARNING) << "no match column for this column("
+ << _dest_tuple_desc->slots()[i]->col_name() << ")";
+ return Status::InternalError("no match column for this column.");
+ }
+
+ if (columns_desc[j].type != _dest_tuple_desc->slots()[i]->type().type)
{
+ LOG(WARNING) << "schema not match. input is " <<
columns_desc[j].name << "("
+ << columns_desc[j].type << ") and output is "
+ << _dest_tuple_desc->slots()[i]->col_name() << "("
+ << _dest_tuple_desc->slots()[i]->type() << ")";
+ return Status::InternalError("schema not match.");
+ }
+ }
+
+ _tuple_idx = 0;
+
+ return Status::OK();
+}
+
+Status SchemaScanOperatorX::get_block(RuntimeState* state, vectorized::Block*
block,
+ SourceState& source_state) {
+ CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ SCOPED_TIMER(local_state.profile()->total_time_counter());
+ RETURN_IF_CANCELLED(state);
+ bool schema_eos = false;
+
+ const std::vector<SchemaScanner::ColumnDesc>& columns_desc(
+ local_state._schema_scanner->get_column_desc());
+
+ do {
+ block->clear();
+ for (int i = 0; i < _slot_num; ++i) {
+ auto dest_slot_desc = _dest_tuple_desc->slots()[i];
+ block->insert(vectorized::ColumnWithTypeAndName(
+ dest_slot_desc->get_empty_mutable_column(),
dest_slot_desc->get_data_type_ptr(),
+ dest_slot_desc->col_name()));
+ }
+
+ // src block columns desc is filled by schema_scanner->get_column_desc.
+ vectorized::Block src_block;
+ for (int i = 0; i < columns_desc.size(); ++i) {
+ TypeDescriptor descriptor(columns_desc[i].type);
+ auto data_type =
+
vectorized::DataTypeFactory::instance().create_data_type(descriptor, true);
+
src_block.insert(vectorized::ColumnWithTypeAndName(data_type->create_column(),
+ data_type,
columns_desc[i].name));
+ }
+ while (true) {
+ RETURN_IF_CANCELLED(state);
+
+ // get all slots from schema table.
+
RETURN_IF_ERROR(local_state._schema_scanner->get_next_block(&src_block,
&schema_eos));
+
+ if (schema_eos) {
+ source_state = SourceState::FINISHED;
+ break;
+ }
+
+ if (src_block.rows() >= state->batch_size()) {
+ break;
+ }
+ }
+
+ if (src_block.rows()) {
+ // block->check_number_of_rows();
+ for (int i = 0; i < _slot_num; ++i) {
+ auto dest_slot_desc = _dest_tuple_desc->slots()[i];
+ vectorized::MutableColumnPtr column_ptr =
Review Comment:
warning: variable 'column_ptr' is not initialized
[cppcoreguidelines-init-variables]
```suggestion
vectorized::MutableColumnPtr column_ptr = 0 =
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]