HappenLee commented on code in PR #12010: URL: https://github.com/apache/doris/pull/12010#discussion_r952656149
########## be/src/vec/exec/vjdbc_scan_node.cpp: ########## @@ -0,0 +1,172 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "vec/exec/vjdbc_scan_node.h" +#ifdef LIBJVM +#include <string> +#include "common/status.h" + +namespace doris { +namespace vectorized { + +VJdbcScanNode::VJdbcScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) + : ScanNode(pool, tnode, descs), + _is_init(false), + _table_name(tnode.jdbc_scan_node.table_name), + _tuple_id(tnode.jdbc_scan_node.tuple_id), + _columns(tnode.jdbc_scan_node.columns), + _filters(tnode.jdbc_scan_node.filters), + _tuple_desc(nullptr) {} + +Status VJdbcScanNode::prepare(RuntimeState* state) { + VLOG_CRITICAL << "VJdbcScanNode::Prepare"; + if (_is_init) { + return Status::OK(); + } + + if (state == nullptr) { + return Status::InternalError("input pointer is NULL of VJdbcScanNode::prepare."); + } + + RETURN_IF_ERROR(ScanNode::prepare(state)); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + + // get tuple desc + _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); + if (_tuple_desc == nullptr) { + return Status::InternalError("Failed to get tuple descriptor."); + } + + // get jdbc table info + const JdbcTableDescriptor* 
jdbc_table = + static_cast<const JdbcTableDescriptor*>(_tuple_desc->table_desc()); + if (jdbc_table == nullptr) { + return Status::InternalError("jdbc table pointer is NULL of VJdbcScanNode::prepare."); + } + _jdbc_param.driver_class = jdbc_table->jdbc_driver_class(); + _jdbc_param.driver_path = jdbc_table->jdbc_driver_url(); + _jdbc_param.resource_name = jdbc_table->jdbc_resource_name(); + _jdbc_param.driver_checksum = jdbc_table->jdbc_driver_checksum(); + _jdbc_param.jdbc_url = jdbc_table->jdbc_url(); + _jdbc_param.user = jdbc_table->jdbc_user(); + _jdbc_param.passwd = jdbc_table->jdbc_passwd(); + _jdbc_param.tuple_desc = _tuple_desc; + + _jdbc_connector.reset(new (std::nothrow) JdbcConnector(_jdbc_param)); + if (_jdbc_connector == nullptr) { + return Status::InternalError("new a jdbc scanner failed."); + } + + _is_init = true; + return Status::OK(); +} + +Status VJdbcScanNode::open(RuntimeState* state) { + START_AND_SCOPE_SPAN(state->get_tracer(), span, "VJdbcScanNode::open"); + SCOPED_TIMER(_runtime_profile->total_time_counter()); + RETURN_IF_ERROR(ExecNode::open(state)); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + VLOG_CRITICAL << "VJdbcScanNode::open"; + + if (state == nullptr) { Review Comment: In what case will `state` be `nullptr`?
########## be/src/runtime/descriptors.h: ########## @@ -275,6 +275,30 @@ class ODBCTableDescriptor : public TableDescriptor { TOdbcTableType::type _type; }; +class JdbcTableDescriptor : public TableDescriptor { +public: + JdbcTableDescriptor(const TTableDescriptor& tdesc); + std::string debug_string() const override; + const std::string jdbc_resource_name() const { return _jdbc_resource_name; } Review Comment: This only needs to return a reference (`const std::string&`), not a copy. ########## be/src/runtime/descriptors.cpp: ########## @@ -221,6 +223,28 @@ std::string ODBCTableDescriptor::debug_string() const { return out.str(); } +JdbcTableDescriptor::JdbcTableDescriptor(const TTableDescriptor& tdesc) + : TableDescriptor(tdesc), + _jdbc_resource_name(tdesc.jdbcTable.jdbc_resource_name), + _jdbc_driver_url(tdesc.jdbcTable.jdbc_driver_url), + _jdbc_driver_class(tdesc.jdbcTable.jdbc_driver_class), + _jdbc_driver_checksum(tdesc.jdbcTable.jdbc_driver_checksum), + _jdbc_url(tdesc.jdbcTable.jdbc_url), + _jdbc_table_name(tdesc.jdbcTable.jdbc_table_name), + _jdbc_user(tdesc.jdbcTable.jdbc_user), + _jdbc_passwd(tdesc.jdbcTable.jdbc_password) {} + +std::string JdbcTableDescriptor::debug_string() const { + std::stringstream out; + out << "JDBCTable(" << TableDescriptor::debug_string() Review Comment: Use the fmt library instead of `std::stringstream`. ########## be/src/vec/exec/vjdbc_scan_node.cpp: ########## @@ -0,0 +1,172 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "vec/exec/vjdbc_scan_node.h" +#ifdef LIBJVM +#include <string> +#include "common/status.h" + +namespace doris { +namespace vectorized { + +VJdbcScanNode::VJdbcScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) + : ScanNode(pool, tnode, descs), + _is_init(false), + _table_name(tnode.jdbc_scan_node.table_name), + _tuple_id(tnode.jdbc_scan_node.tuple_id), + _columns(tnode.jdbc_scan_node.columns), + _filters(tnode.jdbc_scan_node.filters), + _tuple_desc(nullptr) {} + +Status VJdbcScanNode::prepare(RuntimeState* state) { + VLOG_CRITICAL << "VJdbcScanNode::Prepare"; + if (_is_init) { + return Status::OK(); + } + + if (state == nullptr) { + return Status::InternalError("input pointer is NULL of VJdbcScanNode::prepare."); + } + + RETURN_IF_ERROR(ScanNode::prepare(state)); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + + // get tuple desc + _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id); + if (_tuple_desc == nullptr) { + return Status::InternalError("Failed to get tuple descriptor."); + } + + // get jdbc table info + const JdbcTableDescriptor* jdbc_table = + static_cast<const JdbcTableDescriptor*>(_tuple_desc->table_desc()); + if (jdbc_table == nullptr) { + return Status::InternalError("jdbc table pointer is NULL of VJdbcScanNode::prepare."); + } + _jdbc_param.driver_class = jdbc_table->jdbc_driver_class(); + _jdbc_param.driver_path = jdbc_table->jdbc_driver_url(); + _jdbc_param.resource_name = jdbc_table->jdbc_resource_name(); + _jdbc_param.driver_checksum = jdbc_table->jdbc_driver_checksum(); + 
_jdbc_param.jdbc_url = jdbc_table->jdbc_url(); + _jdbc_param.user = jdbc_table->jdbc_user(); + _jdbc_param.passwd = jdbc_table->jdbc_passwd(); + _jdbc_param.tuple_desc = _tuple_desc; + + _jdbc_connector.reset(new (std::nothrow) JdbcConnector(_jdbc_param)); + if (_jdbc_connector == nullptr) { + return Status::InternalError("new a jdbc scanner failed."); + } + + _is_init = true; + return Status::OK(); +} + +Status VJdbcScanNode::open(RuntimeState* state) { + START_AND_SCOPE_SPAN(state->get_tracer(), span, "VJdbcScanNode::open"); + SCOPED_TIMER(_runtime_profile->total_time_counter()); + RETURN_IF_ERROR(ExecNode::open(state)); + SCOPED_CONSUME_MEM_TRACKER(mem_tracker()); + VLOG_CRITICAL << "VJdbcScanNode::open"; + + if (state == nullptr) { + return Status::InternalError("input pointer is NULL of VJdbcScanNode::open."); + } + + if (!_is_init) { + return Status::InternalError("used before initialize of VJdbcScanNode::open."); + } + + RETURN_IF_CANCELLED(state); + RETURN_IF_ERROR(_jdbc_connector->open()); + RETURN_IF_ERROR(_jdbc_connector->query(_table_name, _columns, _filters, _limit)); + return Status::OK(); +} + +Status VJdbcScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) { + VLOG_CRITICAL << "VJdbcScanNode::get_next"; + INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VJdbcScanNode::get_next"); + if (nullptr == state || nullptr == block || nullptr == eos) { + return Status::InternalError("input is NULL pointer"); + } + + if (!_is_init) { + return Status::InternalError("used before initialize of VJdbcScanNode::get_next."); + } + + auto column_size = _tuple_desc->slots().size(); + std::vector<MutableColumnPtr> columns(column_size); + bool mem_reuse = block->mem_reuse(); + // only empty block should be here + DCHECK(block->rows() == 0); + + bool jdbc_eos = false; + do { + RETURN_IF_CANCELLED(state); + + columns.resize(column_size); + for (auto i = 0; i < column_size; i++) { + if (mem_reuse) { + columns[i] = 
std::move(*block->get_by_position(i).column).mutate(); + } else { + columns[i] = _tuple_desc->slots()[i]->get_empty_mutable_column(); + } + } + + RETURN_IF_ERROR(_jdbc_connector->get_next(&jdbc_eos, columns, state->batch_size())); + + if (jdbc_eos) { + *eos = true; + break; + } + + // Before really use the Block, must clear other ptr of column in block + // So here need do std::move and clear in `columns` + if (!mem_reuse) { + int column_index = 0; + for (const auto slot_desc : _tuple_desc->slots()) { + block->insert(ColumnWithTypeAndName(std::move(columns[column_index++]), + slot_desc->get_data_type_ptr(), + slot_desc->col_name())); + } + } else { + columns.clear(); + } + VLOG_ROW << "VJdbcScanNode output rows: " << block->rows(); + } while (block->rows() == 0 && !(*eos)); Review Comment: Please rethink this logic. Maybe the filter should be applied here? ########## be/src/vec/exec/vjdbc_connector.cpp: ########## @@ -0,0 +1,361 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "vec/exec/vjdbc_connector.h" +#ifdef LIBJVM +#include "gen_cpp/Types_types.h" +#include "gutil/strings/substitute.h" +#include "jni.h" +#include "runtime/user_function_cache.h" +#include "util/jni-util.h" +#include "vec/columns/column_nullable.h" +namespace doris { +namespace vectorized { +const char* JDBC_EXECUTOR_CLASS = "org/apache/doris/udf/JdbcExecutor"; +const char* JDBC_EXECUTOR_CTOR_SIGNATURE = "([B)V"; +const char* JDBC_EXECUTOR_QUERYSQL_SIGNATURE = "(Ljava/lang/String;)I"; +const char* JDBC_EXECUTOR_HAS_NEXT_SIGNATURE = "()Z"; +const char* JDBC_EXECUTOR_GET_BLOCK_SIGNATURE = "(I)Ljava/util/List;"; +const char* JDBC_EXECUTOR_CLOSE_SIGNATURE = "()V"; +const char* JDBC_EXECUTOR_CONVERT_DATE_SIGNATURE = "(Ljava/lang/Object;)J"; +const char* JDBC_EXECUTOR_CONVERT_DATETIME_SIGNATURE = "(Ljava/lang/Object;)J"; + +JdbcConnector::JdbcConnector(const JdbcConnectorParam& param) + : _is_open(false), _tuple_desc(param.tuple_desc), _conn_param(param) {} + +JdbcConnector::~JdbcConnector() { + if (!_is_open) { + return; + } + JNIEnv* env; + Status status; + RETURN_IF_STATUS_ERROR(status, JniUtil::GetJNIEnv(&env)); + env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_close_id); + RETURN_IF_STATUS_ERROR(status, JniUtil::GetJniExceptionMsg(env)); + env->DeleteGlobalRef(_executor_obj); +} + +Status JdbcConnector::open() { + if (_is_open) { + LOG(INFO) << "this scanner of jdbc already opened"; + return Status::OK(); + } + + JNIEnv* env = nullptr; + RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, JDBC_EXECUTOR_CLASS, &_executor_clazz)); + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/util/List", &_executor_list_clazz)); + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Object", &_executor_object_clazz)); + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Boolean", &_executor_uint8_t_clazz)); + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Byte", 
&_executor_int8_t_clazz)); + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Short", &_executor_int16_t_clazz)); + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Integer", &_executor_int32_t_clazz)); + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Long", &_executor_int64_t_clazz)); + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Float", &_executor_float_clazz)); + RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, "java/lang/Float", &_executor_double_clazz)); + RETURN_IF_ERROR(_register_func_id(env)); + + // Add a scoped cleanup jni reference object. This cleans up local refs made below. + JniLocalFrame jni_frame; + { + std::string local_location; + std::hash<std::string> hash_str; + auto function_cache = UserFunctionCache::instance(); + RETURN_IF_ERROR(function_cache->get_jarpath(hash_str(_conn_param.resource_name), + _conn_param.driver_path, + _conn_param.driver_checksum, &local_location)); + + TJdbcExecutorCtorParams ctor_params; + ctor_params.__set_jar_location_path(local_location); + ctor_params.__set_jdbc_url(_conn_param.jdbc_url); + ctor_params.__set_jdbc_user(_conn_param.user); + ctor_params.__set_jdbc_password(_conn_param.passwd); + ctor_params.__set_jdbc_driver_class(_conn_param.driver_class); + + jbyteArray ctor_params_bytes; + // Pushed frame will be popped when jni_frame goes out-of-scope. 
+ RETURN_IF_ERROR(jni_frame.push(env)); + RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes)); + _executor_obj = env->NewObject(_executor_clazz, _executor_ctor_id, ctor_params_bytes); + } + RETURN_ERROR_IF_EXC(env); + RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, _executor_obj, &_executor_obj)); + _is_open = true; + return Status::OK(); +} + +Status JdbcConnector::query(const std::string& table, const std::vector<std::string>& fields, + const std::vector<std::string>& filters, const int64_t limit) { + if (!_is_open) { + return Status::InternalError("Query before open of jdbc query."); + } + + _sql_str = "SELECT "; Review Comment: Using the string operators directly will cause a performance problem; consider using an fmt buffer here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
