jamesge commented on a change in pull request #692: Add Mysql Protocol, only
support text protocol now, not support trans…
URL: https://github.com/apache/incubator-brpc/pull/692#discussion_r266728338
##########
File path: src/brpc/mysql_reply.cpp
##########
@@ -0,0 +1,740 @@
+#include "brpc/mysql_reply.h"
+#include <ios>
+
+namespace brpc {
+
+// Evaluates `expr`; when it is truthy, logs `message` at INFO level and makes
+// the enclosing function return false. Only usable inside functions that
+// return bool. The do/while(0) wrapper lets the macro be used as a single
+// statement followed by a semicolon.
+#define MY_ERROR_RET(expr, message) \
+    do {                            \
+        if (expr) {                 \
+            LOG(INFO) << message;   \
+            return false;           \
+        }                           \
+    } while (0)
+
+// Makes the enclosing function return false when `expr` evaluates to false,
+// logging an arena allocation failure.
+#define MY_ALLOC_CHECK(expr) MY_ERROR_RET(!(expr), "Fail to arena allocate")
+// Makes the enclosing function return false when `expr` evaluates to false,
+// logging a protocol parse failure.
+#define MY_PARSE_CHECK(expr) \
+    MY_ERROR_RET(!(expr), "Fail to parse mysql protocol")
+// Allocates room for `n` objects of `Type` from `arena` and default-constructs
+// each of them in place, storing the result in `pointer`.
+// Does nothing when `pointer` is already non-NULL.
+// Returns false only when the arena allocation returns NULL.
+template <class Type>
+inline bool my_alloc_check(butil::Arena* arena, const size_t n,
+                           Type*& pointer) {
+    if (pointer != NULL) {
+        // caller already holds storage, keep it untouched
+        return true;
+    }
+    pointer = (Type*)arena->allocate(sizeof(Type) * n);
+    if (pointer == NULL) {
+        return false;
+    }
+    // construct every element with placement new
+    Type* const end = pointer + n;
+    for (Type* cur = pointer; cur != end; ++cur) {
+        new (cur) Type;
+    }
+    return true;
+}
+
+// Returns a human readable, statically allocated name for a mysql column
+// type. Values without a dedicated case yield "Unknown Field Type".
+const char* MysqlFieldTypeToString(MysqlFieldType type) {
+    switch (type) {
+        case MYSQL_FIELD_TYPE_DECIMAL:
+            // previously fell through and was reported as "tiny"
+            return "decimal";
+        case MYSQL_FIELD_TYPE_TINY:
+            return "tiny";
+        case MYSQL_FIELD_TYPE_SHORT:
+            return "short";
+        case MYSQL_FIELD_TYPE_LONG:
+            return "long";
+        case MYSQL_FIELD_TYPE_FLOAT:
+            return "float";
+        case MYSQL_FIELD_TYPE_DOUBLE:
+            return "double";
+        case MYSQL_FIELD_TYPE_NULL:
+            return "null";
+        case MYSQL_FIELD_TYPE_TIMESTAMP:
+            return "timestamp";
+        case MYSQL_FIELD_TYPE_LONGLONG:
+            return "longlong";
+        case MYSQL_FIELD_TYPE_INT24:
+            return "int24";
+        case MYSQL_FIELD_TYPE_DATE:
+            return "date";
+        case MYSQL_FIELD_TYPE_TIME:
+            return "time";
+        case MYSQL_FIELD_TYPE_DATETIME:
+            return "datetime";
+        case MYSQL_FIELD_TYPE_YEAR:
+            return "year";
+        case MYSQL_FIELD_TYPE_NEWDATE:
+            return "new date";
+        case MYSQL_FIELD_TYPE_VARCHAR:
+            return "varchar";
+        case MYSQL_FIELD_TYPE_BIT:
+            return "bit";
+        case MYSQL_FIELD_TYPE_JSON:
+            return "json";
+        case MYSQL_FIELD_TYPE_NEWDECIMAL:
+            return "new decimal";
+        case MYSQL_FIELD_TYPE_ENUM:
+            return "enum";
+        case MYSQL_FIELD_TYPE_SET:
+            return "set";
+        case MYSQL_FIELD_TYPE_TINY_BLOB:
+            return "tiny blob";
+        case MYSQL_FIELD_TYPE_MEDIUM_BLOB:
+            // previously reported as "blob", same as MYSQL_FIELD_TYPE_BLOB
+            return "medium blob";
+        case MYSQL_FIELD_TYPE_LONG_BLOB:
+            return "long blob";
+        case MYSQL_FIELD_TYPE_BLOB:
+            return "blob";
+        case MYSQL_FIELD_TYPE_VAR_STRING:
+            return "var string";
+        case MYSQL_FIELD_TYPE_STRING:
+            return "string";
+        case MYSQL_FIELD_TYPE_GEOMETRY:
+            return "geometry";
+        default:
+            return "Unknown Field Type";
+    }
+}
+
+// Returns a statically allocated, human readable name for a response type.
+// Values without a dedicated case yield "Unknown Response Type".
+const char* MysqlRspTypeToString(MysqlRspType type) {
+    const char* name = "Unknown Response Type";
+    switch (type) {
+        case MYSQL_RSP_OK:
+            name = "ok";
+            break;
+        case MYSQL_RSP_ERROR:
+            name = "error";
+            break;
+        case MYSQL_RSP_RESULTSET:
+            name = "resultset";
+            break;
+        case MYSQL_RSP_EOF:
+            name = "eof";
+            break;
+        case MYSQL_RSP_AUTH:
+            name = "auth";
+            break;
+        default:
+            break;
+    }
+    return name;
+}
+
+// Cuts a 4-byte mysql packet header from the front of `buf` into `value`:
+// three bytes decoded with mysql_uint3korr into `payload_size`, then one byte
+// stored as `seq`.
+// Returns false when the header cannot be fetched or when `buf` holds fewer
+// than `payload_size + 4` bytes.
+bool ParseHeader(butil::IOBuf& buf, MysqlHeader* value) {
+    // look at the header first to check that the buf contains a full packet
+    uint8_t peek_buf[4];
+    const uint8_t* peeked =
+        (const uint8_t*)buf.fetch(peek_buf, sizeof(peek_buf));
+    if (peeked == NULL) {
+        LOG(ERROR) << "fetch mysql protocol header failed";
+        return false;
+    }
+    const uint32_t payload_size = mysql_uint3korr(peeked);
+    if (buf.size() < payload_size + 4) {
+        LOG(INFO) << "IOBuf not enough full mysql message buf size:"
+                  << buf.size() << " message size:" << payload_size + 4;
+        return false;
+    }
+
+    // consume the three length bytes, then the sequence byte
+    uint8_t length_bytes[3];
+    buf.cutn(length_bytes, sizeof(length_bytes));
+    value->payload_size = mysql_uint3korr(length_bytes);
+
+    uint8_t seq_byte;
+    buf.cut1((char*)&seq_byte);
+    value->seq = seq_byte;
+    return true;
+}
+// Cuts a mysql length-encoded integer from the front of `buf` into `*value`.
+// The first byte selects the encoding:
+//   0..250  the byte itself is the value
+//   251     stored as 0
+//   252     value is decoded from the next 2 bytes
+//   253     value is decoded from the next 3 bytes
+//   254     value is decoded from the next 8 bytes
+//   255     not a valid length prefix
+// Returns false when `buf` does not hold enough bytes or the prefix is 255;
+// `*value` is written only when true is returned.
+bool ParseEncodeLength(butil::IOBuf& buf, uint64_t* value) {
+    uint8_t f = 0;
+    if (!buf.cut1((char*)&f)) {
+        // nothing to read
+        return false;
+    }
+    if (f <= 250) {
+        *value = f;
+    } else if (f == 251) {
+        *value = 0;
+    } else if (f == 252) {
+        uint8_t tmp[2];
+        if (buf.cutn(tmp, sizeof(tmp)) != sizeof(tmp)) {
+            return false;
+        }
+        *value = mysql_uint2korr(tmp);
+    } else if (f == 253) {
+        uint8_t tmp[3];
+        if (buf.cutn(tmp, sizeof(tmp)) != sizeof(tmp)) {
+            return false;
+        }
+        *value = mysql_uint3korr(tmp);
+    } else if (f == 254) {
+        uint8_t tmp[8];
+        if (buf.cutn(tmp, sizeof(tmp)) != sizeof(tmp)) {
+            return false;
+        }
+        *value = mysql_uint8korr(tmp);
+    } else {
+        // 255: previously fell through and returned true with `*value`
+        // never written
+        return false;
+    }
+    return true;
+}
+
+// Parses one mysql response from the front of `buf` into this reply.
+//   buf      input bytes; the individual parse() calls consume from it
+//   arena    storage for the parsed objects
+//   is_auth  when true and the type byte is neither 0x00 nor 0xFF, the
+//            response is parsed as an auth packet
+//   is_multi set to false on entry; set from the MYSQL_SERVER_MORE_RESULTS_EXISTS
+//            status bit of the ok / eof / result set response
+// Returns false when the leading 5 bytes cannot be fetched, an arena
+// allocation fails, a parse step fails or the type byte is not recognized.
+bool MysqlReply::ConsumePartialIOBuf(butil::IOBuf& buf, butil::Arena* arena,
+                                     const bool is_auth, bool* is_multi) {
+    *is_multi = false;
+    // 4 header bytes plus the byte that follows them
+    uint8_t header[5];
+    const uint8_t* p = (const uint8_t*)buf.fetch(header, sizeof(header));
+    if (p == NULL) {
+        return false;
+    }
+    // NOTE(review): when _type is already known it is reused as the type
+    // byte, which presumably relies on the MysqlRspType values matching the
+    // wire markers checked below - confirm against the enum definition.
+    uint8_t type = _type == MYSQL_RSP_UNKNOWN ? p[4] : (uint8_t)_type;
+    if (is_auth && type != 0x00 && type != 0xFF) {
+        _type = MYSQL_RSP_AUTH;
+        Auth* auth = NULL;
+        MY_ALLOC_CHECK(my_alloc_check(arena, 1, auth));
+        MY_PARSE_CHECK(auth->parse(buf, arena));
+        _data.auth = auth;
+        return true;
+    }
+    if (type == 0x00) {
+        _type = MYSQL_RSP_OK;
+        Ok* ok = NULL;
+        MY_ALLOC_CHECK(my_alloc_check(arena, 1, ok));
+        MY_PARSE_CHECK(ok->parse(buf, arena));
+        _data.ok = ok;
+        *is_multi = _data.ok->status() & MYSQL_SERVER_MORE_RESULTS_EXISTS;
+    } else if (type == 0xFF) {
+        _type = MYSQL_RSP_ERROR;
+        Error* error = NULL;
+        MY_ALLOC_CHECK(my_alloc_check(arena, 1, error));
+        MY_PARSE_CHECK(error->parse(buf, arena));
+        _data.error = error;
+    } else if (type == 0xFE) {
+        _type = MYSQL_RSP_EOF;
+        Eof* eof = NULL;
+        MY_ALLOC_CHECK(my_alloc_check(arena, 1, eof));
+        MY_PARSE_CHECK(eof->parse(buf));
+        _data.eof = eof;
+        *is_multi = _data.eof->status() & MYSQL_SERVER_MORE_RESULTS_EXISTS;
+    } else if (type >= 0x01 && type <= 0xFA) {
+        _type = MYSQL_RSP_RESULTSET;
+        // my_alloc_check only allocates when the pointer is still NULL, so a
+        // result set allocated by an earlier call is reused
+        MY_ALLOC_CHECK(my_alloc_check(arena, 1, _data.result_set.var));
+        ResultSet& r = *_data.result_set.var;
+        // parse header
+        MY_PARSE_CHECK(r._header.parse(buf));
+        // parse columns
+        MY_ALLOC_CHECK(
+            my_alloc_check(arena, r._header._column_number, r._columns));
+        for (uint64_t i = 0; i < r._header._column_number; ++i) {
+            MY_PARSE_CHECK(r._columns[i].parse(buf, arena));
+        }
+        // parse eof1
+        MY_PARSE_CHECK(r._eof1.parse(buf));
+        // parse row
+        Eof eof;
+        bool is_first = true;
+        while (!eof.isEof(buf)) {
+            if (is_first) {
+                // we may reenter ConsumePartialIOBuf many times, check the
+                // last row
+                if (r._last != r._first) {
+                    MY_PARSE_CHECK(r._last->parseText(buf));
+                    for (uint64_t i = 0; i < r._header._column_number; ++i) {
+                        MY_PARSE_CHECK(r._last->_fields[i].parse(
+                            buf, r._columns + i, arena));
+                    }
+                }
+                is_first = false;
+                continue;
+            }
+
+            Row* row = NULL;
+            Field* fields = NULL;
+            MY_ALLOC_CHECK(my_alloc_check(arena, 1, row));
+            MY_ALLOC_CHECK(
+                my_alloc_check(arena, r._header._column_number, fields));
+            row->_fields = fields;
+            row->_field_number = r._header._column_number;
+            // append the new row to the linked list before parsing it
+            r._last->_next = row;
+            r._last = row;
+            // parse row and fields
+            MY_PARSE_CHECK(row->parseText(buf));
+            for (uint64_t i = 0; i < r._header._column_number; ++i) {
+                MY_PARSE_CHECK(fields[i].parse(buf, r._columns + i, arena));
+            }
+            // row number
+            ++r._row_number;
+        }
+        // parse eof2
+        MY_PARSE_CHECK(r._eof2.parse(buf));
+        *is_multi = r._eof2.status() & MYSQL_SERVER_MORE_RESULTS_EXISTS;
+    } else {
+        LOG(ERROR) << "Unknown Response Type";
+        return false;
+    }
+    return true;
+}
+
+void MysqlReply::Print(std::ostream& os) const {
+ if (_type == MYSQL_RSP_AUTH) {
+ const Auth& auth = *_data.auth;
+ os << "\nprotocol:" << (unsigned)auth._protocol
+ << "\nversion:" << auth._version.as_string()
Review comment:
打印不需要加as_string
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]