rajvarun77 commented on code in PR #3330: URL: https://github.com/apache/brpc/pull/3330#discussion_r3384395487
########## src/brpc/policy/mysql/mysql_common.h: ########## @@ -0,0 +1,422 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Authors: Yang,Liming ([email protected]) + +#ifndef BRPC_MYSQL_COMMON_H +#define BRPC_MYSQL_COMMON_H + +#include <sstream> +#include <map> +#include "butil/logging.h" // LOG() + +namespace brpc { +// Msql Collation +extern const char* MysqlDefaultCollation; +extern const char* MysqlBinaryCollation; +const std::map<std::string, uint8_t> MysqlCollations = { + {"big5_chinese_ci", 1}, + {"latin2_czech_cs", 2}, Review Comment: Addressed in the current revision — `MysqlCollations` is declared `extern` in `mysql_common.h` and defined once in `mysql_common.cpp`, so there is no in-header definition / ODR issue. ########## src/brpc/policy/mysql/mysql.cpp: ########## @@ -0,0 +1,530 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Authors: Yang,Liming ([email protected]) + +#define INTERNAL_SUPPRESS_PROTOBUF_FIELD_DEPRECATION +#include <algorithm> +#include <gflags/gflags.h> +#include "butil/string_printf.h" +#include "butil/macros.h" +#include "brpc/controller.h" +#include "brpc/policy/mysql/mysql.h" +#include "brpc/policy/mysql/mysql_common.h" + +namespace brpc { + +DEFINE_int32(mysql_multi_replies_size, 10, "multi replies size in one MysqlResponse"); + +// =================================================================== + +butil::Status MysqlStatementStub::PackExecuteCommand(butil::IOBuf* outbuf, uint32_t stmt_id) { + butil::Status st; + // long data + for (const auto& i : _long_data) { + st = MysqlMakeLongDataPacket(outbuf, stmt_id, i.param_id, i.long_data); + if (!st.ok()) { + LOG(ERROR) << "make long data header error " << st; + return st; + } + } + _long_data.clear(); + // execute data + st = MysqlMakeExecutePacket(outbuf, stmt_id, _execute_data); + if (!st.ok()) { + LOG(ERROR) << "make execute header error " << st; + return st; + } + _execute_data.clear(); + _null_mask.mask.clear(); + _null_mask.area = butil::IOBuf::INVALID_AREA; + _param_types.types.clear(); + _param_types.area = butil::IOBuf::INVALID_AREA; + + return st; +} + +MysqlRequest::MysqlRequest() + : NonreflectableMessage<MysqlRequest>() { + SharedCtor(); +} + +MysqlRequest::MysqlRequest(const MysqlTransaction* tx) + : NonreflectableMessage<MysqlRequest>() { + SharedCtor(); + _tx = tx; +} + +MysqlRequest::MysqlRequest(MysqlStatement* stmt) + : NonreflectableMessage<MysqlRequest>() { + SharedCtor(); + _stmt = new MysqlStatementStub(stmt); +} + +MysqlRequest::MysqlRequest(const MysqlTransaction* tx, MysqlStatement* stmt) + : NonreflectableMessage<MysqlRequest>() { + SharedCtor(); + _tx = tx; + _stmt = new MysqlStatementStub(stmt); +} + +MysqlRequest::MysqlRequest(const MysqlRequest& from) + : NonreflectableMessage<MysqlRequest>(from) { + SharedCtor(); + MergeFrom(from); +} + +void MysqlRequest::SharedCtor() { + _has_error = false; + _cached_size_ = 0; + _has_command = false; + _tx = NULL; + _stmt = NULL; + _param_index = 0; +} + +MysqlRequest::~MysqlRequest() { + SharedDtor(); + if (_stmt != NULL) { + delete _stmt; + } + _stmt = NULL; +} + +void MysqlRequest::SharedDtor() { +} + +void MysqlRequest::SetCachedSize(int size) const { + _cached_size_ = size; +} + +void MysqlRequest::Clear() { + _has_error = false; + _buf.clear(); + _has_command = false; + _tx = NULL; + if (_stmt) { + delete _stmt; + _stmt = NULL; + } + _param_index = 0; +} + +size_t MysqlRequest::ByteSizeLong() const { + int total_size = _buf.size(); + _cached_size_ = total_size; + return total_size; +} + +void MysqlRequest::MergeFrom(const MysqlRequest& from) { + if (&from == this) { + return; + } + // Copy all members so CopyFrom/copy-construct yields an equivalent request + // instead of an empty one. + _has_command = from._has_command; + _has_error = from._has_error; + _buf = from._buf; + _cached_size_ = from._cached_size_; + _param_index = from._param_index; + // _tx is a non-owning pointer (never deleted by MysqlRequest): shallow copy. + _tx = from._tx; + // _stmt is owned (deleted in the dtor): deep-copy to avoid double free. + if (_stmt != NULL) { + delete _stmt; + _stmt = NULL; + } + if (from._stmt != NULL) { + _stmt = new MysqlStatementStub(*from._stmt); + } +} + +void MysqlRequest::Swap(MysqlRequest* other) { + if (other != this) { + _buf.swap(other->_buf); + std::swap(_has_error, other->_has_error); + std::swap(_cached_size_, other->_cached_size_); + std::swap(_has_command, other->_has_command); + std::swap(_tx, other->_tx); + std::swap(_stmt, other->_stmt); + std::swap(_param_index, other->_param_index); + } +} + +bool MysqlRequest::SerializeTo(butil::IOBuf* buf) const { + if (_has_error) { + LOG(ERROR) << "Reject serialization due to error in CommandXXX[V]"; + return false; + } + *buf = _buf; + return true; +} + +bool MysqlRequest::Query(const butil::StringPiece& command) { + if (_has_error) { + return false; + } + + if (_has_command) { + return false; + } + + const butil::Status st = MysqlMakeCommand(&_buf, MYSQL_COM_QUERY, command); + if (st.ok()) { + _has_command = true; + return true; + } else { + CHECK(st.ok()) << st; + _has_error = true; + return false; + } +} + +bool MysqlRequest::AddParam(int8_t p) { + if (_has_error) { + return false; + } + if (_stmt == NULL || _stmt->stmt() == NULL) { + _has_error = true; + return false; + } + const butil::Status st = MysqlMakeExecuteData(_stmt, _param_index, &p, MYSQL_FIELD_TYPE_TINY); + if (st.ok()) { + ++_param_index; + return true; + } else { + CHECK(st.ok()) << st; + _has_error = true; + return false; + } +} +bool MysqlRequest::AddParam(uint8_t p) { + if (_stmt == NULL || _stmt->stmt() == NULL) { + _has_error = true; + return false; + } + const butil::Status st = + MysqlMakeExecuteData(_stmt, _param_index, &p, MYSQL_FIELD_TYPE_TINY, true); + if (st.ok()) { + ++_param_index; + return true; + } else { + CHECK(st.ok()) << st; + _has_error = true; + return false; + } +} +bool MysqlRequest::AddParam(int16_t p) { + if (_stmt == NULL || _stmt->stmt() == NULL) { + _has_error = true; + return false; + } + const butil::Status st = MysqlMakeExecuteData(_stmt, _param_index, &p, MYSQL_FIELD_TYPE_SHORT); + if (st.ok()) { + ++_param_index; + return true; + } else { + CHECK(st.ok()) << st; + _has_error = true; + return false; + } +} +bool MysqlRequest::AddParam(uint16_t p) { + if (_stmt == NULL || _stmt->stmt() == NULL) { + _has_error = true; + return false; + } + const butil::Status st = + MysqlMakeExecuteData(_stmt, _param_index, &p, MYSQL_FIELD_TYPE_SHORT, true); + if (st.ok()) { + ++_param_index; + return true; + } else { + CHECK(st.ok()) << st; + _has_error = true; + return false; + } +} +bool MysqlRequest::AddParam(int32_t p) { + if (_stmt == NULL || _stmt->stmt() == NULL) { + _has_error = true; + return false; + } + const butil::Status st = MysqlMakeExecuteData(_stmt, _param_index, &p, MYSQL_FIELD_TYPE_LONG); + if (st.ok()) { + ++_param_index; + return true; + } else { + CHECK(st.ok()) << st; + _has_error = true; + return false; + } +} +bool MysqlRequest::AddParam(uint32_t p) { + if (_stmt == NULL || _stmt->stmt() == NULL) { + _has_error = true; + return false; + } + const butil::Status st = + MysqlMakeExecuteData(_stmt, _param_index, &p, MYSQL_FIELD_TYPE_LONG, true); + if (st.ok()) { + ++_param_index; + return true; + } else { + CHECK(st.ok()) << st; + _has_error = true; + return false; + } +} +bool MysqlRequest::AddParam(int64_t p) { + if (_stmt == NULL || _stmt->stmt() == NULL) { + _has_error = true; + return false; + } + const butil::Status st = + MysqlMakeExecuteData(_stmt, _param_index, &p, MYSQL_FIELD_TYPE_LONGLONG); + if (st.ok()) { + ++_param_index; + return true; + } else { + CHECK(st.ok()) << st; + _has_error = true; + return false; + } +} +bool MysqlRequest::AddParam(uint64_t p) { + if (_stmt == NULL || _stmt->stmt() == NULL) { + _has_error = true; + return false; + } + const butil::Status st = + MysqlMakeExecuteData(_stmt, _param_index, &p, MYSQL_FIELD_TYPE_LONGLONG, true); + if (st.ok()) { + ++_param_index; + return true; + } else { + CHECK(st.ok()) << st; + _has_error = true; + return false; + } +} +bool MysqlRequest::AddParam(float p) { + if (_stmt == NULL || _stmt->stmt() == NULL) { + _has_error = true; + return false; + } + const butil::Status st = MysqlMakeExecuteData(_stmt, _param_index, &p, MYSQL_FIELD_TYPE_FLOAT); + if (st.ok()) { + ++_param_index; + return true; + } else { + CHECK(st.ok()) << st; + _has_error = true; + return false; + } +} +bool MysqlRequest::AddParam(double p) { + if (_stmt == NULL || _stmt->stmt() == NULL) { + _has_error = true; + return false; + } + const butil::Status st = MysqlMakeExecuteData(_stmt, _param_index, &p, MYSQL_FIELD_TYPE_DOUBLE); + if (st.ok()) { + ++_param_index; + return true; + } else { + CHECK(st.ok()) << st; + _has_error = true; + return false; + } +} +bool MysqlRequest::AddParam(const butil::StringPiece& p) { + if (_stmt == NULL || _stmt->stmt() == NULL) { + _has_error = true; + return false; + } + const butil::Status st = MysqlMakeExecuteData(_stmt, _param_index, &p, MYSQL_FIELD_TYPE_STRING); + if (st.ok()) { + ++_param_index; + return true; + } else { + CHECK(st.ok()) << st; + _has_error = true; + return false; + } +} + +void MysqlRequest::Print(std::ostream& os) const { + butil::IOBuf cp = _buf; + { + uint8_t buf[3]; + cp.cutn(buf, 3); + os << "size:" << mysql_uint3korr(buf) << ","; + } + { + uint8_t buf; + cp.cut1((char*)&buf); + os << "sequence:" << (unsigned)buf << ","; + } + os << "payload(hex):"; + while (!cp.empty()) { + uint8_t buf; + cp.cut1((char*)&buf); + os << std::hex << (unsigned)buf; + } +} + +std::ostream& operator<<(std::ostream& os, const MysqlRequest& r) { + r.Print(os); + return os; +} + +// =================================================================== + +#ifndef _MSC_VER +#endif // !_MSC_VER + +MysqlResponse::MysqlResponse() + : NonreflectableMessage<MysqlResponse>() { + SharedCtor(); +} + +MysqlResponse::MysqlResponse(const MysqlResponse& from) + : NonreflectableMessage<MysqlResponse>(from) { + SharedCtor(); + MergeFrom(from); +} + +void MysqlResponse::SharedCtor() { + _nreply = 0; + _cached_size_ = 0; +} + +MysqlResponse::~MysqlResponse() { + SharedDtor(); +} + +void MysqlResponse::SharedDtor() { +} + +void MysqlResponse::SetCachedSize(int size) const { + _cached_size_ = size; +} + +void MysqlResponse::Clear() { + // Reset all response state so a reused MysqlResponse does not return + // stale replies. Mirror what SharedCtor()/ctor initialize. + MysqlReply empty_reply; + _first_reply.Swap(empty_reply); + _other_replies.clear(); + _arena.clear(); + _nreply = 0; + _cached_size_ = 0; +} + +size_t MysqlResponse::ByteSizeLong() const { + return _cached_size_; +} + +void MysqlResponse::MergeFrom(const MysqlResponse& from) { + CHECK_NE(&from, this); +} Review Comment: Fixed — `MysqlResponse` is now explicitly non-copyable (deleted copy ctor + assignment), and `MergeFrom` is a hard `CHECK`-failure instead of a silent no-op, so an accidental copy/CopyFrom is caught rather than silently dropping parsed replies. ########## test/mysql/brpc_mysql_connection_type_unittest.cpp: ########## @@ -0,0 +1,373 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// =========================================================================== +// brpc MySQL-client CONNECTION-TYPE BOUNDARY integration test. +// +// PROVENANCE / CLEAN-ROOM NOTE +// ---------------------------- +// This is NOT derived from any upstream MySQL/MariaDB test suite. It asserts +// a brpc-ARCHITECTURE boundary: how a MySQL prepared statement (whose +// server-side handle is connection-scoped) interacts with brpc's +// CONNECTION_TYPE_SHORT (a fresh TCP connection per request). The data values +// are this file's own; no upstream test code or structure was copied. +// +// THE BOUNDARY (spec fact, asserted -- not derived from our impl) +// -------------------------------------------------------------- +// A MySQL prepared statement is created with COM_STMT_PREPARE on ONE TCP +// connection; the server returns a `stmt_id` that is valid ONLY on that exact +// connection. COM_STMT_EXECUTE must therefore run on the SAME connection. +// +// CONNECTION_TYPE_SHORT opens a brand-new TCP connection for every request and +// closes it afterwards, so there is no connection affinity across requests. +// Consequently an execute under SHORT cannot land on the connection that holds +// the prepared handle -- the prepare/execute affinity is broken by design. +// +// * PreparedStatementUnderShortMustError (PRIMARY): +// build a SHORT channel, prepare "SELECT ? AS v", bind one INT param, +// CallMethod. Must ERROR (cntl.Failed() OR reply(0).is_error()); must +// NOT return a correct result set; must NOT crash. Looped a few times. +// +// * PlainQueryUnderShortMustSucceed (POSITIVE CONTROL): +// same SHORT channel; a stateless COM_QUERY "SELECT 7 AS v" must SUCCEED +// and return 7. Proves SHORT is fine for stateless queries; only the +// connection-scoped prepared-statement handle breaks under SHORT. Review Comment: Addressed — the file-header comment now describes `PreparedStatementUnderShortRePreparesAndSucceeds` (success via transparent re-prepare on each new short connection), matching the actual test. ########## docs/cn/mysql_client.md: ########## @@ -0,0 +1,556 @@ +[MySQL](https://www.mysql.com/)是著名的开源的关系型数据库,为了使用户更快捷地访问mysql并充分利用bthread的并发能力,brpc直接支持mysql协议。示例程序:[example/mysql_c++](https://github.com/brpc/brpc/tree/master/example/mysql_c++/) + +**注意**:只支持MySQL 4.1 及之后的版本的文本协议,支持事务,不支持Prepared statement。目前支持的鉴权方式为mysql_native_password,使用事务的时候不支持single模式。 Review Comment: Addressed — the overview now states that prepared statements are supported, consistent with the Prepared Statement section. ########## src/brpc/policy/mysql/mysql_statement.cpp: ########## @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Authors: Yang,Liming ([email protected]) + +#include <vector> +#include <gflags/gflags.h> +#include "brpc/socket.h" +#include "brpc/policy/mysql/mysql_statement.h" + +namespace brpc { +DEFINE_int32(mysql_statment_map_size, + 100, + "Mysql statement map size, usually equal to max bthread number"); Review Comment: Fixed — the flag is spelled `mysql_statement_map_size` in the current revision (DEFINE + all uses). ########## src/brpc/policy/mysql/mysql.h: ########## @@ -0,0 +1,244 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Authors: Yang,Liming ([email protected]) + +#ifndef BRPC_MYSQL_H +#define BRPC_MYSQL_H + +#include <string> +#include <vector> + +#include "brpc/nonreflectable_message.h" +#include "brpc/pb_compat.h" +#include "butil/iobuf.h" +#include "butil/strings/string_piece.h" +#include "butil/arena.h" +#include "brpc/parse_result.h" +#include "brpc/policy/mysql/mysql_command.h" +#include "brpc/policy/mysql/mysql_reply.h" +#include "brpc/policy/mysql/mysql_transaction.h" +#include "brpc/policy/mysql/mysql_statement.h" + +namespace brpc { +// Request to mysql. +// Notice that you can pipeline multiple commands in one request and sent +// them to ONE mysql-server together. +// Example: +// MysqlRequest request; +// request.Query("select * from table"); +// MysqlResponse response; +// channel.CallMethod(NULL, &controller, &request, &response, NULL/*done*/); +// if (!cntl.Failed()) { +// LOG(INFO) << response.reply(0); +// } + +class MysqlStatementStub { +public: + MysqlStatementStub(MysqlStatement* stmt); + MysqlStatement* stmt(); + butil::IOBuf& execute_data(); + butil::Status PackExecuteCommand(butil::IOBuf* outbuf, uint32_t stmt_id); + // prepare statement null mask + struct NullMask { + NullMask() : area(butil::IOBuf::INVALID_AREA) {} + std::vector<uint8_t> mask; + butil::IOBuf::Area area; + }; + // prepare statement param types + struct ParamTypes { + ParamTypes() : area(butil::IOBuf::INVALID_AREA) {} + std::vector<uint8_t> types; + butil::IOBuf::Area area; + }; + // null mask and param types + NullMask& null_mask(); + ParamTypes& param_types(); + // save long data + void save_long_data(uint16_t param_id, const butil::StringPiece& value); + +private: + MysqlStatement* _stmt; + butil::IOBuf _execute_data; + NullMask _null_mask; + ParamTypes _param_types; + // long data + struct LongData { + uint16_t param_id; + butil::IOBuf long_data; + }; + std::vector<LongData> _long_data; +}; + +inline MysqlStatementStub::MysqlStatementStub(MysqlStatement* stmt) : _stmt(stmt) {} + +inline MysqlStatement* MysqlStatementStub::stmt() { + return _stmt; +} + +inline butil::IOBuf& MysqlStatementStub::execute_data() { + return _execute_data; +} + +inline MysqlStatementStub::NullMask& MysqlStatementStub::null_mask() { + return _null_mask; +} + +inline MysqlStatementStub::ParamTypes& MysqlStatementStub::param_types() { + return _param_types; +} + +inline void MysqlStatementStub::save_long_data(uint16_t param_id, const butil::StringPiece& value) { + LongData d; + d.param_id = param_id; + d.long_data.append(value.data(), value.size()); + _long_data.push_back(d); +} + +class MysqlRequest : public NonreflectableMessage<MysqlRequest> { +public: + MysqlRequest(); + MysqlRequest(const MysqlTransaction* tx); + MysqlRequest(MysqlStatement* stmt); + MysqlRequest(const MysqlTransaction* tx, MysqlStatement* stmt); + ~MysqlRequest() override; + MysqlRequest(const MysqlRequest& from); + inline MysqlRequest& operator=(const MysqlRequest& from) { + CopyFrom(from); + return *this; + } + void Swap(MysqlRequest* other); + + // Serialize the request into `buf'. Return true on success. + bool SerializeTo(butil::IOBuf* buf) const; + + // Protobuf methods. + void MergeFrom(const MysqlRequest& from) override; + void Clear() override; + + size_t ByteSizeLong() const override; + int GetCachedSize() const PB_425_OVERRIDE { + return _cached_size_; + } + + // call query command + bool Query(const butil::StringPiece& command); + // add statement params + bool AddParam(int8_t p); + bool AddParam(uint8_t p); + bool AddParam(int16_t p); + bool AddParam(uint16_t p); + bool AddParam(int32_t p); + bool AddParam(uint32_t p); + bool AddParam(int64_t p); + bool AddParam(uint64_t p); + bool AddParam(float p); + bool AddParam(double p); + bool AddParam(const butil::StringPiece& p); + + // True if previous command failed. + bool has_error() const { + return _has_error; + } + + const MysqlTransaction* get_tx() const { Review Comment: Done — renamed `get_tx()` to `tx()` and updated the callers. ########## src/brpc/policy/mysql/mysql.h: ########## @@ -0,0 +1,244 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Authors: Yang,Liming ([email protected]) + +#ifndef BRPC_MYSQL_H +#define BRPC_MYSQL_H + +#include <string> +#include <vector> + +#include "brpc/nonreflectable_message.h" +#include "brpc/pb_compat.h" +#include "butil/iobuf.h" +#include "butil/strings/string_piece.h" +#include "butil/arena.h" +#include "brpc/parse_result.h" +#include "brpc/policy/mysql/mysql_command.h" +#include "brpc/policy/mysql/mysql_reply.h" +#include "brpc/policy/mysql/mysql_transaction.h" +#include "brpc/policy/mysql/mysql_statement.h" + +namespace brpc { +// Request to mysql. +// Notice that you can pipeline multiple commands in one request and sent +// them to ONE mysql-server together. +// Example: +// MysqlRequest request; +// request.Query("select * from table"); +// MysqlResponse response; +// channel.CallMethod(NULL, &controller, &request, &response, NULL/*done*/); +// if (!cntl.Failed()) { +// LOG(INFO) << response.reply(0); +// } + +class MysqlStatementStub { +public: + MysqlStatementStub(MysqlStatement* stmt); + MysqlStatement* stmt(); + butil::IOBuf& execute_data(); + butil::Status PackExecuteCommand(butil::IOBuf* outbuf, uint32_t stmt_id); + // prepare statement null mask + struct NullMask { + NullMask() : area(butil::IOBuf::INVALID_AREA) {} + std::vector<uint8_t> mask; + butil::IOBuf::Area area; + }; + // prepare statement param types + struct ParamTypes { + ParamTypes() : area(butil::IOBuf::INVALID_AREA) {} + std::vector<uint8_t> types; + butil::IOBuf::Area area; + }; + // null mask and param types + NullMask& null_mask(); + ParamTypes& param_types(); + // save long data + void save_long_data(uint16_t param_id, const butil::StringPiece& value); + +private: + MysqlStatement* _stmt; + butil::IOBuf _execute_data; + NullMask _null_mask; + ParamTypes _param_types; + // long data + struct LongData { + uint16_t param_id; + butil::IOBuf long_data; + }; + std::vector<LongData> _long_data; +}; + +inline MysqlStatementStub::MysqlStatementStub(MysqlStatement* stmt) : _stmt(stmt) {} + +inline MysqlStatement* MysqlStatementStub::stmt() { + return _stmt; +} + +inline butil::IOBuf& MysqlStatementStub::execute_data() { + return _execute_data; +} + +inline MysqlStatementStub::NullMask& MysqlStatementStub::null_mask() { + return _null_mask; +} + +inline MysqlStatementStub::ParamTypes& MysqlStatementStub::param_types() { + return _param_types; +} + +inline void MysqlStatementStub::save_long_data(uint16_t param_id, const butil::StringPiece& value) { + LongData d; + d.param_id = param_id; + d.long_data.append(value.data(), value.size()); + _long_data.push_back(d); +} + +class MysqlRequest : public NonreflectableMessage<MysqlRequest> { +public: + MysqlRequest(); + MysqlRequest(const MysqlTransaction* tx); + MysqlRequest(MysqlStatement* stmt); + MysqlRequest(const MysqlTransaction* tx, MysqlStatement* stmt); + ~MysqlRequest() override; + MysqlRequest(const MysqlRequest& from); + inline MysqlRequest& operator=(const MysqlRequest& from) { + CopyFrom(from); + return *this; + } + void Swap(MysqlRequest* other); + + // Serialize the request into `buf'. Return true on success. + bool SerializeTo(butil::IOBuf* buf) const; + + // Protobuf methods. + void MergeFrom(const MysqlRequest& from) override; + void Clear() override; + + size_t ByteSizeLong() const override; + int GetCachedSize() const PB_425_OVERRIDE { + return _cached_size_; + } + + // call query command + bool Query(const butil::StringPiece& command); + // add statement params + bool AddParam(int8_t p); + bool AddParam(uint8_t p); + bool AddParam(int16_t p); + bool AddParam(uint16_t p); + bool AddParam(int32_t p); + bool AddParam(uint32_t p); + bool AddParam(int64_t p); + bool AddParam(uint64_t p); + bool AddParam(float p); + bool AddParam(double p); + bool AddParam(const butil::StringPiece& p); + + // True if previous command failed. + bool has_error() const { + return _has_error; + } + + const MysqlTransaction* get_tx() const { + return _tx; + } + + MysqlStatementStub* get_stmt() const { Review Comment: Done — renamed `get_stmt()` to `stmt()` and updated the callers. ########## src/brpc/controller.h: ########## @@ -915,6 +925,16 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); // Defined at both sides StreamSettings *_remote_stream_settings; + // Whether/how to reserve the sending socket after the RPC (mysql transactions). + BindSockAction _bind_sock_action; Review Comment: Done — the `BindSockAction` is now stored in two bits of `Controller::_flags` (`FLAGS_BIND_SOCK_RESERVE`/`FLAGS_BIND_SOCK_USE`) via `set_bind_sock_action()`/`bind_sock_action()`, instead of a dedicated member. No behavior change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
